graph TD
A["Client (SDK / REST)"]
B["Access Layer (Proxy nodes — load balancing, routing)"]
C["Coordinator Layer RootCoord · QueryCoord · DataCoord · IndexCoord"]
D["Worker Layer QueryNode · DataNode · IndexNode"]
E["Storage Layer etcd (metadata) · MinIO/S3 (object store) Message Queue (Pulsar/Kafka)"]
A --> B
B --> C
C --> D
D --> E
style A fill:#4A90D9,color:#fff,stroke:#2c6faa
style B fill:#5BA85A,color:#fff,stroke:#3d7a3d
style C fill:#E8A838,color:#fff,stroke:#b07a1a
style D fill:#D95F5F,color:#fff,stroke:#a03030
style E fill:#8B6BB1,color:#fff,stroke:#5c3d8a
Milvus for Computer Vision: An In-Depth Guide

Introduction
I’ve spent a fair bit of time working on computer vision systems — the kind that start small, manageable, and almost deceptively simple, and then quietly spiral in scale until the infrastructure holding them together starts creaking at the seams. For a while I was getting by with fairly standard approaches: storing image embeddings in flat files, querying with NumPy, eventually graduating to something like FAISS. It worked. Until it didn’t.
The turning point came when the dataset crossed a threshold where even approximate brute-force search started adding up to latency that was genuinely painful in production. I needed something that could handle tens of millions of vectors, support filtered queries alongside similarity search, and not require me to completely rebuild the data layer every few months as the system grew. That’s when I came across Milvus.
What struck me first was how deliberately it had been designed for exactly this class of problem. It wasn’t a general-purpose database with a vector plugin bolted on — it was built from the ground up around the idea that your primary query is “find me things that look like this,” and everything else (filtering, metadata, persistence, scalability) flows from that. Getting started was surprisingly approachable once I understood the core concepts, and scaling from a local prototype to a distributed deployment was far more incremental than I’d expected.
This guide is what I wish I’d had when I started. It covers Milvus from the very beginning — what vector databases are, how embeddings work, and why you need dedicated infrastructure for this kind of search — all the way through four real computer vision use cases, three deployment modes, and the performance tuning details that actually matter in practice. Whether you’re prototyping on a laptop or planning a production system handling billions of vectors, the path forward is here.
Table of Contents
- What Is a Vector Database — And Why Do You Need One?
- Introducing Milvus
- Core Concepts You Must Understand
- How Computer Vision Meets Vector Search
- Setting Up Your Environment
- Deployment Options: Lite → Docker → Kubernetes
- Working with Collections and Schemas
- Inserting Embedding Vectors
- Index Types and When to Use Each
- Querying and Searching
- Use Case 1 — Image Similarity Search
- Use Case 2 — Face Recognition
- Use Case 3 — Object Detection & Retrieval
- Use Case 4 — Video Frame Search
- Partitions, Filtering, and Hybrid Search
- Performance Tuning and Best Practices
- Security and Access Control
- Monitoring and Observability
- Common Pitfalls and How to Avoid Them
- Glossary
1. What Is a Vector Database — And Why Do You Need One?
The Problem with Traditional Databases
Traditional relational databases (PostgreSQL, MySQL, SQLite) store and retrieve data that is exactly defined — rows, columns, integers, strings, dates. When you want to find a user named “Alice,” you write:
SELECT * FROM users WHERE name = 'Alice';
This works perfectly for exact matches. But computer vision operates in an entirely different paradigm. Imagine you have a photo of a dog and you want to find all similar-looking dogs in a database of one million photos. There is no exact match to look for. The question is not “find this exact image”; it is “find images that look like this image.”
Traditional databases cannot answer that question efficiently. You could compare pixel-by-pixel, but that would be catastrophically slow and would fail even for the same dog photographed twice under different lighting conditions.
The Role of Embeddings
The key insight that makes modern computer vision work is this: neural networks can compress the semantic meaning of an image into a compact numerical vector — called an embedding or feature vector.
An embedding is simply a list of floating-point numbers. For example, a 512-dimensional embedding is a list of 512 floats:
[0.023, -0.412, 0.881, 0.003, -0.667, ..., 0.142] # 512 values total
What makes embeddings magical is that neural networks learn to place semantically similar images close together in this high-dimensional space. Two photos of the same person, taken from different angles and lighting conditions, will produce embeddings that are numerically close to each other. A cat and a dog will be closer to each other than a cat and an airplane.
“Close” in this context is measured by mathematical distance functions:
- Cosine similarity — measures the angle between two vectors (ignores magnitude; good for normalized embeddings)
- Euclidean distance (L2) — measures the straight-line distance between two points in space
- Inner product (IP) — dot product; useful for recommendation systems and unnormalized embeddings
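To make the three measures concrete, here is a minimal sketch in plain Python. The 3-dimensional vectors `cat`, `dog`, and `plane` are made-up toy values (real embeddings have hundreds of dimensions), and the function names are illustrative rather than part of any Milvus API.

```python
import math

def inner_product(a, b):
    """Dot product: sum of element-wise products."""
    return sum(x * y for x, y in zip(a, b))

def l2_distance(a, b):
    """Straight-line (Euclidean) distance between two points."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def cosine_similarity(a, b):
    """Cosine of the angle between a and b; ignores vector magnitude."""
    return inner_product(a, b) / (
        math.sqrt(inner_product(a, a)) * math.sqrt(inner_product(b, b))
    )

# Toy "embeddings" (hypothetical values, for illustration only)
cat = [0.9, 0.1, 0.0]
dog = [0.8, 0.3, 0.1]
plane = [0.0, 0.2, 0.95]

print("cosine cat/dog  :", cosine_similarity(cat, dog))
print("cosine cat/plane:", cosine_similarity(cat, plane))
print("L2 cat/dog      :", l2_distance(cat, dog))
print("L2 cat/plane    :", l2_distance(cat, plane))
```

With these toy values, the cat/dog pair scores higher on cosine similarity and lower on L2 distance than the cat/plane pair, which is the behavior a good embedding model is trained to produce.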
Why You Need a Dedicated Vector Database
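Once you have millions of embeddings, you need to answer “find me the k nearest neighbors to this query vector” as quickly as possible. This is k-nearest-neighbor (k-NN) search; when the answer is allowed to be slightly inexact in exchange for speed, it is called Approximate Nearest Neighbor (ANN) search.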
A naive approach (compare query against every single vector) is called exact search or brute-force search. It works fine for thousands of vectors, but:
- At 1 million vectors of 512 dimensions, a brute-force search involves 512 million floating-point multiplications per query
- At 100 million vectors, this becomes computationally untenable for real-time applications
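The cost figure above is simply the number of vectors multiplied by the dimensionality. The sketch below shows that arithmetic alongside a tiny exact search in plain Python; the function names are hypothetical, and a real system would use vectorized code rather than Python loops.

```python
import heapq

def brute_force_multiplications(num_vectors, dim):
    """Multiplications needed to score one query against every stored vector."""
    return num_vectors * dim

def exact_top_k(query, vectors, k):
    """Score the query against every vector (inner product) and keep the k best."""
    scored = (
        (sum(q * v for q, v in zip(query, vec)), idx)
        for idx, vec in enumerate(vectors)
    )
    return [idx for _, idx in heapq.nlargest(k, scored)]

print(brute_force_multiplications(1_000_000, 512))    # 512 million
print(brute_force_multiplications(100_000_000, 512))  # 100x more work per query

toy_vectors = [[0.0, 1.0], [1.0, 0.0], [0.5, 0.5]]
print(exact_top_k([1.0, 0.0], toy_vectors, k=2))
```

Every query touches every stored vector, which is why the work grows linearly with the size of the collection.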
Vector databases solve this by building indexes — clever data structures that allow you to skip most of the comparisons and still find results that are very close to the true nearest neighbors. This is the “approximate” in ANN: you trade a small amount of accuracy for enormous speed gains.
Milvus is one of the most powerful, production-ready, and feature-rich open-source vector databases available today.
2. Introducing Milvus
What Is Milvus?
Milvus is an open-source vector database built specifically for storing, indexing, and searching high-dimensional vector embeddings at massive scale. It was originally created by Zilliz and later donated to the LF AI & Data Foundation, part of the Linux Foundation.
Key properties of Milvus:
- Stores billions of vectors with sub-second query latency
- Supports multiple index algorithms (IVF, HNSW, FLAT, ScaNN, DiskANN, and more)
- Supports multiple distance metrics (L2, IP, Cosine)
- Has a rich filtering system — combine vector search with scalar attribute filters (like SQL WHERE clauses)
- Supports multi-tenancy through partitions and collections
- Offers three deployment modes: Milvus Lite (local, no server), Standalone (single-node Docker), and Distributed (Kubernetes cluster)
- First-class Python SDK (PyMilvus), plus SDKs for Go, Java, Node.js, and REST API
Milvus vs. Alternatives
| Feature | Milvus | Pinecone | Weaviate | pgvector |
|---|---|---|---|---|
| Open source | ✅ | ❌ (cloud only) | ✅ | ✅ |
| Scale | Billions | Millions | Millions | Millions |
| Deployment | Lite/Docker/K8s | Managed cloud | Docker/K8s | PostgreSQL extension |
| Hybrid filtering | ✅ Rich | ✅ | ✅ | ✅ |
| GPU indexing | ✅ | ❌ | ❌ | ❌ |
| Best for | Production scale | Quick SaaS start | Semantic search | Existing Postgres apps |
For computer vision at scale, Milvus is a leading choice because of its support for very large datasets, GPU-accelerated indexing, and mature Python ecosystem.
Milvus Architecture Overview
Milvus has a layered, disaggregated architecture — each layer can be scaled independently:
In plain English:
- Proxy nodes receive client requests and route them
- Coordinators manage cluster metadata, query planning, and data distribution
- Worker nodes do the actual heavy lifting: storing data, building indexes, executing searches
- Storage is separated from compute — data lives in object storage (S3/MinIO), metadata in etcd
This separation is what allows Milvus to scale each component independently. You can add more QueryNodes to handle more queries without touching DataNodes.
3. Core Concepts You Must Understand
Before writing a single line of code, you need to internalize these concepts. They map to familiar database concepts but have important differences.
3.1 Collection
A collection in Milvus is analogous to a table in a relational database. It is the top-level container that holds your data.
Each collection has:
- A schema — defines the fields (columns) and their types
- One or more indexes — built on the vector field(s) to enable fast ANN search
- Optional partitions — logical subdivisions within a collection
Example analogy: SQL Table face_embeddings → Milvus Collection face_embeddings
3.2 Schema and Fields
A Milvus schema defines the structure of every entity (row) in the collection. Each schema must have:
- A primary key field: a unique ID for each entity. Can be INT64 (auto-generated or user-provided) or VARCHAR.
- At least one vector field: stores the embedding. Must specify the number of dimensions.
- Optional scalar fields — additional metadata like file path, label, timestamp, confidence score.
Supported scalar field types:
- INT8, INT16, INT32, INT64
- FLOAT, DOUBLE
- BOOL
- VARCHAR (up to 65,535 characters)
- JSON: unstructured key-value data (powerful for flexible metadata)
- ARRAY: fixed-type arrays
Supported vector field types:
- FLOAT_VECTOR: 32-bit floating point vectors (most common)
- BINARY_VECTOR: packed binary vectors (more compact, useful for hashing-based embeddings)
- FLOAT16_VECTOR: 16-bit half-precision (reduces memory, slight accuracy tradeoff)
- BFLOAT16_VECTOR: brain float 16 (popular in ML hardware)
- SPARSE_FLOAT_VECTOR: for sparse representations (BM25, SPLADE)
3.3 Entity
An entity is a single record (row) in a collection. It contains values for all fields defined in the schema. When you insert data, you insert entities.
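Because an entity must line up with the schema, it can help to check records before sending them to the database. The following is a minimal sketch of such a check in plain Python; `validate_entity` and its parameters are hypothetical helpers, not part of PyMilvus, and the field names mirror the examples used later in this guide.

```python
def validate_entity(entity, vector_field, dim, required_fields):
    """Return a list of problems found in one entity dict (empty list = looks fine)."""
    problems = []
    for name in required_fields:
        if name not in entity:
            problems.append(f"missing field: {name}")
    vector = entity.get(vector_field)
    if vector is None:
        problems.append(f"missing vector field: {vector_field}")
    elif len(vector) != dim:
        problems.append(f"{vector_field} has {len(vector)} dims, expected {dim}")
    return problems

good = {"embedding": [0.0] * 512, "image_path": "/data/dog.jpg", "label": "dog"}
bad = {"embedding": [0.0] * 3, "label": "dog"}

print(validate_entity(good, "embedding", 512, ["image_path", "label"]))
print(validate_entity(bad, "embedding", 512, ["image_path", "label"]))
```

Catching a wrong vector length on the client side gives a clearer error than a rejected insert halfway through a large batch.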
3.4 Segment
Internally, Milvus divides the data in a collection into segments. Newly inserted data lands in small “growing” segments; when a growing segment reaches a size threshold, it is “sealed”, becomes immutable, and an index is built on it. Each sealed segment is indexed individually.
You rarely interact with segments directly, but understanding them explains behaviors like “why don’t my newly inserted vectors appear in search results immediately?”
3.5 Partition
A partition is a logical subdivision of a collection. Think of it as a sub-table that can be searched independently or together.
Why use partitions?
- To scope searches to a subset of data (e.g., search only videos from “2024”)
- To logically separate data (e.g., one partition per camera, one per user)
- They improve query performance when you know which partition to target
Every collection has a default partition called _default.
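The effect of scoping a search to a partition can be illustrated with a toy in-memory model. This is only a conceptual sketch in plain Python: `ToyPartitionedStore` is a hypothetical class, the partition names `cam_a` and `cam_b` are made up, and none of this reflects how Milvus implements partitions internally.

```python
from collections import defaultdict

class ToyPartitionedStore:
    """Toy model: each partition is just a separate list of (id, vector) pairs."""

    def __init__(self):
        self.partitions = defaultdict(list)

    def insert(self, entity_id, vector, partition="_default"):
        self.partitions[partition].append((entity_id, vector))

    def search(self, query, k, partition_names=None):
        # With no partition filter, every partition is scanned.
        names = partition_names if partition_names is not None else list(self.partitions)
        candidates = [item for name in names for item in self.partitions.get(name, [])]
        # Rank by inner product, highest first.
        candidates.sort(key=lambda item: -sum(q * v for q, v in zip(query, item[1])))
        return [entity_id for entity_id, _ in candidates[:k]]

store = ToyPartitionedStore()
store.insert(1, [1.0, 0.0], partition="cam_a")
store.insert(2, [0.9, 0.1], partition="cam_b")
store.insert(3, [0.0, 1.0], partition="cam_a")

print(store.search([1.0, 0.0], k=2))                             # all partitions
print(store.search([1.0, 0.0], k=2, partition_names=["cam_a"]))  # cam_a only
```

Restricting the search to one partition shrinks the candidate set before any distance is computed, which is where the performance benefit comes from.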
3.6 Index
An index is a data structure built on a vector field that makes ANN search fast. Milvus supports many index types:
- FLAT — brute-force exact search. Perfect accuracy, slow at scale.
- IVF_FLAT — inverted file index. Divides vectors into clusters; searches only relevant clusters.
- IVF_SQ8 — like IVF_FLAT but with scalar quantization (compresses vectors to 8-bit; saves memory).
- IVF_PQ — product quantization; extreme compression, lower accuracy.
- HNSW — Hierarchical Navigable Small World graph. Excellent speed/accuracy tradeoff; the gold standard for most use cases.
- SCANN — Google’s ScaNN algorithm; highly optimized for recall.
- DiskANN — designed for datasets too large to fit in RAM; stores index on disk.
- GPU_IVF_FLAT, GPU_CAGRA — GPU-accelerated variants.
Choosing the right index is one of the most important decisions in your Milvus deployment. We cover this in detail in Section 9.
3.7 Distance Metrics
When performing a vector search, Milvus computes a distance between the query vector and every candidate vector. The three supported metrics are:
L2 (Euclidean Distance)
\[ d(a, b) = \sqrt{\sum_i (a_i - b_i)^2} \]
Lower = more similar. Best for embeddings that are not normalized to unit length.
IP (Inner Product / Dot Product)
\[ d(a, b) = \sum_i a_i \cdot b_i \]
Higher = more similar. For normalized vectors, IP is equivalent to cosine similarity.
Cosine
\[ \cos(a, b) = \frac{a \cdot b}{\|a\| \, \|b\|} \]
Higher = more similar. Milvus's COSINE metric reports the cosine similarity itself (range -1 to 1), not the cosine distance 1 - cos(a, b). It measures the angle between the vectors and is invariant to vector magnitude.
Rule of thumb: If your embedding model normalizes its output (most do), use IP or Cosine. If not normalized, use L2.
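The reasoning behind this rule of thumb is that, for unit-length vectors, the inner product equals the cosine similarity, and the squared L2 distance equals 2 - 2·cos, so all three metrics rank neighbors identically. The short check below verifies both identities numerically on random vectors (plain Python; the helper names are illustrative).

```python
import math
import random

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def normalize(v):
    """Scale v to unit L2 length."""
    norm = math.sqrt(dot(v, v))
    return [x / norm for x in v]

random.seed(0)
a = normalize([random.gauss(0, 1) for _ in range(8)])
b = normalize([random.gauss(0, 1) for _ in range(8)])

ip = dot(a, b)
cosine = dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))
l2_squared = sum((x - y) ** 2 for x, y in zip(a, b))

print(abs(ip - cosine) < 1e-9)                # IP == cosine on unit vectors
print(abs(l2_squared - (2 - 2 * ip)) < 1e-9)  # ||a - b||^2 == 2 - 2*cos
```

Once vectors are not normalized, these identities break, and IP starts rewarding vectors with large magnitude regardless of direction.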
4. How Computer Vision Meets Vector Search
The General Pipeline
Every computer vision application that uses Milvus follows the same fundamental pipeline:
graph LR
A["Raw Image (or frame)"]
B["Embedding Model (CNN, ViT, etc.)"]
C["Feature Vector f₁, f₂, ..., fₙ"]
D[("Milvus Collection id · vector · metadata ────────────────── 1 · [...] · dog.jpg 2 · [...] · cat.png")]
E["Query Embed new image Search k-NN Return IDs"]
A --> B
B --> C
C --> D
D --> E
style A fill:#E8F4FD,stroke:#4A90D9
style B fill:#FEF9E7,stroke:#E8A838
style C fill:#EAF7EA,stroke:#5BA85A
style D fill:#F4ECF7,stroke:#8B6BB1
style E fill:#FDEDEC,stroke:#D95F5F
Two phases:
- Ingestion (offline): Extract embeddings from all your images and insert them into Milvus along with metadata.
- Query (online): For a new query image, extract its embedding, send it to Milvus, receive the IDs of the most similar images.
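The two phases can be sketched end to end with a toy in-memory index. Everything here is a stand-in: `toy_embed` just normalizes a short list of numbers instead of running a neural network, `ToyIndex` replaces Milvus with a Python list, and the "images" are made-up three-value lists.

```python
import math

def toy_embed(pixels):
    """Stand-in for a real embedding model: L2-normalizes the input values."""
    norm = math.sqrt(sum(p * p for p in pixels)) or 1.0
    return [p / norm for p in pixels]

class ToyIndex:
    def __init__(self):
        self.rows = []  # each row: (id, vector, metadata)

    def ingest(self, images):
        """Offline phase: embed every image and store vector plus metadata."""
        for path, pixels in images.items():
            row_id = len(self.rows) + 1
            self.rows.append((row_id, toy_embed(pixels), {"image_path": path}))

    def query(self, pixels, k=1):
        """Online phase: embed the query, rank stored vectors, return IDs and paths."""
        q = toy_embed(pixels)
        ranked = sorted(self.rows, key=lambda r: -sum(a * b for a, b in zip(q, r[1])))
        return [(row_id, meta["image_path"]) for row_id, _, meta in ranked[:k]]

index = ToyIndex()
index.ingest({"dog.jpg": [0.9, 0.2, 0.1], "cat.png": [0.1, 0.9, 0.3]})
top = index.query([0.8, 0.3, 0.1], k=1)
print(top)
```

In a real deployment the same shape holds: the embedding model and the vector store are separate pieces, and the only thing passed between them is the vector plus whatever metadata you choose to keep.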
Choosing the Right Embedding Dimensionality
Different models produce embeddings of different sizes:
| Model Family | Typical Dimensions | Notes |
|---|---|---|
| ResNet-50 (pool layer) | 2048 | Large; very expressive |
| EfficientNet-B0 | 1280 | Good accuracy/size tradeoff |
| CLIP ViT-B/32 | 512 | Multi-modal (text+image) |
| CLIP ViT-L/14 | 768 | Larger, more accurate |
| DINOv2 ViT-S/14 | 384 | Efficient, self-supervised |
| DINOv2 ViT-g/14 | 1536 | Highest quality, expensive |
| Face (ArcFace, FaceNet) | 128–512 | Specialized for identity |
Higher dimensions = more expressive but more memory and slower search. Always test with your target data to find the right model for your use case.
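The memory side of this tradeoff is easy to estimate: raw float32 storage is the number of vectors times the dimensionality times 4 bytes. The helper below is a rough sketch that ignores index overhead and metadata; the function names are illustrative.

```python
def raw_vector_bytes(num_vectors, dim, bytes_per_value=4):
    """Raw storage for float32 vectors, excluding index structures and metadata."""
    return num_vectors * dim * bytes_per_value

def to_gib(num_bytes):
    return num_bytes / 2**30

# Dimensions taken from the table above, for a 10-million-image collection
for model, dim in [("DINOv2 ViT-S/14", 384), ("CLIP ViT-B/32", 512), ("ResNet-50", 2048)]:
    size = to_gib(raw_vector_bytes(10_000_000, dim))
    print(f"{model:<16} dim={dim:<5} ~{size:.1f} GiB raw")
```

Going from 512 to 2048 dimensions quadruples the raw footprint, so the choice of model has a direct effect on how much RAM the index will need.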
Normalization
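Normalization matters most when you use IP as a stand-in for cosine similarity: the two agree only for L2-normalized (unit) vectors, and L2 rankings match cosine rankings only on unit vectors as well. If in doubt, normalize before inserting: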
import numpy as np

def normalize(vector: np.ndarray) -> np.ndarray:
    """Normalize a vector to unit length (L2 norm = 1)."""
    norm = np.linalg.norm(vector)
    if norm == 0:
        return vector
    return vector / norm

# For a batch of vectors (shape: [N, D])
def normalize_batch(vectors: np.ndarray) -> np.ndarray:
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    norms = np.where(norms == 0, 1, norms) # avoid division by zero
    return vectors / norms
Check your model’s documentation: some libraries return embeddings that are already normalized, while others (including common CLIP and DINOv2 feature-extraction calls) return raw features that you need to normalize yourself.
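A quick runtime check can complement the documentation: measure the norm of a few embeddings and see whether it is close to 1. The sketch below uses plain Python; `is_unit_length` and the tolerance value are illustrative choices, not a Milvus requirement.

```python
import math

def l2_norm(vector):
    return math.sqrt(sum(x * x for x in vector))

def is_unit_length(vector, tol=1e-3):
    """True if the vector's L2 norm is within tol of 1.0."""
    return abs(l2_norm(vector) - 1.0) <= tol

print(is_unit_length([0.6, 0.8]))  # norm is exactly 1.0
print(is_unit_length([3.0, 4.0]))  # norm is 5.0, needs normalizing
```

Running this on a sample of real model outputs before the first bulk insert is cheaper than discovering a metric mismatch after indexing millions of vectors.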
5. Setting Up Your Environment
Python Prerequisites
# Create and activate a virtual environment (recommended)
python -m venv milvus-cv-env
source milvus-cv-env/bin/activate # Linux/Mac
# milvus-cv-env\Scripts\activate # Windows
# Install the Milvus Python SDK
pip install pymilvus
# Optional: the "model" extra adds embedding-model utilities (MilvusClient already ships with plain pymilvus)
pip install "pymilvus[model]"
# Common CV libraries
pip install numpy pillow
pip install torch torchvision # if using PyTorch models
Verifying the Installation
import pymilvus
print(pymilvus.__version__) # Should print e.g. "2.4.x"
from pymilvus import MilvusClient
print("PyMilvus installed correctly")
SDK Version Compatibility
Always match your SDK version to your Milvus server version. Milvus uses semantic versioning (MAJOR.MINOR.PATCH). The SDK minor version should match the server minor version.
| Milvus Server | PyMilvus SDK |
|---|---|
| 2.4.x | 2.4.x |
| 2.3.x | 2.3.x |
| 2.2.x | 2.2.x |
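The matching rule in the table can be expressed as a small check on the MAJOR.MINOR part of each version string. This is a sketch with a hypothetical helper name; how you obtain the server version string is left out, since it depends on your deployment.

```python
def minor_versions_match(sdk_version, server_version):
    """Compare only MAJOR.MINOR, ignoring the patch level and a leading 'v'."""
    def major_minor(version):
        parts = version.lstrip("v").split(".")
        return int(parts[0]), int(parts[1])
    return major_minor(sdk_version) == major_minor(server_version)

print(minor_versions_match("2.4.1", "v2.4.0"))  # same minor version
print(minor_versions_match("2.3.7", "2.4.0"))   # mismatch
```

A check like this fits naturally into application startup or a CI step, so a mismatched SDK is caught before it causes confusing runtime errors.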
6. Deployment Options: Lite → Docker → Kubernetes
6.1 Milvus Lite (Local Development)
Milvus Lite is a lightweight, serverless version of Milvus that runs entirely in-process — no server to start, no Docker required. It stores data in a local SQLite-like file.
Ideal for: prototyping, unit tests, notebooks, offline processing on a single machine.
Limitations:
- Not suitable for production (single process, limited concurrency)
- No distributed indexing, no GPU support
- Maximum dataset size is limited by local RAM/disk
Installation:
pip install milvus-lite # already included in pymilvus >= 2.4.2
Usage:
from pymilvus import MilvusClient
# Pass a file path — Milvus Lite creates/opens a local database file
client = MilvusClient("./my_cv_database.db")
print("Connected to Milvus Lite")
That’s it. No servers, no configuration. The database file is portable and can be copied between machines.
Checking stored data:
# List all collections in this database
collections = client.list_collections()
print(collections)
When to move beyond Milvus Lite:
- Your dataset exceeds a few million vectors
- You need multi-user concurrent access
- You need production reliability (backups, replication, crash recovery)
- You want GPU-accelerated indexing
6.2 Standalone Milvus (Docker / Docker Compose)
Standalone Milvus runs Milvus as a set of Docker containers on a single machine. It includes all components: the Milvus server, etcd (for metadata), and MinIO (for object storage).
Ideal for: single-machine production use, team development environments, moderate-scale deployments (tens of millions of vectors).
Installing Docker
# Ubuntu/Debian
sudo apt-get update && sudo apt-get install docker.io docker-compose-plugin -y
sudo systemctl enable --now docker
sudo usermod -aG docker $USER # allow running docker without sudo (re-login required)
# macOS: install Docker Desktop from https://www.docker.com/products/docker-desktop/
Starting Standalone Milvus with Docker Compose
Download the official docker-compose.yml:
# Download the compose file
wget https://github.com/milvus-io/milvus/releases/download/v2.4.0/milvus-standalone-docker-compose.yml \
  -O docker-compose.yml
The file looks like this (simplified):
version: '3.5'
services:
  etcd:
    container_name: milvus-etcd
    image: quay.io/coreos/etcd:v3.5.5
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_SNAPSHOT_COUNT=50000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
    command: etcd -advertise-client-urls=http://127.0.0.1:2379
      -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
  minio:
    container_name: milvus-minio
    image: minio/minio:RELEASE.2023-03-13T19-46-17Z
    environment:
      MINIO_ACCESS_KEY: minioadmin
      MINIO_SECRET_KEY: minioadmin
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
    command: minio server /minio_data
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
  standalone:
    container_name: milvus-standalone
    image: milvusdb/milvus:v2.4.0
    command: ["milvus", "run", "standalone"]
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
    ports:
      - "19530:19530" # gRPC port (SDK connects here)
      - "9091:9091" # HTTP/metrics port
    depends_on:
      - etcd
      - minio
networks:
  default:
    name: milvus
Start it:
docker compose up -d
# Check that all three containers are running
docker compose ps
Expected output:
NAME STATUS
milvus-etcd running
milvus-minio running
milvus-standalone running
Connect from Python:
from pymilvus import MilvusClient
# Connect to the running Milvus server
# Default port is 19530
client = MilvusClient(uri="http://localhost:19530")
print("Connected to Milvus Standalone")
Stop and remove containers:
docker compose down # Stop containers, preserve data volumes
docker compose down -v # Stop containers AND delete all data (destructive!)
Persistent Volumes
By default, data is stored in ./volumes/ relative to where you ran the compose command. Back up this directory to preserve your data.
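One simple way to back up that directory is to archive it with a timestamp. The sketch below uses only the Python standard library; `backup_volumes` is a hypothetical helper, and it assumes the containers have been stopped first (for example with `docker compose down`) so that files are not changing while they are copied.

```python
import shutil
import time
from pathlib import Path

def backup_volumes(volumes_dir, backup_dir):
    """Create a timestamped .tar.gz of volumes_dir inside backup_dir; return its path."""
    volumes = Path(volumes_dir)
    if not volumes.is_dir():
        raise FileNotFoundError(f"volumes directory not found: {volumes}")
    target = Path(backup_dir)
    target.mkdir(parents=True, exist_ok=True)
    stamp = time.strftime("%Y%m%d-%H%M%S")
    archive_base = target / f"milvus-volumes-{stamp}"
    # make_archive appends the extension and returns the final archive path
    return shutil.make_archive(str(archive_base), "gztar", root_dir=str(volumes))

# Example (paths are assumptions; adjust to where you ran docker compose):
# archive_path = backup_volumes("./volumes", "./backups")
# print(archive_path)
```

Keep the backup directory outside `./volumes/` so an archive never ends up containing earlier archives.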
Resource Recommendations for Standalone
| Dataset Size | RAM | CPU | Disk |
|---|---|---|---|
| < 10M vectors | 16 GB | 4 cores | 100 GB SSD |
| 10–50M vectors | 32–64 GB | 8 cores | 500 GB SSD |
| 50–100M vectors | 64–128 GB | 16 cores | 1 TB SSD |
6.3 Distributed Milvus on Kubernetes
Distributed Milvus is the full production-grade deployment. Each component (QueryNode, DataNode, IndexNode, Proxy) runs as a separate pod and scales independently.
Ideal for: billion-scale datasets, high-availability requirements, multi-region deployments, enterprise use cases.
Prerequisites
- A running Kubernetes cluster (EKS, GKE, AKS, or self-hosted with kubeadm)
- kubectl configured to access your cluster
- helm (Kubernetes package manager) installed
# Install Helm
curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash
# Verify
helm version
Adding the Milvus Helm Repository
helm repo add milvus https://zilliztech.github.io/milvus-helm/
helm repo update
Minimal Distributed Deployment
Create a values.yaml to customize your deployment:
# values.yaml: minimal distributed Milvus configuration
cluster:
  enabled: true # Enable distributed mode
# Component replica counts
proxy:
  replicas: 2
queryNode:
  replicas: 2
  resources:
    requests:
      memory: "8Gi"
      cpu: "2"
    limits:
      memory: "16Gi"
      cpu: "4"
dataNode:
  replicas: 1
  resources:
    requests:
      memory: "4Gi"
      cpu: "1"
indexNode:
  replicas: 1
  resources:
    requests:
      memory: "8Gi"
      cpu: "4"
# Message queue (Pulsar for distributed mode)
pulsar:
  enabled: true
# Object storage (MinIO deployed alongside)
minio:
  enabled: true
  mode: distributed
  replicas: 4
# Metadata store
etcd:
  replicaCount: 3 # etcd should run as odd number for quorum
# Expose the service
service:
  type: LoadBalancer
Deploy:
# Create a dedicated namespace
kubectl create namespace milvus
# Deploy Milvus
helm install milvus milvus/milvus \
  --namespace milvus \
  -f values.yaml \
  --timeout 15m \
  --wait
# Check pod status
kubectl get pods -n milvus
Expected pods:
NAME READY STATUS
milvus-datacoord-xxx 1/1 Running
milvus-datanode-xxx 1/1 Running
milvus-etcd-0 1/1 Running
milvus-etcd-1 1/1 Running
milvus-etcd-2 1/1 Running
milvus-indexcoord-xxx 1/1 Running
milvus-indexnode-xxx 1/1 Running
milvus-minio-0 1/1 Running
milvus-proxy-xxx 1/1 Running
milvus-querycoord-xxx 1/1 Running
milvus-querynode-0 1/1 Running
milvus-querynode-1 1/1 Running
milvus-rootcoord-xxx 1/1 Running
Get the external IP:
kubectl get svc -n milvus milvus
# EXTERNAL-IP column shows the load balancer IP
Connect from Python:
from pymilvus import MilvusClient
MILVUS_HOST = "YOUR_EXTERNAL_IP" # from kubectl get svc
client = MilvusClient(uri=f"http://{MILVUS_HOST}:19530")
print("Connected to Milvus Distributed")
Scaling Components
# Scale QueryNodes to handle more concurrent searches
kubectl scale deployment milvus-querynode -n milvus --replicas=5
# Scale DataNodes to handle faster data ingestion
kubectl scale deployment milvus-datanode -n milvus --replicas=3
# Scale IndexNodes for faster index building
kubectl scale deployment milvus-indexnode -n milvus --replicas=2
GPU Support on Kubernetes
To enable GPU-accelerated indexing, add GPU node selectors and requests:
# In values.yaml: GPU configuration for IndexNode
indexNode:
  replicas: 1
  resources:
    limits:
      nvidia.com/gpu: 1 # Request 1 GPU per pod
  nodeSelector:
    accelerator: nvidia-gpu
  tolerations:
    - key: "nvidia.com/gpu"
      operator: "Exists"
      effect: "NoSchedule"
You must also have the NVIDIA device plugin installed in your cluster:
kubectl apply -f https://raw.githubusercontent.com/NVIDIA/k8s-device-plugin/v0.14.0/nvidia-device-plugin.yml
7. Working with Collections and Schemas
The MilvusClient API
PyMilvus offers two API styles:
- MilvusClient: simplified, high-level API (recommended for most use cases)
- connections + Collection: lower-level ORM-style API (more control)
This guide uses MilvusClient throughout, as it is the modern recommended approach.
Connecting (works for all deployment modes)
from pymilvus import MilvusClient
# Milvus Lite
client = MilvusClient("./cv_database.db")
# Standalone (Docker)
client = MilvusClient(uri="http://localhost:19530")
# With authentication (if enabled)
client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus" # format: "username:password"
)
# Distributed (Kubernetes)
client = MilvusClient(uri="http://EXTERNAL_IP:19530")
Defining a Schema
from pymilvus import MilvusClient, DataType

client = MilvusClient("./cv_database.db")

# Create a schema
schema = client.create_schema(
    auto_id=True, # Milvus auto-generates the primary key
    enable_dynamic_field=True, # Allow inserting extra fields not in schema
)
# Add the primary key field
schema.add_field(
    field_name="id",
    datatype=DataType.INT64,
    is_primary=True,
)
# Add the vector field. CRITICAL: dim must match your embedding model's output size
schema.add_field(
    field_name="embedding",
    datatype=DataType.FLOAT_VECTOR,
    dim=512, # Change this to match your model (e.g., 768, 1536, 2048)
)
# Add scalar metadata fields
schema.add_field(
    field_name="image_path",
    datatype=DataType.VARCHAR,
    max_length=1024,
)
schema.add_field(
    field_name="label",
    datatype=DataType.VARCHAR,
    max_length=128,
)
schema.add_field(
    field_name="confidence",
    datatype=DataType.FLOAT,
)
schema.add_field(
    field_name="timestamp",
    datatype=DataType.INT64, # store as Unix epoch milliseconds
)
Creating Index Parameters
Before creating the collection, define how the vector field should be indexed:
from pymilvus import MilvusClient

# Define index parameters for the vector field
index_params = client.prepare_index_params()
index_params.add_index(
    field_name="embedding", # must match your vector field name
    index_type="HNSW", # index algorithm (see Section 9 for all options)
    metric_type="COSINE", # distance metric: L2, IP, or COSINE
    params={
        "M": 16, # HNSW: number of neighbors per node (8–64; higher = better recall, more memory)
        "efConstruction": 200, # HNSW: build-time search depth (higher = better quality index, slower build)
    }
)
# Also create an index on a scalar field for fast filtering
index_params.add_index(
    field_name="label",
    index_type="Trie", # prefix-tree (trie) index for VARCHAR fields
)
Creating the Collection
# Create the collection with the schema and index parameters
client.create_collection(
    collection_name="image_embeddings",
    schema=schema,
    index_params=index_params,
)
print("Collection created successfully")
# Verify it exists
collections = client.list_collections()
print(f"Collections: {collections}")
# Get collection info
info = client.describe_collection("image_embeddings")
print(info)
Quick Collection Creation (Simplified API)
For rapid prototyping, MilvusClient allows creating a collection with just a dimension:
# Creates a collection with auto schema: id (INT64 PK) + vector (FLOAT_VECTOR)
client.create_collection(
    collection_name="quick_test",
    dimension=512,
    metric_type="COSINE",
)
# This is great for testing, but it does not declare typed metadata fields; use an explicit schema for that
Dropping a Collection
# WARNING: This permanently deletes all data in the collection
client.drop_collection("image_embeddings")
8. Inserting Embedding Vectors
Basic Insertion
import numpy as np
import time

# Simulate embedding extraction
def mock_embed(n: int, dim: int = 512) -> list:
    """Generate random normalized vectors to simulate embeddings."""
    vectors = np.random.randn(n, dim).astype(np.float32)
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    return (vectors / norms).tolist()

# Prepare data as a list of dicts (one dict per entity)
data = [
    {
        # "id" is omitted because auto_id=True
        "embedding": mock_embed(1, dim=512)[0],
        "image_path": "/dataset/images/dog_001.jpg",
        "label": "dog",
        "confidence": 0.97,
        "timestamp": int(time.time() * 1000),
    },
    {
        "embedding": mock_embed(1, dim=512)[0],
        "image_path": "/dataset/images/cat_002.jpg",
        "label": "cat",
        "confidence": 0.92,
        "timestamp": int(time.time() * 1000),
    },
]
# Insert the data
result = client.insert(
    collection_name="image_embeddings",
    data=data,
)
print(f"Inserted {result['insert_count']} entities")
print(f"Primary keys: {result['ids']}")
Batch Insertion (Production Pattern)
For large datasets, always insert in batches. Milvus recommends batch sizes of 1,000–10,000 entities per insert call:
import numpy as np
import time
from pymilvus import MilvusClient # needed for the type annotation below

def embed_batch(image_paths: list, dim: int = 512) -> list:
    """
    Placeholder function: replace with your actual embedding model call.
    Should return a list of normalized float vectors.
    """
    n = len(image_paths)
    vectors = np.random.randn(n, dim).astype(np.float32)
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    return (vectors / norms).tolist()

def insert_images_in_batches(
    client: MilvusClient,
    collection_name: str,
    image_paths: list,
    labels: list,
    batch_size: int = 2000,
    embedding_dim: int = 512,
):
    """
    Extracts embeddings from images and inserts them into Milvus in batches.
    """
    total = len(image_paths)
    inserted = 0
    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)
        batch_paths = image_paths[start:end]
        batch_labels = labels[start:end]
        # Extract embeddings for this batch
        batch_embeddings = embed_batch(batch_paths, dim=embedding_dim)
        # Build the data list
        batch_data = [
            {
                "embedding": batch_embeddings[i],
                "image_path": batch_paths[i],
                "label": batch_labels[i],
                "confidence": 1.0, # placeholder
                "timestamp": int(time.time() * 1000),
            }
            for i in range(len(batch_paths))
        ]
        # Insert
        result = client.insert(
            collection_name=collection_name,
            data=batch_data,
        )
        inserted += result["insert_count"]
        print(f"Progress: {inserted}/{total} ({100*inserted/total:.1f}%)")
    print(f"Done! Inserted {inserted} entities total.")
    return inserted

# Example usage
image_paths = [f"/data/images/img_{i:06d}.jpg" for i in range(100_000)]
labels = ["dog" if i % 2 == 0 else "cat" for i in range(100_000)]
insert_images_in_batches(
    client=client,
    collection_name="image_embeddings",
    image_paths=image_paths,
    labels=labels,
    batch_size=2000,
    embedding_dim=512,
)
Upsert (Insert or Update)
If an entity with the given primary key already exists, upsert replaces it; otherwise it inserts:
result = client.upsert(
    collection_name="image_embeddings",
    data=[
        {
            "id": 42, # specify the existing ID to update
            "embedding": new_vector, # a 512-dim list of floats from your embedding model, defined elsewhere
            "image_path": "/updated/path/image.jpg",
            "label": "updated_label",
            "confidence": 0.99,
            "timestamp": int(time.time() * 1000),
        }
    ],
)
Deleting Entities
# Delete by primary key
client.delete(
    collection_name="image_embeddings",
    ids=[1, 2, 3, 42],
)
# Delete by filter expression
client.delete(
    collection_name="image_embeddings",
    filter="label == 'cat'",
)
Data Freshness and the “Growing Segment” Delay
After inserting, your data enters a growing segment that is not yet indexed. Searches on unsealed segments use brute force, which is slower. For production use cases, you can force a flush:
# Force flush — seals all growing segments and ensures data is persisted
client.flush(collection_name="image_embeddings")
After flushing, Milvus will asynchronously build the index on the new segments. For queries that need to see the absolute latest data without waiting for indexing, set consistency_level="Strong":
results = client.search(
    collection_name="image_embeddings",
    data=[query_vector],
    limit=10,
    consistency_level="Strong", # waits for latest data to be visible
)
Consistency levels:
- "Strong": always sees the latest data; highest consistency, highest latency
- "Bounded": sees data up to a few seconds old; good default for most CV use cases
- "Eventually": fastest; may miss very recent inserts
9. Index Types and When to Use Each
Choosing the right index is crucial for balancing search speed, recall accuracy, and memory usage. Here is a detailed breakdown of every major index type in Milvus.
FLAT (Exact Search / Brute Force)
How it works: Compares the query vector against every single vector in the collection. No approximation — always returns the true nearest neighbors.
Parameters: None.
index_params.add_index(
field_name="embedding",
index_type="FLAT",
metric_type="COSINE",
params={},
)
Pros: 100% recall (always finds the true nearest neighbors); no build time.
Cons: O(N) query time, so latency grows linearly with collection size; impractical for more than ~500K vectors.
Best for: Exact search requirements, small datasets (< 500K vectors), benchmarking other indexes.
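To make the O(N) cost concrete, here is a minimal NumPy sketch of what an exact cosine search does conceptually: score every stored vector, then keep the best k. It is an illustration only, not how Milvus implements FLAT internally; the function name `flat_cosine_search` and the random data are assumptions made up for this example.

```python
import numpy as np

def flat_cosine_search(vectors: np.ndarray, query: np.ndarray, k: int):
    """Exact top-k cosine search: score every stored vector against the query."""
    # Normalize both sides so a dot product equals cosine similarity.
    vectors_n = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
    query_n = query / np.linalg.norm(query)
    scores = vectors_n @ query_n       # one score per stored vector: O(N * dim)
    top = np.argsort(-scores)[:k]      # indices of the k highest scores
    return top, scores[top]

rng = np.random.default_rng(0)
database = rng.standard_normal((1000, 512)).astype(np.float32)

# Query with a vector that is already stored; it should come back as the best match.
ids, sims = flat_cosine_search(database, database[7], k=5)
print(ids, sims)
```

Every query touches all N rows, which is why this approach is a good ground truth for benchmarking other indexes but stops being practical as the collection grows.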
IVF_FLAT (Inverted File Index)
How it works: During index building, vectors are clustered into nlist Voronoi cells using k-means. Each vector is assigned to its nearest cluster centroid. At query time, the nprobe nearest cluster centroids are identified, and only the vectors in those clusters are searched.
index_params.add_index(
field_name="embedding",
index_type="IVF_FLAT",
metric_type="L2",
params={
"nlist": 1024, # number of clusters. Rule of thumb: sqrt(N) where N = dataset size
}
)
Search parameters (set at query time):
search_params = {
    "nprobe": 16, # number of clusters to search (higher = better recall, slower query)
}
nlist and nprobe tradeoffs:
- nlist = 1024, nprobe = 1: very fast, low recall
- nlist = 1024, nprobe = 64: slower, high recall
- nprobe should be between 1 and nlist
- Typical: nprobe = nlist / 16 to nlist / 8
Best for: Medium datasets (1M–100M vectors), balanced recall/speed.
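The nlist/nprobe mechanics are easier to see in a toy version. The following is a rough NumPy sketch of the idea (k-means cells at build time, probing only the nearest cells at query time), under the assumption of L2 distance and a tiny dataset. The helpers `sq_dists`, `build_ivf`, and `ivf_search` are hypothetical names for this illustration and do not correspond to any Milvus API.

```python
import numpy as np

def sq_dists(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Pairwise squared L2 distances between the rows of a and the rows of b."""
    return ((a[:, None, :] - b[None, :, :]) ** 2).sum(axis=-1)

def build_ivf(vectors: np.ndarray, nlist: int, iters: int = 10, seed: int = 0):
    """Run a few k-means rounds, then group vector ids by nearest centroid."""
    rng = np.random.default_rng(seed)
    centroids = vectors[rng.choice(len(vectors), size=nlist, replace=False)].copy()
    for _ in range(iters):
        assign = sq_dists(vectors, centroids).argmin(axis=1)
        for c in range(nlist):
            members = vectors[assign == c]
            if len(members) > 0:       # leave an empty cell's centroid where it is
                centroids[c] = members.mean(axis=0)
    assign = sq_dists(vectors, centroids).argmin(axis=1)
    cells = [np.flatnonzero(assign == c) for c in range(nlist)]
    return centroids, cells

def ivf_search(query, vectors, centroids, cells, nprobe: int, k: int):
    """Scan only the nprobe cells whose centroids are closest to the query."""
    cell_order = sq_dists(query[None, :], centroids)[0].argsort()[:nprobe]
    candidates = np.concatenate([cells[c] for c in cell_order])
    dists = sq_dists(query[None, :], vectors[candidates])[0]
    best = dists.argsort()[:k]
    return candidates[best], dists[best]

rng = np.random.default_rng(1)
data = rng.standard_normal((2000, 32)).astype(np.float32)
centroids, cells = build_ivf(data, nlist=16)

query = data[3]
fast_ids, _ = ivf_search(query, data, centroids, cells, nprobe=1, k=5)   # one cell scanned
full_ids, _ = ivf_search(query, data, centroids, cells, nprobe=16, k=5)  # every cell scanned
print(fast_ids, full_ids)
```

With nprobe equal to nlist the search degenerates into a full scan and matches exact search; smaller nprobe values scan fewer candidates and may miss neighbors that live in cells that were not probed.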
IVF_SQ8 (IVF + Scalar Quantization)
How it works: Same as IVF_FLAT, but vectors are compressed from 32-bit floats to 8-bit integers (scalar quantization). Reduces memory by ~4x.
index_params.add_index(
field_name="embedding",
index_type="IVF_SQ8",
metric_type="L2",
params={"nlist": 1024},
)
Memory reduction: A 512-dim float32 vector takes 2048 bytes. IVF_SQ8 compresses it to 512 bytes.
Recall impact: Slight degradation vs. IVF_FLAT (typically 0.5–2% lower recall@10).
Best for: When you have memory constraints but can tolerate a small accuracy drop.
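As a rough illustration of where the 4x saving comes from, the sketch below quantizes each dimension to one byte using a per-dimension min/max range. This is a simplified stand-in for scalar quantization in general, not the exact encoder Milvus uses; `sq8_encode` and `sq8_decode` are names invented for this example.

```python
import numpy as np

def sq8_encode(vectors: np.ndarray):
    """Map each float32 dimension onto 0..255 using its observed min/max range."""
    lo = vectors.min(axis=0)
    hi = vectors.max(axis=0)
    # Guard against constant dimensions, which would otherwise divide by zero.
    scale = np.where(hi > lo, (hi - lo) / 255.0, 1.0)
    codes = np.round((vectors - lo) / scale).astype(np.uint8)
    return codes, lo, scale

def sq8_decode(codes: np.ndarray, lo: np.ndarray, scale: np.ndarray) -> np.ndarray:
    """Reconstruct approximate float32 vectors from the 8-bit codes."""
    return codes.astype(np.float32) * scale + lo

rng = np.random.default_rng(0)
vectors = rng.standard_normal((1000, 512)).astype(np.float32)
codes, lo, scale = sq8_encode(vectors)
restored = sq8_decode(codes, lo, scale)

print(vectors[0].nbytes, codes[0].nbytes)   # 2048 512
max_error = float(np.abs(vectors - restored).max())
print(max_error)
```

The reconstruction error per dimension is bounded by about half a quantization step, which is why the recall loss is usually small.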
IVF_PQ (IVF + Product Quantization)
How it works: Divides the vector into m sub-vectors and replaces each one with the nbits-bit index of its nearest centroid in a per-sub-vector codebook. The compression is extreme: with nbits=8, a 512-dim float32 vector (2048 bytes) shrinks to m bytes, for example 8–16 bytes for m=8 or m=16.
index_params.add_index(
field_name="embedding",
index_type="IVF_PQ",
metric_type="L2",
params={
"nlist": 1024,
"m": 8, # number of sub-quantizers (must divide evenly into dim)
"nbits": 8, # bits per sub-quantizer code (typically 8)
}
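)
Memory reduction: Roughly 128–256x smaller than FLAT for the 8–16 byte codes above (versus 2048 bytes per raw 512-dim vector), before counting codebooks and IDs.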
Recall impact: Significant — typically 5–15% lower recall@10 than FLAT.
Best for: Billion-scale datasets where memory is severely constrained.
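The per-vector code size follows directly from m and nbits, so it is worth doing the arithmetic before picking parameters. This small sketch assumes float32 inputs and ignores codebook and ID overhead; the helper names are made up for the example.

```python
def pq_code_bytes(dim: int, m: int, nbits: int = 8) -> float:
    """Bytes needed for one PQ code: m sub-quantizer indices of nbits each."""
    if dim % m != 0:
        raise ValueError("dim must be divisible by m")
    return m * nbits / 8

def pq_compression_ratio(dim: int, m: int, nbits: int = 8) -> float:
    """Raw float32 size divided by PQ code size (codebooks and IDs not counted)."""
    return (dim * 4) / pq_code_bytes(dim, m, nbits)

for m in (8, 16, 64):
    print(m, pq_code_bytes(512, m), pq_compression_ratio(512, m))
# 8 8.0 256.0
# 16 16.0 128.0
# 64 64.0 32.0
```

Larger m means bigger codes and less compression, but each sub-vector covers fewer dimensions, which generally preserves more of the original geometry.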
SCANN
Google’s ScaNN algorithm, integrated into Milvus. Excellent recall/speed tradeoff, competitive with HNSW:
index_params.add_index(
field_name="embedding",
index_type="SCANN",
metric_type="COSINE",
params={
"nlist": 1024,
"with_raw_data": True,
}
)
DiskANN (Disk-Based ANN)
How it works: Stores most of the index on disk (SSD) and reads it on demand. Enables searching datasets that are too large to fit in RAM.
index_params.add_index(
field_name="embedding",
index_type="DISKANN",
metric_type="L2",
params={},
)
Requirements: Fast NVMe SSD. Query latency is higher than RAM-based indexes (5–30ms vs. 1–5ms) but far better than brute-force.
Best for: Truly massive datasets (100M+ vectors) on a single node.
GPU Indexes
Available when your Milvus deployment has GPU-enabled nodes:
# GPU-accelerated IVF_FLAT
index_params.add_index(
field_name="embedding",
index_type="GPU_IVF_FLAT",
metric_type="L2",
params={"nlist": 1024},
)
# GPU-accelerated CAGRA (graph-based, state of the art for GPU)
index_params.add_index(
field_name="embedding",
index_type="GPU_CAGRA",
metric_type="L2",
params={
"intermediate_graph_degree": 64,
"graph_degree": 32,
}
)
Speedups: GPU indexes can be 10–100x faster than CPU indexes for index building, and 5–20x faster for queries.
Index Selection Summary
Small dataset (< 500K)? → FLAT
Medium dataset, low memory? → IVF_SQ8 or IVF_PQ
Medium dataset, good memory? → IVF_FLAT or HNSW
Large dataset, best recall? → HNSW (M=16, efConstruction=200)
Huge dataset, memory limited? → DiskANN
GPU available? → GPU_CAGRA or GPU_IVF_FLAT
10. Querying and Searching
Vector Similarity Search
The primary operation in Milvus — finding the k vectors most similar to a query vector:
import numpy as np
# Simulate a query embedding (in practice, this comes from embedding your query image)
query_vector = np.random.randn(512).astype(np.float32)
query_vector = (query_vector / np.linalg.norm(query_vector)).tolist()
# Perform the search
results = client.search(
collection_name="image_embeddings",
data=[query_vector], # list of query vectors (supports batch queries)
limit=10, # return top 10 most similar
output_fields=["image_path", "label", "confidence"],
search_params={"ef": 100}, # HNSW-specific params (omit for FLAT)
)
# Results is a list of lists (one inner list per query vector)
for hit in results[0]:
    print(f"ID: {hit['id']}")
    print(f"Distance: {hit['distance']:.4f}")
    print(f"Image: {hit['entity']['image_path']}")
    print(f"Label: {hit['entity']['label']}")
    print()
Batch Queries
Search for multiple query vectors in a single call — much more efficient than looping:
query_vectors = [
np.random.randn(512).astype(np.float32).tolist()
for _ in range(5)
]
results = client.search(
collection_name="image_embeddings",
data=query_vectors,
limit=10,
output_fields=["image_path", "label"],
)
for query_idx, query_results in enumerate(results):
    print(f"Query {query_idx} top results:")
    for hit in query_results:
        print(f" {hit['entity']['image_path']} (distance: {hit['distance']:.4f})")
Filtered Vector Search
Combine vector similarity with scalar attribute filtering:
results = client.search(
collection_name="image_embeddings",
data=[query_vector],
limit=10,
filter="label == 'dog'",
output_fields=["image_path", "label", "confidence"],
)
Filter expression syntax:
# Comparison operators
"confidence > 0.9"
"timestamp >= 1700000000000"
"label != 'cat'"
# Logical operators
"label == 'dog' AND confidence > 0.8"
"label in ['dog', 'cat']"
"NOT (label in ['background', 'unknown'])"
# String operations
"image_path like '/dataset/train/%'"
# Range
"confidence > 0.7 AND confidence < 0.95"
# JSON field access
"metadata['camera_id'] == 'cam_01'"
Scalar Query (No Vector Search)
Retrieve entities by scalar attributes only:
results = client.query(
collection_name="image_embeddings",
filter="label == 'dog' AND confidence > 0.9",
output_fields=["id", "image_path", "label", "confidence"],
limit=100,
)
Get Entity by ID
entities = client.get(
collection_name="image_embeddings",
ids=[1, 2, 42],
output_fields=["image_path", "label"],
)
11. Use Case 1 — Image Similarity Search
Image similarity search is the foundational computer vision use case for Milvus. Given a query image, find the most visually similar images in a large dataset. Applications include reverse image search, product visual search, duplicate detection, and content-based image retrieval (CBIR).
Architecture
graph TD
A["User uploads query image"]
B["Embedding Model (ResNet, CLIP, DINOv2, etc.)"]
C["query_vector 512-dim float array"]
D["Milvus Search HNSW + COSINE"]
E["Top-K similar image IDs + distances + metadata"]
F["Fetch thumbnails from storage by path"]
G["Return results to user"]
A --> B
B --> C
C --> D
D --> E
E --> F
F --> G
style A fill:#E8F4FD,stroke:#4A90D9
style B fill:#FEF9E7,stroke:#E8A838
style C fill:#EAF7EA,stroke:#5BA85A
style D fill:#F4ECF7,stroke:#8B6BB1
style E fill:#FDEDEC,stroke:#D95F5F
style F fill:#EBF5FB,stroke:#2980B9
style G fill:#EAFAF1,stroke:#27AE60
Full Implementation
from pymilvus import MilvusClient, DataType
import numpy as np
import time
# ─── Configuration ────────────────────────────────────────────────────────────
COLLECTION_NAME = "image_similarity"
EMBEDDING_DIM = 512
MILVUS_URI = "./image_similarity.db"
client = MilvusClient(MILVUS_URI)
# ─── Create Collection ────────────────────────────────────────────────────────
def create_image_similarity_collection():
    if client.has_collection(COLLECTION_NAME):
        print(f"Collection '{COLLECTION_NAME}' already exists.")
        return
    schema = client.create_schema(auto_id=True, enable_dynamic_field=False)
    schema.add_field("id", DataType.INT64, is_primary=True)
    schema.add_field("embedding", DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM)
    schema.add_field("image_path", DataType.VARCHAR, max_length=1024)
    schema.add_field("category", DataType.VARCHAR, max_length=128)
    schema.add_field("dataset_split", DataType.VARCHAR, max_length=16)
    schema.add_field("width", DataType.INT32)
    schema.add_field("height", DataType.INT32)
    schema.add_field("file_size_bytes", DataType.INT64)
    schema.add_field("inserted_at", DataType.INT64)
    index_params = client.prepare_index_params()
    index_params.add_index(
        field_name="embedding",
        index_type="HNSW",
        metric_type="COSINE",
        params={"M": 16, "efConstruction": 200},
    )
    index_params.add_index(field_name="category", index_type="Trie")
    index_params.add_index(field_name="dataset_split", index_type="Trie")
    client.create_collection(
        collection_name=COLLECTION_NAME,
        schema=schema,
        index_params=index_params,
    )
    print(f"Created collection '{COLLECTION_NAME}'")
# ─── Embedding Function (Model-Agnostic Placeholder) ─────────────────────────
def extract_embedding(image_path: str) -> np.ndarray:
    """
    Replace this function with your actual embedding model call.
    Example with torchvision (ResNet-50). ResNet-50's pooled features are
    2048-dimensional, so set EMBEDDING_DIM = 2048 if you use this model as-is:
        from torchvision import models, transforms
        from PIL import Image
        import torch
        model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
        model.eval()
        # Drop the final classification layer to expose the pooled features
        embedding_model = torch.nn.Sequential(*list(model.children())[:-1])
        transform = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ])
        img = Image.open(image_path).convert("RGB")
        tensor = transform(img).unsqueeze(0)
        with torch.no_grad():
            embedding = embedding_model(tensor).squeeze().numpy()
        embedding = embedding / np.linalg.norm(embedding)
        return embedding
    """
    # Placeholder: a random unit vector so the rest of the pipeline can run
    vec = np.random.randn(EMBEDDING_DIM).astype(np.float32)
    return vec / np.linalg.norm(vec)
# ─── Ingest Images ────────────────────────────────────────────────────────────
def ingest_images(image_records: list, batch_size: int = 2000):
    total = len(image_records)
    inserted = 0
    for start in range(0, total, batch_size):
        batch = image_records[start : start + batch_size]
        data = []
        for record in batch:
            embedding = extract_embedding(record["path"])
            data.append({
                "embedding": embedding.tolist(),
                "image_path": record["path"],
                "category": record["category"],
                "dataset_split": record["split"],
                "width": record["width"],
                "height": record["height"],
                "file_size_bytes": record["size"],
                "inserted_at": int(time.time() * 1000),
            })
        result = client.insert(collection_name=COLLECTION_NAME, data=data)
        inserted += result["insert_count"]
        print(f"Ingested {inserted}/{total} images")
    return inserted
# ─── Search ───────────────────────────────────────────────────────────────────
def find_similar_images(
    query_image_path: str,
    top_k: int = 10,
    category_filter: str = None,
    min_dimension: int = None,
) -> list:
    query_embedding = extract_embedding(query_image_path)
    filters = []
    if category_filter:
        filters.append(f"category == '{category_filter}'")
    if min_dimension:
        filters.append(f"width >= {min_dimension} AND height >= {min_dimension}")
    filter_expr = " AND ".join(filters) if filters else None
    results = client.search(
        collection_name=COLLECTION_NAME,
        data=[query_embedding.tolist()],
        limit=top_k,
        filter=filter_expr,
        search_params={"ef": max(top_k * 10, 100)},
        output_fields=["image_path", "category", "width", "height"],
    )
    return [
        {
            "id": hit["id"],
            "image_path": hit["entity"]["image_path"],
            "similarity": hit["distance"],
            "category": hit["entity"]["category"],
            "width": hit["entity"]["width"],
            "height": hit["entity"]["height"],
        }
        for hit in results[0]
    ]
# ─── Duplicate Detection ──────────────────────────────────────────────────────
def find_near_duplicates(similarity_threshold: float = 0.98, batch_size: int = 100):
    # Keyed by (smaller_id, larger_id) so each pair is recorded only once
    duplicates = {}
    offset = 0
    while True:
        # Milvus limits how large offset + limit can be for query(),
        # so this paging pattern only suits modest collections
        entities = client.query(
            collection_name=COLLECTION_NAME,
            filter="id > 0",
            output_fields=["id", "embedding", "image_path"],
            limit=batch_size,
            offset=offset,
        )
        if not entities:
            break
        for entity in entities:
            results = client.search(
                collection_name=COLLECTION_NAME,
                data=[entity["embedding"]],
                limit=5,
                search_params={"ef": 50},
                output_fields=["image_path"],
            )
            for hit in results[0]:
                # Skip the entity itself; ANN search does not guarantee it ranks first
                if hit["id"] == entity["id"]:
                    continue
                if hit["distance"] >= similarity_threshold:
                    pair = tuple(sorted([entity["id"], hit["id"]]))
                    duplicates.setdefault(pair, hit["distance"])
        offset += batch_size
    return [(a, b, similarity) for (a, b), similarity in duplicates.items()]
12. Use Case 2 — Face Recognition
Face recognition is one of the highest-stakes computer vision applications. The core workflow involves face detection, alignment, embedding extraction, storage in Milvus, and similarity search for identity lookup.
Important Notes on Face Recognition Ethics and Legality
Face recognition systems raise serious privacy concerns. Before building and deploying such a system:
- Ensure you have explicit consent from individuals whose faces you are storing
- Comply with applicable regulations (GDPR, CCPA, BIPA, etc.)
- Implement appropriate data retention and deletion policies
- Consider the risk of false positives in high-stakes applications (security, law enforcement)
Identity Schema Design
from pymilvus import MilvusClient, DataType
COLLECTION_NAME = "face_identities"
FACE_EMBEDDING_DIM = 512
client = MilvusClient("./face_db.db")
schema = client.create_schema(auto_id=True, enable_dynamic_field=True)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("embedding", DataType.FLOAT_VECTOR, dim=FACE_EMBEDDING_DIM)
schema.add_field("person_id", DataType.VARCHAR, max_length=64)
schema.add_field("person_name", DataType.VARCHAR, max_length=128)
schema.add_field("confidence_score", DataType.FLOAT)
schema.add_field("source_image", DataType.VARCHAR, max_length=1024)
schema.add_field("enrolled_at", DataType.INT64)
schema.add_field("is_active", DataType.BOOL)
index_params = client.prepare_index_params()
index_params.add_index(
field_name="embedding",
index_type="HNSW",
metric_type="IP",
params={"M": 16, "efConstruction": 200},
)
index_params.add_index(field_name="person_id", index_type="Trie")
index_params.add_index(field_name="is_active", index_type="BITMAP")
client.create_collection(
collection_name=COLLECTION_NAME,
schema=schema,
index_params=index_params,
)
Enrolling Identities
A person may have multiple enrolled face embeddings. Storing multiple embeddings per person improves recognition robustness:
import numpy as np
import time
def extract_face_embedding(aligned_face_image_path: str) -> np.ndarray:
    """
    Placeholder — replace with your actual face recognition model.
    Example frameworks:
    - InsightFace (ArcFace): pip install insightface
    - deepface: pip install deepface
    - facenet-pytorch: pip install facenet-pytorch
    """
    vec = np.random.randn(FACE_EMBEDDING_DIM).astype(np.float32)
    return vec / np.linalg.norm(vec)
def assess_face_quality(image_path: str) -> float:
    """
    Estimate the quality of a face image for enrollment (0.0–1.0).
    In practice, use a dedicated face quality assessment model.
    """
    return 0.95 # placeholder
def enroll_person(
    person_id: str,
    person_name: str,
    face_image_paths: list,
    min_quality_threshold: float = 0.7,
):
    enrolled_count = 0
    for image_path in face_image_paths:
        quality = assess_face_quality(image_path)
        if quality < min_quality_threshold:
            print(f"Skipping {image_path} — quality {quality:.2f} below threshold")
            continue
        embedding = extract_face_embedding(image_path)
        client.insert(
            collection_name=COLLECTION_NAME,
            data=[{
                "embedding": embedding.tolist(),
                "person_id": person_id,
                "person_name": person_name,
                "confidence_score": quality,
                "source_image": image_path,
                "enrolled_at": int(time.time() * 1000),
                "is_active": True,
            }]
        )
        enrolled_count += 1
    print(f"Enrolled {enrolled_count} faces for {person_name} ({person_id})")
    return enrolled_count
Recognition (1:N Search)
def recognize_face(
    query_face_path: str,
    top_k: int = 5,
    similarity_threshold: float = 0.7,
) -> dict:
    query_embedding = extract_face_embedding(query_face_path)
    results = client.search(
        collection_name=COLLECTION_NAME,
        data=[query_embedding.tolist()],
        limit=top_k,
        filter="is_active == True",
        search_params={"ef": 200},
        output_fields=["person_id", "person_name", "confidence_score"],
    )
    if not results or not results[0]:
        return {"status": "unknown", "reason": "no results"}
    top_hit = results[0][0]
    top_similarity = top_hit["distance"]
    if top_similarity < similarity_threshold:
        return {
            "status": "unknown",
            "best_match": {"person_id": top_hit["entity"]["person_id"], "similarity": top_similarity},
            "reason": f"similarity {top_similarity:.4f} below threshold {similarity_threshold}",
        }
    # Group above-threshold hits by person so multiple enrolled embeddings can vote
    person_votes = {}
    person_names = {}
    for hit in results[0]:
        if hit["distance"] >= similarity_threshold:
            pid = hit["entity"]["person_id"]
            person_votes.setdefault(pid, []).append(hit["distance"])
            person_names[pid] = hit["entity"]["person_name"]
    if not person_votes:
        return {"status": "unknown", "reason": "no votes above threshold"}
    best_person = max(person_votes, key=lambda pid: sum(person_votes[pid]) / len(person_votes[pid]))
    avg_similarity = sum(person_votes[best_person]) / len(person_votes[best_person])
    return {
        "status": "recognized",
        "person_id": best_person,
        # Use the winning person's name, which is not necessarily the top hit's
        "person_name": person_names[best_person],
        "similarity": avg_similarity,
        "num_matching_embeddings": len(person_votes[best_person]),
    }
# ─── Verification (1:1) ───────────────────────────────────────────────────────
def verify_identity(image_path_1: str, image_path_2: str, threshold: float = 0.7) -> dict:
    emb1 = extract_face_embedding(image_path_1)
    emb2 = extract_face_embedding(image_path_2)
    similarity = float(np.dot(emb1, emb2))
    return {"same_person": similarity >= threshold, "similarity": similarity, "threshold": threshold}
# ─── Removing an Identity ─────────────────────────────────────────────────────
def deactivate_person(person_id: str):
    # Despite the name, this permanently deletes the person's enrollments.
    # For a reversible deactivation, set is_active to False on those rows instead.
    entities = client.query(
        collection_name=COLLECTION_NAME,
        filter=f"person_id == '{person_id}'",
        output_fields=["id"],
    )
    if not entities:
        print(f"No enrollments found for person_id: {person_id}")
        return
    client.delete(collection_name=COLLECTION_NAME, ids=[e["id"] for e in entities])
    print(f"Deleted {len(entities)} enrollments for person {person_id}")
Similarity Thresholds for Face Recognition
Thresholds vary significantly by model. Always calibrate on your target dataset:
| Model | Typical Threshold (IP/Cosine) | Notes |
|---|---|---|
| ArcFace (ResNet-50) | 0.65–0.75 | Very robust model |
| FaceNet (Inception) | 0.70–0.80 | Good general purpose |
| AdaFace | 0.60–0.70 | Excellent for low-quality images |
| Your custom model | Must be calibrated | Use ROC curve on held-out set |
Calibration approach: Use your validation set, plot the ROC curve, and choose the threshold at your desired false acceptance rate (FAR) and false rejection rate (FRR) operating point.
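One way to turn that calibration advice into code is sketched below. It assumes you have already computed similarity scores for genuine pairs (same person) and impostor pairs (different people) on a held-out set, and that a pair is accepted when its score is at or above the threshold. The function name `threshold_at_far` and the synthetic score distributions are assumptions for illustration, not measurements from any real model.

```python
import numpy as np

def threshold_at_far(genuine_scores, impostor_scores, target_far: float):
    """Return (threshold, far, frr) for the lowest threshold with FAR <= target_far."""
    genuine = np.asarray(genuine_scores, dtype=np.float64)
    impostor = np.asarray(impostor_scores, dtype=np.float64)
    # Candidate thresholds: every observed score, plus one value just above the
    # highest impostor score so that FAR = 0 is always reachable.
    just_above = np.nextafter(impostor.max(), np.inf)
    candidates = np.unique(np.concatenate([genuine, impostor, [just_above]]))
    for t in candidates:                        # np.unique returns ascending order
        far = float(np.mean(impostor >= t))     # impostor pairs wrongly accepted
        if far <= target_far:
            frr = float(np.mean(genuine < t))   # genuine pairs wrongly rejected
            return float(t), far, frr
    raise ValueError("target_far must be >= 0")

# Synthetic scores, purely for demonstration.
rng = np.random.default_rng(0)
genuine = rng.normal(0.80, 0.05, size=2000)
impostor = rng.normal(0.30, 0.10, size=2000)
threshold, far, frr = threshold_at_far(genuine, impostor, target_far=0.001)
print(f"threshold={threshold:.3f} FAR={far:.4f} FRR={frr:.4f}")
```

The loop is quadratic in the number of scores, which is fine for a validation set of a few thousand pairs; for larger sets, sorting the scores once and sweeping them would be the natural refinement.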
13. Use Case 3 — Object Detection & Retrieval
In object detection pipelines, you first detect objects in an image (bounding boxes + class labels), then embed each detected region for downstream retrieval. Applications include defect detection in manufacturing, retail shelf monitoring, medical imaging, and autonomous driving data curation.
Architecture
graph TD
A["Input Image"]
B["Object Detector YOLO, Faster R-CNN, DETR, etc."]
C["Bounding Boxes + Class Labels"]
D["Region Cropping crop each detected region"]
E["Embedding Model same or different from detector"]
F["Region Embeddings"]
G[("Milvus source_image · bbox · class · score")]
A --> B
B --> C
C --> D
D --> E
E --> F
F --> G
style A fill:#E8F4FD,stroke:#4A90D9
style B fill:#FEF9E7,stroke:#E8A838
style C fill:#EAF7EA,stroke:#5BA85A
style D fill:#FDF2E9,stroke:#E67E22
style E fill:#F4ECF7,stroke:#8B6BB1
style F fill:#FDEDEC,stroke:#D95F5F
style G fill:#EAFAF1,stroke:#27AE60
Schema for Object Detections
from pymilvus import MilvusClient, DataType
COLLECTION_NAME = "object_detections"
REGION_EMBEDDING_DIM = 512
client = MilvusClient("./object_detection.db")
schema = client.create_schema(auto_id=True, enable_dynamic_field=True)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("embedding", DataType.FLOAT_VECTOR, dim=REGION_EMBEDDING_DIM)
schema.add_field("source_image_path", DataType.VARCHAR, max_length=1024)
schema.add_field("source_image_id", DataType.VARCHAR, max_length=64)
schema.add_field("bbox_x1", DataType.FLOAT)
schema.add_field("bbox_y1", DataType.FLOAT)
schema.add_field("bbox_x2", DataType.FLOAT)
schema.add_field("bbox_y2", DataType.FLOAT)
schema.add_field("class_name", DataType.VARCHAR, max_length=64)
schema.add_field("class_id", DataType.INT32)
schema.add_field("detection_score", DataType.FLOAT)
schema.add_field("area_fraction", DataType.FLOAT)
schema.add_field("detected_at", DataType.INT64)
index_params = client.prepare_index_params()
index_params.add_index(
field_name="embedding",
index_type="HNSW",
metric_type="COSINE",
params={"M": 16, "efConstruction": 200},
)
index_params.add_index(field_name="class_name", index_type="Trie")
index_params.add_index(field_name="class_id", index_type="STL_SORT")
index_params.add_index(field_name="detection_score", index_type="STL_SORT")
client.create_collection(
collection_name=COLLECTION_NAME,
schema=schema,
index_params=index_params,
)
Processing a Detection Pipeline
import numpy as np
import time
from dataclasses import dataclass
@dataclass
class Detection:
    class_name: str
    class_id: int
    score: float
    x1: float
    y1: float
    x2: float
    y2: float
def detect_objects(image_path: str) -> list[Detection]:
    """
    Placeholder for your object detection model.
    Example with Ultralytics YOLO:
        from ultralytics import YOLO
        model = YOLO("yolov8n.pt")
        results = model(image_path)
        detections = []
        for box in results[0].boxes:
            x1, y1, x2, y2 = box.xyxyn[0].tolist()
            detections.append(Detection(
                class_name=model.names[int(box.cls)],
                class_id=int(box.cls),
                score=float(box.conf),
                x1=x1, y1=y1, x2=x2, y2=y2,
            ))
        return detections
    """
    return [
        Detection("car", 2, 0.95, 0.1, 0.2, 0.4, 0.8),
        Detection("person", 0, 0.87, 0.5, 0.1, 0.7, 0.9),
    ]
def extract_region_embedding(image_path: str, detection: Detection) -> np.ndarray:
    """
    Crop the detected region and extract its embedding.
    Example with PIL:
        from PIL import Image
        img = Image.open(image_path).convert("RGB")
        w, h = img.size
        box = (int(detection.x1*w), int(detection.y1*h),
               int(detection.x2*w), int(detection.y2*h))
        region = img.crop(box)
        # Pass through embedding model...
    """
    vec = np.random.randn(REGION_EMBEDDING_DIM).astype(np.float32)
    return vec / np.linalg.norm(vec)
def process_and_ingest_image(image_path: str, image_id: str):
    detections = detect_objects(image_path)
    if not detections:
        return 0
    data = []
    for det in detections:
        # Drop low-confidence detections before embedding them
        if det.score < 0.5:
            continue
        embedding = extract_region_embedding(image_path, det)
        # Box coordinates are normalized, so this is the fraction of the image covered
        area = (det.x2 - det.x1) * (det.y2 - det.y1)
        data.append({
            "embedding": embedding.tolist(),
            "source_image_path": image_path,
            "source_image_id": image_id,
            "bbox_x1": det.x1, "bbox_y1": det.y1,
            "bbox_x2": det.x2, "bbox_y2": det.y2,
            "class_name": det.class_name,
            "class_id": det.class_id,
            "detection_score": det.score,
            "area_fraction": area,
            "detected_at": int(time.time() * 1000),
        })
    # Every detection may have been filtered out above
    if not data:
        return 0
    result = client.insert(collection_name=COLLECTION_NAME, data=data)
    return result["insert_count"]
def find_similar_objects(
    query_image_path: str,
    query_detection: Detection,
    top_k: int = 10,
    same_class_only: bool = True,
    min_score: float = 0.7,
) -> list:
    query_embedding = extract_region_embedding(query_image_path, query_detection)
    filters = [f"detection_score >= {min_score}"]
    if same_class_only:
        filters.append(f"class_name == '{query_detection.class_name}'")
    results = client.search(
        collection_name=COLLECTION_NAME,
        data=[query_embedding.tolist()],
        limit=top_k,
        filter=" AND ".join(filters),
        search_params={"ef": 150},
        output_fields=[
            "source_image_path", "class_name", "detection_score",
            "bbox_x1", "bbox_y1", "bbox_x2", "bbox_y2"
        ],
    )
    return [
        {
            "image_path": hit["entity"]["source_image_path"],
            "similarity": hit["distance"],
            "class_name": hit["entity"]["class_name"],
            "detection_score": hit["entity"]["detection_score"],
            "bbox": {
                "x1": hit["entity"]["bbox_x1"], "y1": hit["entity"]["bbox_y1"],
                "x2": hit["entity"]["bbox_x2"], "y2": hit["entity"]["bbox_y2"],
            }
        }
        for hit in results[0]
    ]
14. Use Case 4 — Video Frame Search
Video frame search enables you to find specific moments in a video library by content — “find all frames that look like this scene,” “find the first time this logo appears,” or “find all shots of people wearing red jackets.”
Key Challenges in Video Search
- Temporal redundancy — consecutive frames are very similar. You usually don’t want to embed every single frame.
- Scale — a 1-hour video at 30fps has 108,000 frames. A large video library is billions of frames.
- Efficient storage — you need to store enough metadata to locate the exact frame (video ID, timestamp, frame index)
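One simple way to tackle the temporal redundancy problem is to compare each sampled frame's embedding with the last frame you kept and skip it when the two are nearly identical. The sketch below shows that idea under the assumption that you already have per-frame embeddings in time order; the function name `drop_redundant_frames` and the 0.95 cutoff are illustrative choices, not recommendations from Milvus.

```python
import numpy as np

def drop_redundant_frames(embeddings, max_similarity: float = 0.95) -> list:
    """Return indices of frames whose cosine similarity to the last kept frame
    is below max_similarity. The first frame is always kept."""
    kept = []
    last = None
    for idx, emb in enumerate(embeddings):
        emb = np.asarray(emb, dtype=np.float32)
        emb = emb / np.linalg.norm(emb)
        if last is None or float(np.dot(emb, last)) < max_similarity:
            kept.append(idx)
            last = emb          # later frames are compared against this one
    return kept

# Toy 2-D "embeddings": frames 0-1 are one scene, 2-3 another, 4 returns to the first.
frames = [[1.0, 0.0], [1.0, 0.01], [0.0, 1.0], [0.0, 1.0], [1.0, 0.0]]
print(drop_redundant_frames(frames))   # [0, 2, 4]
```

Because each frame is compared only with the most recently kept one, a scene that reappears later is stored again, which is usually what you want when the goal is to locate moments in time.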
Frame Sampling Strategies
def get_keyframe_indices(total_frames: int, fps: float, strategy: str = "every_n_seconds", interval: float = 1.0):
    """
    Returns frame indices to sample based on the chosen strategy.
    The meaning of `interval` depends on the strategy:
    - "every_n_seconds": seconds between sampled frames
    - "every_n_frames": sample every Nth frame (N = interval)
    - "uniform": total number of frames to sample, spread evenly
    """
    if strategy == "every_n_seconds":
        step = max(1, int(fps * interval))
        return list(range(0, total_frames, step))
    elif strategy == "every_n_frames":
        step = max(1, int(interval))
        return list(range(0, total_frames, step))
    elif strategy == "uniform":
        n_samples = int(interval)
        if n_samples <= 0:
            return []  # nothing requested; also avoids dividing by zero below
        if n_samples >= total_frames:
            return list(range(total_frames))
        step = total_frames / n_samples
        return [int(i * step) for i in range(n_samples)]
    else:
        raise ValueError(f"Unknown strategy: {strategy}")
Schema for Video Frames
from pymilvus import MilvusClient, DataType
COLLECTION_NAME = "video_frames"
FRAME_EMBEDDING_DIM = 512
client = MilvusClient("./video_search.db")
schema = client.create_schema(auto_id=True, enable_dynamic_field=True)
schema.add_field("id", DataType.INT64, is_primary=True)
schema.add_field("embedding", DataType.FLOAT_VECTOR, dim=FRAME_EMBEDDING_DIM)
schema.add_field("video_id", DataType.VARCHAR, max_length=64)
schema.add_field("video_path", DataType.VARCHAR, max_length=1024)
schema.add_field("video_title", DataType.VARCHAR, max_length=256)
schema.add_field("channel", DataType.VARCHAR, max_length=128)
schema.add_field("frame_index", DataType.INT64)
schema.add_field("timestamp_ms", DataType.INT64)
schema.add_field("fps", DataType.FLOAT)
schema.add_field("scene_tag", DataType.VARCHAR, max_length=64)
schema.add_field("has_faces", DataType.BOOL)
schema.add_field("has_text", DataType.BOOL)
index_params = client.prepare_index_params()
index_params.add_index(
field_name="embedding",
index_type="HNSW",
metric_type="COSINE",
params={"M": 16, "efConstruction": 200},
)
index_params.add_index(field_name="video_id", index_type="Trie")
index_params.add_index(field_name="channel", index_type="Trie")
index_params.add_index(field_name="timestamp_ms", index_type="STL_SORT")
client.create_collection(
collection_name=COLLECTION_NAME,
schema=schema,
index_params=index_params,
)
Processing a Video
import numpy as np
import time
def extract_frame(video_path: str, frame_index: int) -> np.ndarray:
    """
    Extract a single frame from a video.
    Example with OpenCV:
        import cv2
        cap = cv2.VideoCapture(video_path)
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
        ret, frame = cap.read()
        cap.release()
        if ret:
            return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return None
    """
    return np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
def embed_frame(frame: np.ndarray) -> np.ndarray:
    # Placeholder: replace with your image embedding model
    vec = np.random.randn(FRAME_EMBEDDING_DIM).astype(np.float32)
    return vec / np.linalg.norm(vec)
def get_video_metadata(video_path: str) -> dict:
    """
    Example with OpenCV:
        import cv2
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        cap.release()
        return {"total_frames": total_frames, "fps": fps}
    """
    return {"total_frames": 3000, "fps": 30.0}
def process_video(
    video_path: str,
    video_id: str,
    video_title: str = "",
    channel: str = "",
    sample_every_n_seconds: float = 1.0,
    batch_size: int = 256,
):
    meta = get_video_metadata(video_path)
    total_frames = meta["total_frames"]
    fps = meta["fps"]
    frame_indices = get_keyframe_indices(
        total_frames, fps, strategy="every_n_seconds", interval=sample_every_n_seconds
    )
    print(f"Processing {video_path} — sampling {len(frame_indices)} frames")
    inserted = 0
    data_buffer = []
    for frame_idx in frame_indices:
        frame = extract_frame(video_path, frame_idx)
        if frame is None:
            continue
        embedding = embed_frame(frame)
        timestamp_ms = int((frame_idx / fps) * 1000)
        data_buffer.append({
            "embedding": embedding.tolist(),
            "video_id": video_id,
            "video_path": video_path,
            "video_title": video_title,
            "channel": channel,
            "frame_index": frame_idx,
            "timestamp_ms": timestamp_ms,
            "fps": fps,
            "has_faces": False,
            "has_text": False,
            "scene_tag": "unknown",
        })
        if len(data_buffer) >= batch_size:
            result = client.insert(collection_name=COLLECTION_NAME, data=data_buffer)
            inserted += result["insert_count"]
            data_buffer = []
            print(f" Inserted {inserted} frames so far...")
    # Insert whatever is left in the final partial batch
    if data_buffer:
        result = client.insert(collection_name=COLLECTION_NAME, data=data_buffer)
        inserted += result["insert_count"]
    print(f"Done: inserted {inserted} frames")
    return inserted
def find_similar_frames(
    query_image_path: str = None,
    query_video_path: str = None,
    query_timestamp_ms: int = None,
    top_k: int = 20,
    channel_filter: str = None,
    time_range_ms: tuple = None,
) -> list:
    if query_image_path:
        # Placeholder: a real implementation would load and decode query_image_path here
        frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
        query_embedding = embed_frame(frame)
    elif query_video_path and query_timestamp_ms is not None:
        meta = get_video_metadata(query_video_path)
        frame_idx = int((query_timestamp_ms / 1000) * meta["fps"])
        frame = extract_frame(query_video_path, frame_idx)
        if frame is None:
            raise ValueError(f"Could not read frame {frame_idx} from {query_video_path}")
        query_embedding = embed_frame(frame)
    else:
        raise ValueError("Must provide query_image_path or (query_video_path + query_timestamp_ms)")
    filters = []
    if channel_filter:
        filters.append(f"channel == '{channel_filter}'")
    if time_range_ms:
        start_ms, end_ms = time_range_ms
        filters.append(f"timestamp_ms >= {start_ms} AND timestamp_ms <= {end_ms}")
    results = client.search(
        collection_name=COLLECTION_NAME,
        data=[query_embedding.tolist()],
        limit=top_k,
        filter=" AND ".join(filters) if filters else None,
        search_params={"ef": 200},
        output_fields=["video_id", "video_title", "video_path", "frame_index", "timestamp_ms", "channel"],
    )
    hits = []
    for hit in results[0]:
        # Format the millisecond offset as HH:MM:SS.ss for display
        ts = hit["entity"]["timestamp_ms"]
        hours = ts // 3_600_000
        minutes = (ts % 3_600_000) // 60_000
        seconds = (ts % 60_000) / 1000
        hits.append({
            "video_id": hit["entity"]["video_id"],
            "video_title": hit["entity"]["video_title"],
            "frame_index": hit["entity"]["frame_index"],
            "timestamp_ms": ts,
            "timestamp_str": f"{hours:02d}:{minutes:02d}:{seconds:05.2f}",
            "similarity": hit["distance"],
            "channel": hit["entity"]["channel"],
        })
    return hits
15. Partitions, Filtering, and Hybrid Search
Partitions
Partitions are logical subdivisions within a collection that allow you to scope searches to a subset of the data, dramatically improving query speed when you know which partition to target.
# Create partitions (e.g., by year for a video archive)
client.create_partition(collection_name="video_frames", partition_name="2023")
client.create_partition(collection_name="video_frames", partition_name="2024")
client.create_partition(collection_name="video_frames", partition_name="2025")
# Insert into a specific partition
client.insert(
collection_name="video_frames",
data=my_data_2024,
partition_name="2024",
)
# Search only in the "2024" partition
results = client.search(
collection_name="video_frames",
data=[query_vector],
limit=10,
partition_names=["2024"],
search_params={"ef": 100},
)
# Search across multiple partitions
results = client.search(
collection_name="video_frames",
data=[query_vector],
limit=10,
partition_names=["2024", "2025"],
)
Partition Design Guidelines:
- Use partitions for categorical splits your queries can name up front (year, camera_id), keeping the number of distinct values within the partition limit below
- Avoid too many partitions (< 4096 per collection is safe)
- Don’t use partitions as a substitute for scalar filtering on low-cardinality fields
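Whatever key you partition on, each insert call targets a single partition, so records are usually grouped by their target partition first. The following is a minimal sketch in plain Python, assuming each record carries a hypothetical `captured_at_ms` field (UTC epoch milliseconds) and that partitions are named by year; `partition_for_timestamp` and `group_by_partition` are illustrative helper names, not part of PyMilvus.

```python
from collections import defaultdict
from datetime import datetime, timezone


def partition_for_timestamp(timestamp_ms: int) -> str:
    """Map an epoch-millisecond timestamp to a year-named partition, e.g. "2024"."""
    return str(datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc).year)


def group_by_partition(records: list[dict]) -> dict[str, list[dict]]:
    """Bucket records by target partition so each insert call hits one partition."""
    groups: dict[str, list[dict]] = defaultdict(list)
    for record in records:
        groups[partition_for_timestamp(record["captured_at_ms"])].append(record)
    return dict(groups)


# Hypothetical records; only the fields needed for routing are shown
records = [
    {"frame_index": 0, "captured_at_ms": 1_704_067_200_000},  # 2024-01-01T00:00:00Z
    {"frame_index": 1, "captured_at_ms": 1_735_689_600_000},  # 2025-01-01T00:00:00Z
    {"frame_index": 2, "captured_at_ms": 1_704_153_600_000},  # 2024-01-02T00:00:00Z
]
grouped = group_by_partition(records)
```

Each group could then be passed to `client.insert(..., partition_name=name)` in the same way as the insert call shown earlier in this section.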
Advanced Filter Expressions
# String operations
filter="label in ['dog', 'cat', 'bird']"
filter="image_path like '/dataset/train/%'"
filter="NOT (label in ['background', 'unknown'])"
# Numeric comparisons
filter="confidence > 0.85 AND detection_score < 0.99"
filter="width >= 1920 AND height >= 1080"
# JSON field access
filter="metadata['camera_id'] == 'cam_01'"
# Combining conditions
filter="(label == 'dog' OR label == 'cat') AND confidence > 0.9 AND dataset_split == 'train'"
# Array containment
filter="ARRAY_CONTAINS(tags, 'outdoor')"
Hybrid Search (Vector + Full-Text Search)
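Milvus 2.4+ supports hybrid search: several ANN requests against the same collection, for example one over a dense embedding and one over a sparse (BM25/keyword-style) embedding, are merged and re-ranked, here with Reciprocal Rank Fusion (RRF). Built-in BM25 full-text search arrived in Milvus 2.5; on 2.4 you generate the sparse vectors yourself. The example below fuses a dense and a sparse request with RRFRanker.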
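To make the fusion step concrete, here is a pure-Python sketch of RRF in its textbook form, where each result list contributes 1 / (k + rank) to an item's score. It is an illustration under that assumption, not Milvus's internal code, which may differ in details such as the rank offset; the IDs are made up.

```python
from collections import defaultdict


def reciprocal_rank_fusion(ranked_lists: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Fuse several ranked ID lists: score(id) = sum over lists of 1 / (k + rank)."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in ranked_lists:
        for rank, item_id in enumerate(ranking, start=1):  # ranks are 1-based
            scores[item_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by ID for a deterministic order
    return sorted(scores.items(), key=lambda pair: (-pair[1], pair[0]))


dense_hits = ["img_7", "img_2", "img_9"]   # hypothetical IDs from the dense search
sparse_hits = ["img_2", "img_5", "img_7"]  # hypothetical IDs from the sparse search
fused = reciprocal_rank_fusion([dense_hits, sparse_hits])
```

Items that rank well in both lists (`img_2`, `img_7`) rise above items that appear in only one. In Milvus the same kind of fusion is requested by passing `RRFRanker(k=60)` to `hybrid_search`, as in the request that follows.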
from pymilvus import MilvusClient, AnnSearchRequest, RRFRanker, WeightedRanker
dense_request = AnnSearchRequest(
data=[dense_query_vector],
anns_field="dense_embedding",
param={"metric_type": "COSINE", "params": {"ef": 100}},
limit=20,
)
sparse_request = AnnSearchRequest(
data=[sparse_query_vector],
anns_field="sparse_embedding",
param={"metric_type": "IP", "params": {}},
limit=20,
)
results = client.hybrid_search(
collection_name="multimodal_index",
reqs=[dense_request, sparse_request],
ranker=RRFRanker(k=60),
limit=10,
output_fields=["image_path", "caption"],
)
16. Performance Tuning and Best Practices
16.1 Index Parameter Tuning for HNSW
Build-time (M and efConstruction):
| Dataset Size | M | efConstruction | Build Time | Memory |
|---|---|---|---|---|
| < 1M vectors | 8 | 100 | Fast | Low |
| 1M–10M | 16 | 200 | Moderate | Moderate |
| 10M–100M | 16–32 | 200–400 | Slow | High |
| 100M+ | 16 | 200 | Very slow | Very high |
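The Memory column can be roughed out with a commonly quoted rule of thumb for HNSW over float32 vectors: about `dim * 4` bytes for the vector plus `M * 2 * 4` bytes for graph links. The sketch below applies that approximation; `estimate_hnsw_memory_gib` is a hypothetical helper, the 512-dimension figure is only an example, and the estimate ignores upper graph layers, scalar fields, and Milvus's own overhead, so treat it as a lower bound.

```python
def estimate_hnsw_memory_gib(num_vectors: int, dim: int, m: int) -> float:
    """Rule-of-thumb HNSW footprint: float32 vector plus ~2*M 4-byte links per vector."""
    bytes_per_vector = dim * 4 + m * 2 * 4
    return num_vectors * bytes_per_vector / 1024**3


# Example sizes loosely following the table rows (dim=512 is an assumption)
for n, m in [(1_000_000, 8), (10_000_000, 16), (100_000_000, 32)]:
    print(f"{n:>11,} vectors, dim=512, M={m}: ~{estimate_hnsw_memory_gib(n, 512, m):.1f} GiB")
```

For typical embedding sizes the vector term dominates, which is why raising M costs less memory than it might first appear, while build time grows more noticeably.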
Query-time (ef):
# ef must be >= limit (top_k)
search_params = {"ef": 50} # Fast, lower recall
search_params = {"ef": 100} # Balanced (recommended starting point)
search_params = {"ef": 500} # High recall
search_params = {"ef": 2000} # Maximum recall (approaching FLAT accuracy)
16.2 Batch Insertion Performance
from concurrent.futures import ThreadPoolExecutor
def chunked(items, batch_size):
    # Yield successive batch_size-sized slices of a list
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]
# Bad: insert one at a time
for record in all_records:
    client.insert(collection_name="...", data=[record])  # Very slow!
# Good: insert in batches
for batch in chunked(all_records, batch_size=2000):
    client.insert(collection_name="...", data=batch)
# Even better: embed and insert with multiple workers
def embed_and_insert(batch):
    embeddings = embed_batch([r["path"] for r in batch])
    data = [{"embedding": emb, **meta} for emb, meta in zip(embeddings, batch)]
    return client.insert(collection_name="...", data=data)
batches = list(chunked(all_records, batch_size=2000))
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(embed_and_insert, batch) for batch in batches]
    for future in futures:
        future.result()  # re-raises any exception from a worker
16.3 Memory Management
# Load collection into memory before querying
client.load_collection("image_embeddings")
# Release collection from memory when not actively querying
client.release_collection("image_embeddings")
# Load only specific partitions into memory
client.load_partitions("image_embeddings", partition_names=["2024"])
16.4 Query Cache
For repeated identical queries, cache results at the application level:
import hashlib
import json
import time
_search_cache = {}
def cached_search(collection_name, vector, limit, filter=None, ttl_seconds=300):
    # Round components so tiny float differences map to the same key;
    # float() keeps NumPy scalars JSON-serializable
    vec_bytes = json.dumps([round(float(v), 6) for v in vector]).encode()
    cache_key = f"{collection_name}:{hashlib.sha256(vec_bytes).hexdigest()}:{limit}:{filter}"
    if cache_key in _search_cache:
        cached_result, cached_at = _search_cache[cache_key]
        if time.time() - cached_at < ttl_seconds:
            return cached_result
    result = client.search(
        collection_name=collection_name,
        data=[vector],
        limit=limit,
        filter=filter or "",
    )
    _search_cache[cache_key] = (result, time.time())
    return result
16.5 Monitoring Query Performance
import time
def timed_search(client, collection_name, query_vector, limit=10, **kwargs):
    start = time.perf_counter()
    results = client.search(
        collection_name=collection_name,
        data=[query_vector],
        limit=limit,
        **kwargs,
    )
    latency_ms = (time.perf_counter() - start) * 1000
    print(f"Search latency: {latency_ms:.2f}ms | Results: {len(results[0])}")
    return results, latency_ms
16.6 Schema Design Best Practices
- Minimize the number of fields. Each additional field adds memory overhead and slows insertion.
- Use enable_dynamic_field=True cautiously. Dynamic fields are stored as JSON internally, which is slower to filter than typed fields.
- Use INT64 for timestamps, not VARCHAR. Numeric comparisons are much faster.
- Normalize your vectors before insertion if you use the IP metric and want cosine-style scores. COSINE is insensitive to vector length, but IP on unnormalized vectors ranks by magnitude as well as direction.
- Choose appropriate VARCHAR lengths. Don’t set max_length=65535 for short strings.
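As a small illustration of the timestamp guideline, the sketch below converts ISO-8601 strings to integer epoch milliseconds and builds a numeric range filter from them. The helper names and the `captured_at_ms` field are hypothetical, and naive timestamps are assumed to be UTC.

```python
from datetime import datetime, timezone


def to_epoch_ms(iso_timestamp: str) -> int:
    """Convert an ISO-8601 string to integer epoch milliseconds (naive input is treated as UTC)."""
    parsed = datetime.fromisoformat(iso_timestamp)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return round(parsed.timestamp() * 1000)


def time_range_filter(field: str, start_iso: str, end_iso: str) -> str:
    """Build a numeric range filter expression over an INT64 timestamp field."""
    return f"{field} >= {to_epoch_ms(start_iso)} AND {field} <= {to_epoch_ms(end_iso)}"


expr = time_range_filter("captured_at_ms", "2024-01-01T00:00:00", "2024-12-31T23:59:59")
```

Storing the converted integer at insert time means the filter compares numbers directly, rather than comparing strings whose ordering depends on their formatting.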
17. Security and Access Control
Authentication
Enable authentication on your Milvus instance to prevent unauthorized access.
In docker-compose.yml:
standalone:
  environment:
    COMMON_SECURITY_AUTHORIZATIONENABLED: "true"
In Python:
client = MilvusClient(
uri="http://localhost:19530",
token="root:Milvus",
)
# Create a new user
client.create_user(user_name="cv_app_user", password="StrongP@ssword123")
# Grant a role (db_ro is predefined on Zilliz Cloud; on self-hosted Milvus, grant a role you have created)
client.grant_role(user_name="cv_app_user", role_name="db_ro")
# List users
client.list_users()
Role-Based Access Control (RBAC)
# Create a custom role
client.create_role(role_name="cv_readonly")
# Grant specific privileges
client.grant_privilege(
role_name="cv_readonly",
object_type="Collection",
object_name="image_embeddings",
privilege="Query",
)
client.grant_privilege(
role_name="cv_readonly",
object_type="Collection",
object_name="image_embeddings",
privilege="Search",
)
# Assign role to user
client.grant_role(user_name="cv_app_user", role_name="cv_readonly")
TLS/SSL Encryption
client = MilvusClient(
uri="https://milvus.example.com:19530",
token="username:password",
server_pem_path="/path/to/server.pem",
)
18. Monitoring and Observability
Milvus Metrics
Milvus exposes Prometheus metrics at http://milvus-host:9091/metrics. Key metrics to monitor:
| Metric | Description | Alert if |
|---|---|---|
| milvus_proxy_search_latency_sum | Search latency | p99 > 500ms |
| milvus_querynode_collection_num | Collections loaded | High |
| milvus_datanode_insert_buffer_size | Insert buffer size | Near limit |
| milvus_rootcoord_proxy_num | Active proxies | Drops to 0 |
| milvus_segment_count | Total segments | Monitor growth |
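For a quick check without a full Prometheus setup, the metrics endpoint can be read as plain text. The sketch below parses the Prometheus text exposition format with the standard library only. It assumes samples carry no trailing timestamp, and the sample text (including the `node_id` label and the HELP string) is hand-written for illustration, not real server output.

```python
def parse_prometheus_text(text: str) -> dict[str, float]:
    """Parse Prometheus text exposition into {"name{labels}": value}, skipping comments."""
    samples: dict[str, float] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):  # blank lines, HELP and TYPE comments
            continue
        series, _, value = line.rpartition(" ")  # the value is the last space-separated token
        samples[series] = float(value)
    return samples


# Hand-written sample using metric names from the table above
SAMPLE = """\
# HELP milvus_rootcoord_proxy_num Number of registered proxies
# TYPE milvus_rootcoord_proxy_num gauge
milvus_rootcoord_proxy_num 1
milvus_querynode_collection_num{node_id="3"} 2
"""

metrics = parse_prometheus_text(SAMPLE)
if metrics.get("milvus_rootcoord_proxy_num", 0) == 0:
    print("ALERT: no active proxies")
```

Against a live instance, the text would come from something like `urllib.request.urlopen("http://milvus-host:9091/metrics").read().decode()`. Percentile alerts such as p99 latency need the histogram buckets and are better left to Prometheus itself.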
Setting Up Grafana Dashboard
# Import the official Milvus dashboard (ID: 19120 on grafana.com)
wget https://raw.githubusercontent.com/milvus-io/milvus/master/deployments/monitoring/grafana/milvus-dashboard.json
Application-Level Monitoring
import time
from collections import defaultdict
from statistics import mean, quantiles
class MilvusMonitor:
    def __init__(self):
        self.latencies = defaultdict(list)
        self.error_counts = defaultdict(int)
    def record_search(self, collection: str, latency_ms: float, success: bool):
        if success:
            self.latencies[collection].append(latency_ms)
        else:
            self.error_counts[collection] += 1
    def report(self):
        for collection, lats in self.latencies.items():
            if len(lats) < 2:  # quantiles() needs at least two data points
                continue
            cuts = quantiles(lats, n=100)  # 99 cut points, computed once
            p50, p95, p99 = cuts[49], cuts[94], cuts[98]
            print(f"Collection: {collection}")
            print(f"  Searches: {len(lats)}, Errors: {self.error_counts[collection]}")
            print(f"  Latency mean: {mean(lats):.1f}ms, p50: {p50:.1f}ms, "
                  f"p95: {p95:.1f}ms, p99: {p99:.1f}ms")
monitor = MilvusMonitor()
def monitored_search(collection_name, query_vector, limit=10, **kwargs):
    start = time.perf_counter()
    success = True
    try:
        return client.search(collection_name=collection_name, data=[query_vector], limit=limit, **kwargs)
    except Exception:
        success = False
        raise
    finally:
        # Runs on both success and failure, so every call is recorded
        monitor.record_search(collection_name, (time.perf_counter() - start) * 1000, success)
19. Common Pitfalls and How to Avoid Them
Pitfall 1: Mismatched Embedding Dimensions
Problem: You created a collection with dim=512 but insert vectors of size 768. Milvus rejects the insert with a dimension mismatch error.
Solution: Always assert dimensions before inserting:
def safe_insert(client, collection_name, data, expected_dim):
    for entity in data:
        vec = entity.get("embedding", [])
        assert len(vec) == expected_dim, f"Expected dim {expected_dim}, got {len(vec)}"
    return client.insert(collection_name=collection_name, data=data)
Pitfall 2: Searching Before Loading
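Problem: A collection must be loaded into memory before it can be searched. MilvusClient loads it for you only when index parameters are supplied at creation time; with the ORM-style API, or after a collection has been released, you must load it explicitly.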
Solution:
from pymilvus import Collection
col = Collection("image_embeddings")
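col.load()
Pitfall 3: Not Normalizing Vectors for the IP Metric
Problem: With the IP metric, unnormalized vectors are ranked by magnitude as well as direction, so the scores no longer correspond to cosine similarity. (The COSINE metric is insensitive to vector length, so it is not affected.)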
Solution:
import numpy as np
vec = np.array(raw_embedding)
vec = vec / np.linalg.norm(vec)
Pitfall 4: Setting nprobe Too Low (IVF Indexes)
Problem: Low nprobe (e.g., 1 or 2) with IVF indexes causes very poor recall.
Solution: Start with nprobe = nlist / 16 and benchmark recall. Never use nprobe=1 in production without measurement.
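Benchmarking recall means comparing approximate results against exact ones for the same queries. The sketch below computes Recall@K for a single query; in practice the exact list would come from a FLAT (brute-force) search and the approximate lists from the IVF index at different nprobe values. All IDs here are made up for illustration.

```python
def recall_at_k(retrieved_ids: list[int], true_ids: list[int], k: int) -> float:
    """Fraction of the true top-k neighbors that appear in the retrieved top-k."""
    truth = set(true_ids[:k])
    if not truth:
        return 0.0
    return len(truth & set(retrieved_ids[:k])) / len(truth)


# Hypothetical IDs: exact top-5 from a FLAT search vs. IVF results at two nprobe settings
exact_top5 = [11, 42, 7, 93, 58]
ivf_nprobe_1 = [11, 7, 64, 20, 35]
ivf_nprobe_16 = [11, 42, 7, 58, 20]

low = recall_at_k(ivf_nprobe_1, exact_top5, k=5)    # 2 of 5 true neighbors found
high = recall_at_k(ivf_nprobe_16, exact_top5, k=5)  # 4 of 5 true neighbors found
```

Averaging this value over a few hundred representative queries gives a recall figure to weigh against latency when choosing nprobe.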
Pitfall 5: Growing Segments and Slow Queries on Fresh Data
Problem: Freshly inserted data sits in unsealed “growing segments” that are brute-force searched.
Solution:
client.flush("image_embeddings")
# Then wait for index building to complete before running benchmarks
Pitfall 6: VARCHAR Filter on Unindexed Fields
Problem: Filtering on a VARCHAR field without a scalar index forces a full scan.
Solution: Always create scalar indexes on fields you filter by:
index_params.add_index(field_name="label", index_type="Trie")
index_params.add_index(field_name="score", index_type="STL_SORT")
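index_params.add_index(field_name="flags", index_type="BITMAP")
Pitfall 7: Using auto_id=False Without Providing Unique IDs
Problem: Milvus does not enforce primary-key uniqueness on insert, so entities with duplicate IDs are stored side by side and can surface as duplicate or stale search results.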
Solution: Use auto_id=True unless you have a strong reason to manage IDs yourself. If you do manage them, write with upsert or deduplicate each batch before inserting.
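When IDs must be managed by the application, deriving them deterministically from a natural key keeps re-ingested records from getting fresh IDs. The sketch below hashes a hypothetical `video_id:frame_index` key into a non-negative 63-bit integer and drops repeated IDs within a batch. The helper names are illustrative, and a truncated hash can in principle collide, so this is a sketch rather than a guarantee of uniqueness.

```python
import hashlib


def stable_int64_id(key: str) -> int:
    """Derive a deterministic, non-negative INT64 primary key from a string key."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") & 0x7FFF_FFFF_FFFF_FFFF  # clear the sign bit


def dedupe_by_id(entities: list[dict]) -> list[dict]:
    """Keep the last entity seen for each primary key before inserting a batch."""
    latest = {entity["id"]: entity for entity in entities}
    return list(latest.values())


frames = [("vid_001", 0), ("vid_001", 30), ("vid_001", 0)]  # the last entry repeats the first
entities = [
    {"id": stable_int64_id(f"{video}:{frame}"), "frame_index": frame}
    for video, frame in frames
]
unique_entities = dedupe_by_id(entities)
```

Because the same key always maps to the same ID, a later re-run over the same frames can use upsert to replace existing entities instead of accumulating copies.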
Pitfall 8: Confusing Distance Values by Metric Type
Problem: For L2, a lower distance means more similar. For IP and COSINE, a higher score means more similar. Misinterpreting this leads to sorting results in the wrong direction.
Solution: Trust Milvus’s returned sort order — it always returns results from most to least similar. Just be careful when comparing raw distance scores across different metric types.
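The direction of each metric is easy to verify by hand. The following pure-Python sketch computes L2 distance, inner product, and cosine similarity for a query against one nearby and one distant vector; the vectors are arbitrary illustrative values, and the functions are written out here rather than taken from any library.

```python
import math


def l2_distance(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def inner_product(a, b):
    return sum(x * y for x, y in zip(a, b))


def cosine_similarity(a, b):
    return inner_product(a, b) / (math.sqrt(inner_product(a, a)) * math.sqrt(inner_product(b, b)))


def normalize(v):
    norm = math.sqrt(inner_product(v, v))
    return [x / norm for x in v]


query = [1.0, 0.0]
near = [2.0, 0.2]  # almost the same direction as the query, but longer
far = [0.0, 1.0]   # orthogonal to the query

# L2: the more similar vector has the SMALLER value
l2_near, l2_far = l2_distance(query, near), l2_distance(query, far)
# IP and cosine: the more similar vector has the LARGER value
ip_near, ip_far = inner_product(query, near), inner_product(query, far)
cos_near, cos_far = cosine_similarity(query, near), cosine_similarity(query, far)
# After L2 normalization, inner product and cosine similarity coincide
ip_normalized = inner_product(normalize(query), normalize(near))
```

The last line also shows why normalizing before using IP gives cosine-style scores: once both vectors have unit length, the two quantities are the same number.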
20. Glossary
ANN (Approximate Nearest Neighbor): A search approach that finds results very close to the true nearest neighbors, trading a small amount of accuracy for enormous speed gains.
BM25: A sparse retrieval algorithm based on term frequency and inverse document frequency. Used in hybrid search alongside dense vector search.
Collection: The top-level data container in Milvus, analogous to a table in a relational database.
Cosine Similarity: A similarity metric measuring the cosine of the angle between two vectors. Values range from -1 (opposite direction) to 1 (same direction); higher means more similar.
DiskANN: A graph-based ANN index designed to work with data stored on disk rather than RAM.
Embedding / Feature Vector: A compact numerical representation of complex data (images, text, audio) produced by a neural network. Similar inputs produce numerically close embeddings.
Entity: A single record (row) in a Milvus collection.
etcd: A distributed key-value store used by Milvus to store cluster metadata, configuration, and service discovery information.
HNSW (Hierarchical Navigable Small World): A graph-based ANN index that builds a multi-layer proximity graph for fast nearest neighbor search. A common default when the index fits in memory, because it combines high recall with low latency at the cost of higher memory use.
Inner Product (IP): The dot product of two vectors. For normalized (unit) vectors, IP equals cosine similarity.
IVF (Inverted File Index): A family of ANN indexes that clusters vectors into Voronoi cells and searches only the nearest clusters at query time.
L2 (Euclidean Distance): The straight-line distance between two points in Euclidean space.
MinIO: An open-source, S3-compatible object storage system used by Milvus to persist vector data and index files.
Milvus Lite: An embedded, serverless version of Milvus that runs entirely in-process. Best for development and prototyping.
Normalization (L2 normalization): The process of scaling a vector to have unit length (L2 norm = 1). Needed for the IP metric to match cosine similarity; the COSINE metric itself is insensitive to vector length.
Partition: A logical subdivision of a Milvus collection that can be searched independently.
Primary Key: A unique identifier for each entity in a collection.
Product Quantization (PQ): A vector compression technique that divides vectors into sub-vectors and quantizes each independently.
PyMilvus: The official Python SDK for Milvus.
Recall@K: The fraction of the true K nearest neighbors that appear in the returned K results.
Scalar Field: A non-vector field in a Milvus schema used for metadata storage and filtered search.
Schema: The definition of the fields and their data types in a Milvus collection.
Segment: An internal data chunk within a Milvus collection. Growing segments hold new data; sealed segments are immutable and are the units on which indexes are built.
Sparse Vector: A vector representation where most values are zero, stored as a list of (index, value) pairs.
UPSERT: An operation that inserts an entity if it does not exist, or updates it if it does.
Vector Database: A specialized database designed to store, index, and efficiently search high-dimensional vector embeddings using approximate nearest neighbor algorithms.
Guide version: May 2026 | Milvus 2.4.x+ | PyMilvus 2.4.x+
For the latest Milvus documentation, visit milvus.io/docs