Calendar Day 5

Project: Building a Recommendation System

Bring together dense vectors, sparse vectors, and ColBERT multivectors in one atomic Universal Query. You’ll retrieve candidates, fuse signals, rerank with ColBERT, and apply business filters - all in a single request.

Your Mission

Build a complete recommendation system using Qdrant’s Universal Query API with dense, sparse, and ColBERT multivectors in one request.

Estimated Time: 90 minutes

What You’ll Build

A hybrid recommendation system using:

  • Multi-vector architecture with dense, sparse, and ColBERT vectors
  • Universal Query API for atomic multi-stage search
  • RRF fusion for combining candidates
  • ColBERT reranking for fine-grained relevance scoring
  • Business rule filtering at multiple pipeline stages
  • Production-ready patterns for recommendation systems

Setup

Prerequisites

  • Qdrant Cloud cluster (URL + API key)
  • Python 3.9+ (or Google Colab)
  • Packages: qdrant-client, fastembed

Models

  • Dense: sentence-transformers/all-MiniLM-L6-v2 (384-dim)
  • Sparse: prithivida/Splade_PP_en_v1 (SPLADE)
  • Multivector: colbert-ir/colbertv2.0 (128-dim tokens)

Dataset

  • Scope: A small set of sample items (e.g., 10-20 movies).
  • Payload Fields: title, description, category, genre, year, rating, user_segment, popularity_score, release_date.
  • Filters Used: category, user_segment, release_date, popularity_score.

Build Steps

Step 1: Set Up the Hybrid Collection

Initialize Client and Collection

First, connect to Qdrant and create a clean collection for our recommendation system:

from datetime import datetime
from qdrant_client import QdrantClient, models
import os

client = QdrantClient(url=os.getenv("QDRANT_URL"), api_key=os.getenv("QDRANT_API_KEY"))

# For Colab:
# from google.colab import userdata
# client = QdrantClient(url=userdata.get("QDRANT_URL"), api_key=userdata.get("QDRANT_API_KEY"))

collection_name = "day5_recommendations_hybrid"

# Clean state
if client.collection_exists(collection_name=collection_name):
    client.delete_collection(collection_name=collection_name)

Now configure the collection with three named vectors, each serving a different purpose in the recommendation pipeline:

client.create_collection(
    collection_name=collection_name,
    vectors_config={
        # Dense vectors for semantic understanding
        "dense": models.VectorParams(size=384, distance=models.Distance.COSINE),
        # ColBERT multivectors for fine-grained reranking
        "colbert": models.VectorParams(
            size=128,
            distance=models.Distance.COSINE,
            multivector_config=models.MultiVectorConfig(
                comparator=models.MultiVectorComparator.MAX_SIM
            ),
            hnsw_config=models.HnswConfigDiff(
                m=0  # Disable HNSW - used only for reranking
            ),
        ),
    },
    sparse_vectors_config={
        # Sparse vectors for exact keyword matching
        "sparse": models.SparseVectorParams(
            index=models.SparseIndexParams(on_disk=False)
        )
    },
)

Why this setup: ColBERT uses the MAX_SIM comparator for token-level similarity, and m=0 disables HNSW indexing for that vector. Because ColBERT is used only for reranking, never for initial retrieval, skipping the index saves memory and indexing compute.
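
To see what the MAX_SIM comparator actually computes, here is a tiny pure-Python sketch of ColBERT-style late interaction. The toy 2-dim vectors stand in for real 128-dim token embeddings, and dot product stands in for the configured COSINE distance - this illustrates the idea, not Qdrant's internal implementation:

```python
def maxsim(query_tokens, doc_tokens):
    """ColBERT-style MaxSim: for each query token, take its best-matching
    document token (dot product here), then sum those per-token maxima."""
    return sum(
        max(sum(qi * di for qi, di in zip(q, d)) for d in doc_tokens)
        for q in query_tokens
    )

# Toy 2-dim "token embeddings" (real ColBERT tokens are 128-dim)
query = [[1.0, 0.0], [0.0, 1.0]]
doc_a = [[0.9, 0.1], [0.2, 0.8]]   # each query token has a strong match
doc_b = [[0.5, 0.5], [0.5, 0.5]]   # mediocre match for both query tokens

print(maxsim(query, doc_a))  # higher: 0.9 + 0.8
print(maxsim(query, doc_b))  # lower: 0.5 + 0.5
```

Because every query token is matched independently against all document tokens, a document that covers each aspect of the query well beats one that is uniformly mediocre - exactly the fine-grained behavior the reranking stage exploits.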

Create Payload Indexes

Before ingesting data, create indexes for the fields we’ll filter by. This enables efficient filtering during vector search:

# Business metadata indexes
client.create_payload_index(
    collection_name=collection_name,
    field_name="category",
    field_schema="keyword",
)
client.create_payload_index(
    collection_name=collection_name,
    field_name="user_segment",
    field_schema="keyword",
)

# Quality and recency indexes
client.create_payload_index(
    collection_name=collection_name,
    field_name="release_date",
    field_schema="datetime",
)
client.create_payload_index(
    collection_name=collection_name,
    field_name="popularity_score",
    field_schema="float",
)
client.create_payload_index(
    collection_name=collection_name,
    field_name="rating",
    field_schema="float",
)

Step 2: Prepare and Upload Recommendation Data

Create Sample Recommendation Data

Let’s create sample movie data with business metadata for filtering:

# Example: Create sample movie/content data
sample_data = [
    {
        "title": "The Matrix",
        "description": "A computer hacker learns about the true nature of reality and his role in the war against its controllers.",
        "category": "movie",
        "genre": ["sci-fi", "action"],
        "year": 1999,
        "rating": 8.7,
        "user_segment": "premium",
        "popularity_score": 0.9,
        "release_date": "1999-03-31T00:00:00Z",
    },
    # Add more sample data...
]

texts = [it["description"] for it in sample_data]

Initialize Embedding Models

We’ll use FastEmbed to generate all three vector types:

from fastembed import TextEmbedding, SparseTextEmbedding, LateInteractionTextEmbedding

# Model configurations
DENSE_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"  # 384-dim
SPARSE_MODEL_ID = "prithivida/Splade_PP_en_v1"  # SPLADE sparse
COLBERT_MODEL_ID = "colbert-ir/colbertv2.0"  # 128-dim multivector

dense_model = TextEmbedding(DENSE_MODEL_ID)
sparse_model = SparseTextEmbedding(SPARSE_MODEL_ID)
colbert_model = LateInteractionTextEmbedding(COLBERT_MODEL_ID)

Generate Embeddings

Encode all descriptions with all three embedding models:

# Generate embeddings for all items
dense_embeds = list(
    dense_model.embed(texts, parallel=0)
)  # list[np.ndarray] shape (384,)

sparse_embeds = list(
    sparse_model.embed(texts, parallel=0)
)  # list[SparseEmbedding] with .indices/.values

colbert_multivectors = list(
    colbert_model.embed(texts, parallel=0)
)  # list[np.ndarray] shape (tokens, 128)

Create Points and Upload

Now package everything into points and upload to Qdrant:

# Generate vectors for each item
points = []
for i, item in enumerate(sample_data):
    # Create sparse vector (keyword matching)
    sparse_vector = sparse_embeds[i].as_object()

    # Create dense vector (semantic understanding)
    dense_vector = dense_embeds[i]

    # Create ColBERT multivector (token-level understanding)
    colbert_vector = colbert_multivectors[i]

    points.append(
        models.PointStruct(
            id=i,
            vector={
                "dense": dense_vector,
                "sparse": sparse_vector,
                "colbert": colbert_vector,
            },
            payload=item,
        )
    )

client.upload_points(collection_name=collection_name, points=points)
print(f"Uploaded {len(points)} recommendation items")

Step 3: One Universal Query — Retrieve → Fuse → Rerank → Filter

Now let’s build a complete multi-stage search that combines everything in a single request.

Prepare Query Embeddings

First, encode the user’s search intent using all three embedding models:

from datetime import timedelta

# Example user intent
user_query = "premium user likes sci-fi action movies with strong hacker themes"

user_dense_vector = next(dense_model.query_embed(user_query))
user_sparse_vector = next(sparse_model.query_embed(user_query)).as_object()
user_multivector = next(colbert_model.query_embed(user_query))

Define Global Filter with Automatic Propagation

Create a single filter that will automatically propagate to all stages of the pipeline:

# Global filter - this will be propagated to ALL prefetch stages
global_filter = models.Filter(
    must=[
        # Content type and user segment
        models.FieldCondition(
            key="category", match=models.MatchValue(value="movie")
        ),
        models.FieldCondition(
            key="user_segment", match=models.MatchValue(value="premium")
        ),
        # Quality and recency constraints
        models.FieldCondition(
            key="release_date",
            range=models.DatetimeRange(
                gte=(datetime.now() - timedelta(days=365 * 30)).isoformat()
            ),
        ),
        models.FieldCondition(key="popularity_score", range=models.Range(gte=0.7)),
    ]
)

Key insight: This filter automatically propagates to ALL prefetch stages. Qdrant doesn’t do “late filtering” or “post-filtering” - filters are applied at the HNSW search level for maximum efficiency.

Set Up Prefetch and Fusion

Configure parallel retrieval and fusion strategy:

# Prefetch queries - global filter will be automatically applied to both
hybrid_query = [
    models.Prefetch(query=user_dense_vector, using="dense", limit=100),
    models.Prefetch(query=user_sparse_vector, using="sparse", limit=100),
]

# Fusion stage - combine candidates with RRF
fusion_query = models.Prefetch(
    prefetch=hybrid_query,
    query=models.FusionQuery(fusion=models.Fusion.RRF),
    limit=100,
)

These prefetch queries run in parallel, and the global filter from the main query automatically propagates to both the dense and the sparse search.
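
Reciprocal Rank Fusion itself is simple enough to sketch in a few lines of plain Python: each candidate scores 1/(k + rank) in every list it appears in, and the sums determine the fused order. The constant k=60 is the common textbook choice - Qdrant's internal constant may differ, so treat this as an illustration of the idea, not its exact implementation:

```python
def rrf(rankings, k=60):
    """Fuse several ranked id lists with Reciprocal Rank Fusion.
    rankings: list of lists, each ordered best-first."""
    scores = {}
    for ranking in rankings:
        for rank, item in enumerate(ranking, start=1):
            scores[item] = scores.get(item, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

# Hypothetical top-3 lists from the dense and sparse prefetch stages
dense_hits = ["matrix", "blade_runner", "inception"]
sparse_hits = ["matrix", "tron", "blade_runner"]

print(rrf([dense_hits, sparse_hits]))  # "matrix" wins: rank 1 in both lists
```

Note that only ranks matter, never raw scores - which is why RRF can fuse dense cosine similarities with sparse SPLADE scores despite their incomparable scales.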

Execute Universal Query with Reranking

Finally, send the complete query with ColBERT reranking:

# The Universal Query: Global filter propagates through all stages
response = client.query_points(
    collection_name=collection_name,
    prefetch=fusion_query,
    query=user_multivector,
    using="colbert",
    query_filter=global_filter,  # Propagates to all prefetch stages
    limit=10,
    with_payload=True,
)

for hit in response.points or []:
    print(hit.payload)

Why this works: Dense and sparse retrieval happen in parallel (with filters applied), RRF fuses the results, ColBERT reranks with token-level precision, and the global filter is applied at every stage via automatic propagation - all in one atomic API call.

Step 4: Build a Recommendation Service

Now let’s wrap everything into a production-ready function that handles dynamic user preferences.

Helper Function for Building Filters

First, create a reusable helper function to build filters from user profiles:

def build_recommendation_filter(user_profile, user_preference=None):
    """
    Build a global filter from user profile and preferences.
    This filter will automatically propagate to all prefetch stages.

    Args:
        user_profile: {
            "liked_titles": list[str],      # optional
            "preferred_genres": list[str],  # e.g. ["sci-fi","action"]
            "segment": str,                 # e.g. "premium"
            "query": str                    # free-text intent, e.g. "smart sci-fi with hacker vibe"
        }
        user_preference: {
            "category": str | None,         # e.g. "movie"
            "min_rating": float | None,     # e.g. 8.0
            "released_within_days": int | None  # e.g. 365
        }

    Returns:
        models.Filter object or None if no conditions
    """
    from datetime import datetime, timedelta

    filter_conditions = []

    # User segment filtering
    if user_profile.get("segment"):
        filter_conditions.append(
            models.FieldCondition(
                key="user_segment",
                match=models.MatchValue(value=user_profile["segment"]),
            )
        )

    if user_preference:
        # Category filtering
        if user_preference.get("category"):
            filter_conditions.append(
                models.FieldCondition(
                    key="category",
                    match=models.MatchValue(value=user_preference["category"]),
                )
            )

        # Rating filtering
        if user_preference.get("min_rating") is not None:
            filter_conditions.append(
                models.FieldCondition(
                    key="rating",
                    range=models.Range(gte=user_preference["min_rating"])
                )
            )

        # Recency filtering
        if user_preference.get("released_within_days"):
            days = int(user_preference["released_within_days"])
            filter_conditions.append(
                models.FieldCondition(
                    key="release_date",
                    range=models.DatetimeRange(
                        gte=(datetime.now() - timedelta(days=days)).isoformat()
                    ),
                )
            )

    return models.Filter(must=filter_conditions) if filter_conditions else None

Recommendation Function

Now create the main recommendation function using the helper:

def get_recommendations(user_profile, user_preference=None, limit=10):
    """
    Get personalized recommendations using Universal Query API

    Args:
        user_profile: {
            "liked_titles": list[str],      # optional
            "preferred_genres": list[str],  # e.g. ["sci-fi","action"]
            "segment": str,                 # e.g. "premium"
            "query": str                    # free-text intent, e.g. "smart sci-fi with hacker vibe"
        }
        user_preference: {
            "category": str | None,         # e.g. "movie"
            "min_rating": float | None,     # e.g. 8.0
            "released_within_days": int | None  # e.g. 365
        }
        limit: top-k to return
    """

    # Generate query embeddings
    user_dense_vector = next(dense_model.query_embed(user_profile["query"]))
    user_sparse_vector = next(
        sparse_model.query_embed(user_profile["query"])
    ).as_object()
    user_multivector = next(colbert_model.query_embed(user_profile["query"]))

    # Build global filter using helper function
    global_filter = build_recommendation_filter(user_profile, user_preference)

    # Prefetch queries - global filter will propagate automatically
    hybrid_query = [
        models.Prefetch(query=user_dense_vector, using="dense", limit=100),
        models.Prefetch(query=user_sparse_vector, using="sparse", limit=100),
    ]

    # Combine candidates with RRF
    fusion_query = models.Prefetch(
        prefetch=hybrid_query,
        query=models.FusionQuery(fusion=models.Fusion.RRF),
        limit=100,
    )

    # Universal query - global filter propagates to all stages
    response = client.query_points(
        collection_name=collection_name,
        prefetch=fusion_query,
        query=user_multivector,
        using="colbert",
        query_filter=global_filter,  # Propagates to all prefetch stages
        limit=limit,
        with_payload=True,
    )

    return [
        {
            "title": hit.payload["title"],
            "description": hit.payload["description"],
            "score": hit.score,
            "metadata": {
                k: v
                for k, v in hit.payload.items()
                if k not in ["title", "description"]
            },
        }
        for hit in (response.points or [])
    ]

Test the Service

Let’s test the recommendation function:

# Test the recommendation service
user_profile = {
    "liked_titles": ["The Matrix", "Blade Runner"],
    "preferred_genres": ["sci-fi", "action"],
    "segment": "premium",
    "query": "highly rated cyberpunk movies",
}

recommendations = get_recommendations(
    user_profile,
    user_preference={
        "category": "movie",
        "min_rating": 8.0,
        "released_within_days": 365 * 30,
    },
    limit=10,
)

for i, rec in enumerate(recommendations, 1):
    print(f"{i}. {rec['title']} (Score: {rec['score']:.3f})")

What happened under the hood: Qdrant retrieved 100 candidates from dense and 100 from sparse in parallel (with the business filters applied at each stage), fused them with RRF, reranked with ColBERT’s MaxSim over token-level vectors, and returned the top 10 - all in one call.

Success Criteria

You’ll know you’ve succeeded when:

  • Your collection contains dense, sparse, and ColBERT vectors
  • You can execute complex multi-stage searches in a single API call
  • RRF fusion effectively combines different vector types
  • ColBERT reranking improves result relevance
  • Business filters propagate automatically to all prefetch stages
  • Your recommendation service provides personalized, high-quality results

Share Your Discovery

Step 1: Reflect on Your Findings

  1. How did the Universal Query API simplify your recommendation pipeline (fewer calls, fewer joins, less code)?
  2. Which fusion strategy (RRF vs. DBSF) ranked better for your queries?
  3. What was the effect of ColBERT reranking on recommendation quality (top-k changes, click-like signals, nDCG/MRR)?
  4. What was the latency impact of multi-stage filtering (prefetch, rerank, total)?
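
For the latency question, wrapping the query call in a timer is enough to get the total figure. A minimal sketch - `timed` is a hypothetical helper, and the `sorted` call below just stands in for any callable such as a `client.query_points` invocation:

```python
import time

def timed(fn, *args, **kwargs):
    """Run fn(*args, **kwargs) and return (result, elapsed milliseconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    return result, elapsed_ms

# Usage with any callable, e.g. a client.query_points invocation:
result, ms = timed(sorted, [3, 1, 2])
print(f"took {ms:.2f} ms")
```

Client-side timing only gives you the end-to-end number; separating prefetch from rerank time would require server-side telemetry, so report per-stage figures only if you can measure them.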

Step 2: Post Your Results

Post your results in Discord using this template:

**[Day N] Recommendations with the Universal Query API**

**High-Level Summary**
- **Domain:** "I built recommendations for [domain]"
- **Key Result:** "Using [RRF/DBSF] + ColBERT rerank improved [metric] to [value] with total latency [Z] ms."

**Reproducibility**
- **Collection:** [name]
- **Models:** dense=[id, dim], sparse=[method], colbert=[id, dim]
- **Dataset:** [N items] (snapshot: YYYY-MM-DD)

**Settings (today)**
- **Fusion:** [RRF/DBSF], k_dense=[..], k_sparse=[..]
- **Reranker:** ColBERT (MaxSim), top-k=[..]
- **Filters:** category=[...], segment=[...], rating≥..., release_date≥...
- **Filter propagation:** applied to all prefetch stages

**Results (demo query: "[user intent]")**
- **Top picks (rank → title → score):**
  1) ...
  2) ...
  3) ...
- **Why these won:** [token hit like “hacker”, genre match, rating signal]
- **Latency:** prefetch ~[X] ms | rerank ~[Y] ms | total ~[Z] ms
- **Dropped by rules:** [ids/titles → which rule]

**Surprise**
- "[one thing you didn’t expect]"

**Next step**
- "[what you’ll try next]"

Optional: Go Further

Experiment with Fusion Strategies

Compare RRF with Distribution-Based Score Fusion:

# Test DBSF vs RRF
# Filters and other prefetch params same as above

fusion_query = models.Prefetch(
    prefetch=hybrid_query,
    query=models.FusionQuery(fusion=models.Fusion.DBSF),
    limit=100,
)

response = client.query_points(
    collection_name=collection_name,
    prefetch=fusion_query,
    query=user_multivector,
    using="colbert",
    query_filter=global_filter,  # Same global filter propagates to all stages
    limit=10,
    with_payload=True,
)

for hit in response.points or []:
    print(hit.payload)
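
Unlike RRF, DBSF works on raw scores: each candidate list is normalized using its own score distribution before the per-point sums are taken. A rough pure-Python sketch of the idea, using mean ± 3 standard deviations as the normalization bounds - Qdrant's exact normalization may differ in detail:

```python
from statistics import mean, pstdev

def dbsf(score_lists):
    """Distribution-Based Score Fusion sketch: min-max-normalize each list
    using mean +/- 3*stddev as bounds, then sum normalized scores per id.
    score_lists: list of dicts mapping point id -> raw score."""
    fused = {}
    for scores in score_lists:
        mu = mean(scores.values())
        sigma = pstdev(scores.values())
        lo, hi = mu - 3 * sigma, mu + 3 * sigma
        for pid, s in scores.items():
            norm = (s - lo) / (hi - lo) if hi > lo else 0.5
            fused[pid] = fused.get(pid, 0.0) + norm
    return sorted(fused, key=fused.get, reverse=True)

# Hypothetical raw scores on very different scales (cosine vs SPLADE)
dense_scores = {"matrix": 0.92, "blade_runner": 0.88, "tron": 0.40}
sparse_scores = {"matrix": 14.0, "inception": 9.0, "tron": 2.0}

print(dbsf([dense_scores, sparse_scores]))  # "matrix" leads in both lists
```

Because DBSF keeps the relative score gaps within each list, it can reward a candidate that wins by a large margin - information RRF discards by looking only at ranks. That is the main trade-off to probe in your A/B comparison.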

A/B Testing Framework

Build a framework to systematically compare fusion strategies across multiple user profiles.

Initialize Testing Function

Set up the function structure and results tracking:

def ab_test_fusion_strategies(user_profiles, user_preferences):
    """Compare RRF vs DBSF performance"""

    results = {"RRF": [], "DBSF": []}

    for user_profile, user_preference in zip(user_profiles, user_preferences):
        for fusion_type in [models.Fusion.RRF, models.Fusion.DBSF]:
            # Run recommendation query
            user_dense_vector = next(dense_model.query_embed(user_profile["query"]))
            user_sparse_vector = next(
                sparse_model.query_embed(user_profile["query"])
            ).as_object()
            user_multivector = next(colbert_model.query_embed(user_profile["query"]))

            # Build global filter using helper function
            global_filter = build_recommendation_filter(user_profile, user_preference)
            
            # Prefetch queries - global filter will propagate automatically
            hybrid_query = [
                models.Prefetch(query=user_dense_vector, using="dense", limit=100),
                models.Prefetch(query=user_sparse_vector, using="sparse", limit=100),
            ]

            # Combine candidates with chosen fusion strategy
            fusion_query = models.Prefetch(
                prefetch=hybrid_query,
                query=models.FusionQuery(fusion=fusion_type),
                limit=100,
            )

            response = client.query_points(
                collection_name=collection_name,
                prefetch=fusion_query,
                query=user_multivector,
                using="colbert",
                query_filter=global_filter,  # Propagates to all prefetch stages
                limit=10,
                with_payload=True,
            )

            strategy_name = "RRF" if fusion_type == models.Fusion.RRF else "DBSF"
            results[strategy_name].append(
                {
                    "user_id": user_profile["user_id"],
                    "recommendations": [hit.id for hit in response.points],
                    "scores": [hit.score for hit in response.points],
                }
            )

    return results
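
Once `ab_test_fusion_strategies` returns its results dict, you need a way to compare the two rankings per user. A simple starting point is top-k overlap (Jaccard similarity) between the id lists each strategy returned - a hypothetical helper, shown here on toy id lists:

```python
def topk_overlap(ids_a, ids_b):
    """Jaccard similarity between two top-k id lists (order ignored)."""
    a, b = set(ids_a), set(ids_b)
    return len(a & b) / len(a | b) if a | b else 1.0

# Example: 3 shared ids out of 5 distinct -> 3/5
print(topk_overlap([1, 2, 3, 4], [2, 3, 4, 5]))  # 0.6
```

A value near 1.0 means RRF and DBSF mostly agree for that user; low overlap means the fusion choice genuinely matters for your data. For an order-sensitive comparison, a rank correlation such as Kendall's tau is the natural next step.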

Next Steps

Turn this into a mini‑service: ingest your own items and user signals, write a tiny function that takes profile vectors and returns top‑k via the Universal Query API, then experiment with filters and fusion strategies (try DBSF vs RRF) to see how ranking shifts.