Apache Spark

Spark is a distributed computing framework designed for big data processing and analytics. The Qdrant-Spark connector enables Qdrant to be a storage destination in Spark.

Installation

To integrate the connector into your Spark environment, get the JAR file from one of the sources listed below.

  • GitHub Releases

The packaged JAR file, which includes all the required dependencies, can be downloaded from the project's GitHub Releases page.

  • Building from Source

To build the JAR from source, you need JDK 8 and Maven installed. Once the requirements have been satisfied, run the following command in the project root.

mvn package -DskipTests

The JAR file will be written into the target directory by default.

  • Maven Central

The connector is also published to Maven Central under the coordinates io.qdrant:spark.

Usage

Creating a Spark session with Qdrant support

Python

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config(
        "spark.jars",
        "path/to/file/spark-VERSION.jar",  # Specify the path to the downloaded JAR file
    )
    .master("local[*]")
    .appName("qdrant")
    .getOrCreate()
)

Scala

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .config("spark.jars", "path/to/file/spark-VERSION.jar") // Specify the path to the downloaded JAR file
  .master("local[*]")
  .appName("qdrant")
  .getOrCreate()

Java

import org.apache.spark.sql.SparkSession;

public class QdrantSparkJavaExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .config("spark.jars", "path/to/file/spark-VERSION.jar") // Specify the path to the downloaded JAR file
                .master("local[*]")
                .appName("qdrant")
                .getOrCreate(); 
    }
}

Loading data

Before loading the data using this connector, a collection has to be created in advance with the appropriate vector dimensions and configurations.

The connector supports ingesting unnamed (default) dense vectors, named dense vectors, sparse vectors, and multi-vectors, including multiple vectors per point.
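For example, the collection could be created up front with the qdrant-client Python package (a separate dependency; the collection name, vector names, and dimension below are illustrative and must match your data):

from qdrant_client import QdrantClient, models

# Connect over HTTP (port 6333); the Spark connector itself talks gRPC on port 6334.
client = QdrantClient(url="http://localhost:6333")

# Hypothetical collection with one named dense vector and one named sparse vector.
client.create_collection(
    collection_name="spark_demo",
    vectors_config={
        "dense": models.VectorParams(size=384, distance=models.Distance.COSINE),
    },
    sparse_vectors_config={
        "sparse": models.SparseVectorParams(),
    },
)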

Examples for each supported configuration are listed below.

Unnamed/Default vector
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", <QDRANT_GRPC_URL>)
   .option("collection_name", <QDRANT_COLLECTION_NAME>)
   .option("embedding_field", <EMBEDDING_FIELD_NAME>)  # Expected to be a field of type ArrayType(FloatType)
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
Named vector
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", <QDRANT_GRPC_URL>)
   .option("collection_name", <QDRANT_COLLECTION_NAME>)
   .option("embedding_field", <EMBEDDING_FIELD_NAME>)  # Expected to be a field of type ArrayType(FloatType)
   .option("vector_name", <VECTOR_NAME>)
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()

NOTE

The embedding_field and vector_name options are maintained for backward compatibility. It is recommended to use vector_fields and vector_names for named vectors as shown below.

Multiple named vectors
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("vector_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("vector_names", "<VECTOR_NAME>,<ANOTHER_VECTOR_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
Sparse vectors
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("sparse_vector_value_fields", "<COLUMN_NAME>")
   .option("sparse_vector_index_fields", "<COLUMN_NAME>")
   .option("sparse_vector_names", "<SPARSE_VECTOR_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
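The sparse index and value columns are expected to be parallel arrays, so that the value at position i is the weight of the dimension given at position i. A minimal sketch of a matching PySpark schema, with illustrative column names:

from pyspark.sql.types import (
    ArrayType, FloatType, IntegerType, StringType, StructField, StructType,
)

# Hypothetical layout: sparse_values[i] is the weight of dimension sparse_indices[i].
schema = StructType([
    StructField("text", StringType()),
    StructField("sparse_indices", ArrayType(IntegerType())),
    StructField("sparse_values", ArrayType(FloatType())),
])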
Multiple sparse vectors
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("sparse_vector_value_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("sparse_vector_index_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("sparse_vector_names", "<SPARSE_VECTOR_NAME>,<ANOTHER_SPARSE_VECTOR_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
Combination of named dense and sparse vectors
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("vector_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("vector_names", "<VECTOR_NAME>,<ANOTHER_VECTOR_NAME>")
   .option("sparse_vector_value_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("sparse_vector_index_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("sparse_vector_names", "<SPARSE_VECTOR_NAME>,<ANOTHER_SPARSE_VECTOR_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
Multi-vectors
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("multi_vector_fields", "<COLUMN_NAME>")
   .option("multi_vector_names", "<MULTI_VECTOR_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
Multiple Multi-vectors
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("multi_vector_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("multi_vector_names", "<MULTI_VECTOR_NAME>,<ANOTHER_MULTI_VECTOR_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
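A multi-vector column carries several dense vectors of the same dimension per row, i.e. ArrayType(ArrayType(FloatType)) as listed in the options table below. A sketch with an illustrative column name:

from pyspark.sql.types import ArrayType, FloatType, StructField, StructType

# Hypothetical schema: each row holds a list of equally sized dense vectors.
schema = StructType([
    StructField("late_interaction_vectors", ArrayType(ArrayType(FloatType()))),
])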
No vectors - Entire dataframe is stored as payload
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
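Putting it together, a minimal end-to-end sketch for the unnamed/default vector case might look as follows. The URL, collection name, and toy 3-dimensional embeddings are illustrative; the collection is assumed to already exist with matching vector parameters:

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, FloatType, StringType, StructField, StructType

spark = (
    SparkSession.builder
    .config("spark.jars", "path/to/file/spark-VERSION.jar")
    .master("local[*]")
    .appName("qdrant")
    .getOrCreate()
)

# Toy embeddings; a real pipeline would compute these upstream.
schema = StructType([
    StructField("text", StringType()),
    StructField("embedding", ArrayType(FloatType())),
])
df = spark.createDataFrame(
    [("hello", [0.1, 0.2, 0.3]), ("world", [0.4, 0.5, 0.6])],
    schema=schema,
)

(
    df.write.format("io.qdrant.spark.Qdrant")
    .option("qdrant_url", "http://localhost:6334")  # gRPC port
    .option("collection_name", "spark_demo")
    .option("embedding_field", "embedding")
    .option("schema", df.schema.json())
    .mode("append")
    .save()
)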

Databricks

You can use the qdrant-spark connector as a library in Databricks.

  • Go to the Libraries section in your Databricks cluster dashboard.
  • Select Install New to open the library installation modal.
  • Search for io.qdrant:spark:VERSION in the Maven packages and click Install.


Datatype support

The appropriate Spark data types are mapped to the Qdrant payload based on the provided schema.

Options and Spark types

| Option | Description | Column DataType | Required |
|---|---|---|---|
| qdrant_url | gRPC URL of the Qdrant instance. E.g. http://localhost:6334 | - | ✅ |
| collection_name | Name of the collection to write data into | - | ✅ |
| schema | JSON string of the dataframe schema | - | ✅ |
| embedding_field | Name of the column holding the embeddings (deprecated; use vector_fields instead) | ArrayType(FloatType) | ❌ |
| id_field | Name of the column holding the point IDs. Default: random UUID | StringType or IntegerType | ❌ |
| batch_size | Max size of the upload batch. Default: 64 | - | ❌ |
| retries | Number of upload retries. Default: 3 | - | ❌ |
| api_key | Qdrant API key for authentication | - | ❌ |
| vector_name | Name of the vector in the collection | - | ❌ |
| vector_fields | Comma-separated names of columns holding the vectors | ArrayType(FloatType) | ❌ |
| vector_names | Comma-separated names of vectors in the collection | - | ❌ |
| sparse_vector_index_fields | Comma-separated names of columns holding the sparse vector indices | ArrayType(IntegerType) | ❌ |
| sparse_vector_value_fields | Comma-separated names of columns holding the sparse vector values | ArrayType(FloatType) | ❌ |
| sparse_vector_names | Comma-separated names of the sparse vectors in the collection | - | ❌ |
| multi_vector_fields | Comma-separated names of columns holding the multi-vector values | ArrayType(ArrayType(FloatType)) | ❌ |
| multi_vector_names | Comma-separated names of the multi-vectors in the collection | - | ❌ |
| shard_key_selector | Comma-separated names of custom shard keys to use during upsert | - | ❌ |
| wait | Wait for each batch upsert to complete. true or false. Default: true | - | ❌ |
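As an illustration of the optional settings above, a write to a hypothetical Qdrant Cloud instance with an explicit ID column and tuned batching could look like this (the URL, key, and column names are placeholders):

(
    df.write.format("io.qdrant.spark.Qdrant")
    .option("qdrant_url", "https://xyz-example.cloud.qdrant.io:6334")
    .option("api_key", "<QDRANT_API_KEY>")
    .option("collection_name", "spark_demo")
    .option("embedding_field", "embedding")
    .option("id_field", "doc_id")    # StringType or IntegerType column
    .option("batch_size", "128")     # default: 64
    .option("retries", "5")          # default: 3
    .option("wait", "false")         # don't block on each batch upsert
    .option("schema", df.schema.json())
    .mode("append")
    .save()
)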

For more information, be sure to check out the Qdrant-Spark GitHub repository and the official Apache Spark documentation. Happy data processing!
