Qdrant on Databricks

Time: 30 minLevel: IntermediateComplete Notebook

Databricks is a unified analytics platform for working with big data and AI. It’s built around Apache Spark, a powerful open-source distributed computing system well-suited for processing large-scale datasets and performing complex analytics tasks.

Apache Spark is designed to scale horizontally, meaning it can handle expensive operations like generating vector embeddings by distributing computation across a cluster of machines. This scalability is crucial when dealing with large datasets.

In this example, we will demonstrate how to vectorize a dataset with dense and sparse embeddings using Qdrant’s FastEmbed library. We will then load this vectorized data into a Qdrant cluster using the Qdrant Spark connector on Databricks.

Setting up a Databricks project

  • Set up a Databricks cluster following the official documentation guidelines.

  • Install the Qdrant Spark connector as a library:

    • Navigate to the Libraries section in your cluster dashboard.

    • Click on Install New at the top-right to open the library installation modal.

    • Search for io.qdrant:spark:VERSION in the Maven packages and click on Install.

      Install the library

  • Create a new Databricks notebook on your cluster to begin working with your data and libraries.

Download a dataset

  • Install the required dependencies:
%pip install fastembed datasets
  • Download the dataset:
from datasets import load_dataset

dataset_name = "tasksource/med"
dataset = load_dataset(dataset_name, split="train")
# We'll use the first 100 entries from this dataset and exclude some unused columns.
dataset = dataset.select(range(100)).remove_columns(["gold_label", "genre"])
  • Convert the dataset into a Spark dataframe:
dataset.to_parquet("/dbfs/pq.pq")
dataset_df = spark.read.parquet("file:/dbfs/pq.pq")

Vectorizing the data

In this section, we’ll be generating both dense and sparse vectors for our rows using FastEmbed. We’ll create a user-defined function (UDF) to handle this step.

Creating the vectorization function

from fastembed import TextEmbedding, SparseTextEmbedding

def vectorize(partition_data):
    # Initialize dense and sparse models
    dense_model = TextEmbedding(model_name="BAAI/bge-small-en-v1.5")
    sparse_model = SparseTextEmbedding(model_name="prithivida/Splade_PP_en_v1")

    for row in partition_data:
        # Generate dense and sparse vectors
        dense_vector = next(dense_model.embed(row.sentence1))
        sparse_vector = next(sparse_model.embed(row.sentence2))

        yield [
            row.sentence1,  # 1st column: original text
            row.sentence2,  # 2nd column: original text
            dense_vector.tolist(),  # 3rd column: dense vector
            sparse_vector.indices.tolist(),  # 4th column: sparse vector indices
            sparse_vector.values.tolist(),  # 5th column: sparse vector values
        ]

We’re using the BAAI/bge-small-en-v1.5 model for dense embeddings and prithivida/Splade_PP_en_v1 for sparse embeddings.

Applying the UDF on our dataframe

Next, let’s apply our vectorize UDF on our Spark dataframe to generate embeddings.

embeddings = dataset_df.rdd.mapPartitions(vectorize)

The mapPartitions() method returns a Resilient Distributed Dataset (RDD) which should then be converted back to a Spark dataframe.

Building the new Spark dataframe with the vectorized data

We’ll now create a new Spark dataframe (embeddings_df) with the vectorized data using the specified schema.

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType, IntegerType

# Define the schema for the new dataframe
schema = StructType([
    StructField("sentence1", StringType()),
    StructField("sentence2", StringType()),
    StructField("dense_vector", ArrayType(FloatType())),
    StructField("sparse_vector_indices", ArrayType(IntegerType())),
    StructField("sparse_vector_values", ArrayType(FloatType()))
])

# Create the new dataframe with the vectorized data
embeddings_df = spark.createDataFrame(data=embeddings, schema=schema)

Uploading the data to Qdrant

  • Create a Qdrant collection:

    • Follow the documentation to create a collection with the appropriate configurations. Here’s an example request to support both dense and sparse vectors:
    PUT /collections/{collection_name}
    {
      "vectors": {
        "dense": {
          "size": 384,
          "distance": "Cosine"
        }
      },
      "sparse_vectors": {
        "sparse": {}
      }
    }
    
  • Upload the dataframe to Qdrant:

options = {
    "qdrant_url": "<QDRANT_GRPC_URL>",
    "api_key": "<QDRANT_API_KEY>",
    "collection_name": "<QDRANT_COLLECTION_NAME>",
    "vector_fields": "dense_vector",
    "vector_names": "dense",
    "sparse_vector_value_fields": "sparse_vector_values",
    "sparse_vector_index_fields": "sparse_vector_indices",
    "sparse_vector_names": "sparse",
    "schema": embeddings_df.schema.json(),
}

embeddings_df.write.format("io.qdrant.spark.Qdrant").options(**options).mode(
    "append"
).save()

Ensure to replace the placeholder values (<QDRANT_GRPC_URL>, <QDRANT_API_KEY>, <QDRANT_COLLECTION_NAME>) with your actual values. If the id_field option is not specified, Qdrant Spark connector generates random UUIDs for each point.

The command output you should see is similar to:

Command took 40.37 seconds -- by xxxxx90@xxxxxx.com at 4/17/2024, 12:13:28 PM on fastembed

Conclusion

That wraps up our tutorial! Feel free to explore more functionalities and experiments with different models, parameters, and features available in Databricks, Spark, and Qdrant.

Happy data engineering!