Milvus Connector
Connect Flink SQL jobs running on Ververica to Milvus to persist vector and scalar data with upsert semantics based on a primary key.
Prerequisites
- A running Milvus 2.4+ deployment with a reachable network address (host and port).
- A database and collection must already exist in Milvus.
- The collection schema must have:
  - One primary key field of type String or Integer, with AUTO_ID disabled.
  - One or more vector fields (e.g., FloatVector) with a known dimension.
- Credentials (userName, password) if authentication is enabled on your Milvus instance.
Overview
The Milvus connector for Flink SQL provides a sink to write streaming or batch data into a Milvus collection. It is ideal for real-time AI and vector-search use cases.
- Upsert Semantics: The sink processes changelog data, performing an upsert for INSERT and UPDATE_AFTER records and a delete for DELETE records.
- Buffered Writes: Data is buffered and written in batches to optimize for throughput.
- Streaming or Batch: The connector supports both bounded (batch) and unbounded (streaming) write modes.
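The changelog handling and buffering described above can be pictured with a small in-memory model. This is only an illustrative sketch, not the connector's implementation: the class name BufferedUpsertSink, the dictionary standing in for the Milvus collection, and the choice to ignore UPDATE_BEFORE records are all assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple

# Flink changelog row kinds, written as their short string codes.
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"


@dataclass
class BufferedUpsertSink:
    """Toy in-memory model of an upsert sink (hypothetical, for illustration only)."""

    max_rows: int = 200  # plays the role of 'sink.buffer-flush.max-rows'
    store: Dict[Any, dict] = field(default_factory=dict)  # stands in for the collection
    buffer: List[Tuple[str, dict]] = field(default_factory=list)
    flush_count: int = 0

    def write(self, kind: str, row: dict) -> None:
        if kind == UPDATE_BEFORE:
            return  # assumption: the old row image is not needed for an upsert
        self.buffer.append((kind, row))
        if len(self.buffer) >= self.max_rows:
            self.flush()

    def flush(self) -> None:
        if not self.buffer:
            return
        for kind, row in self.buffer:
            if kind == DELETE:
                self.store.pop(row["id"], None)  # delete by primary key
            else:
                self.store[row["id"]] = row  # INSERT and UPDATE_AFTER both upsert
        self.buffer.clear()
        self.flush_count += 1


sink = BufferedUpsertSink(max_rows=2)
sink.write(INSERT, {"id": 1, "name": "Ann"})
sink.write(INSERT, {"id": 2, "name": "Bob"})         # buffer holds 2 rows, so it flushes
sink.write(UPDATE_AFTER, {"id": 1, "name": "Anna"})
sink.write(DELETE, {"id": 2})                        # second flush
print(sink.store)
```

A smaller max_rows value lowers write latency at the cost of more, smaller batches. The real sink also flushes on a time interval, which this sketch leaves out.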
Syntax and Example
The following example shows how to define a Flink table that sinks to an existing Milvus collection and then insert data into it.
-- Define a sink into an existing Milvus collection
CREATE TABLE milvus_sink (
  id BIGINT NOT NULL,
  name STRING,
  age INT,
  vec ARRAY<FLOAT>, -- maps to a FloatVector field in Milvus
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'milvus',
  'endpoint' = 'milvus-dev.my.company', -- or IP
  'port' = '19530',
  'userName' = 'root',
  'password' = 'Milvus',
  'databaseName' = 'default',
  'collectionName' = 'people',
  -- tune buffering for throughput vs. latency
  'sink.buffer-flush.max-rows' = '200',
  'sink.buffer-flush.interval' = '3s'
);

-- Example bounded source (datagen) and insert
CREATE TEMPORARY TABLE gen (
  id BIGINT NOT NULL,
  name STRING,
  age INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '3',
  'fields.id.kind' = 'sequence',
  'fields.id.start' = '1',
  'fields.id.end' = '3'
);

INSERT INTO milvus_sink
SELECT id, name, age,
  ARRAY[CAST(0.1 AS FLOAT), CAST(0.2 AS FLOAT), CAST(0.3 AS FLOAT)] AS vec
FROM gen;

The vector dimension in your ARRAY<FLOAT> data must exactly match the dimension of the corresponding vector field in your Milvus collection's schema (3 in this example).
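One way to catch a dimension mismatch early is to check sample rows before they reach the sink. The following is a minimal sketch under the assumption that rows are available as Python dictionaries with an id key and a vector field. The helper name rows_with_wrong_dimension is hypothetical and is not part of the connector or of pymilvus.

```python
from typing import Iterable, List, Mapping


def rows_with_wrong_dimension(
    rows: Iterable[Mapping], vector_field: str, expected_dim: int
) -> List:
    """Return the ids of rows whose vector length differs from expected_dim."""
    return [row["id"] for row in rows if len(row[vector_field]) != expected_dim]


rows = [
    {"id": 1, "vec": [0.1, 0.2, 0.3]},
    {"id": 2, "vec": [0.1, 0.2]},  # only 2 elements, collection expects 3
]
bad_ids = rows_with_wrong_dimension(rows, "vec", 3)
print(bad_ids)
```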
Feeding Deletes
To delete a record in Milvus, your source must be a changelog source (like upsert-kafka with a changelog-json format) that can emit DELETE records. The sink will then publish a DELETE event for the given primary key.
CREATE TEMPORARY TABLE tombstones_kafka (
  id BIGINT NOT NULL,
  name STRING,
  age INT,
  vec ARRAY<FLOAT>,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'people-changelog',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'key.format' = 'json',
  'value.format' = 'changelog-json',
  'value.fields-include' = 'ALL'
);

-- milvus_sink is the sink table defined in the previous example
INSERT INTO milvus_sink
SELECT * FROM tombstones_kafka;

Partitioning
The connector supports writing to a specific Milvus partition.
Static Partitioning
To write all data from the sink into a single, fixed partition, specify its name using the partitionName option.
CREATE TABLE MilvusSink (...) WITH (
  'connector' = 'milvus',
  'partitionName' = 'my_partition',
  ...
);

Partition Key Routing
If your Milvus collection is configured with a partition key, you can enable automatic routing. The connector will use the value from the corresponding field in your Flink data to route each record to the correct partition.
CREATE TABLE MilvusSink (...) WITH (
  'connector' = 'milvus',
  'partitionKey.enabled' = 'true',
  ...
);

WITH Options
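Most WITH option names use camelCase (e.g., databaseName, collectionName), while the buffering options use dotted, hyphenated names (e.g., sink.buffer-flush.max-rows). Option names are case-sensitive, so write them exactly as shown.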
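Because a wrongly cased option name is easy to miss, a quick check of the option map before generating DDL can help. The sketch below compares option names against the ones used in the examples on this page. The KNOWN_OPTIONS list is an assumption drawn only from those examples and is not an official or complete option reference, and find_miscased_options is a hypothetical helper.

```python
from typing import Dict

# Option names as they appear in the examples on this page. The connector may
# accept more options than these; this is not an official reference list.
KNOWN_OPTIONS = [
    "connector", "endpoint", "port", "userName", "password",
    "databaseName", "collectionName", "partitionName", "partitionKey.enabled",
    "sink.buffer-flush.max-rows", "sink.buffer-flush.interval",
]


def find_miscased_options(options: Dict[str, str]) -> Dict[str, str]:
    """Map each option name that differs from a known name only by letter case
    to the expected spelling."""
    by_lower = {name.lower(): name for name in KNOWN_OPTIONS}
    fixes = {}
    for key in options:
        expected = by_lower.get(key.lower())
        if expected is not None and expected != key:
            fixes[key] = expected
    return fixes


fixes = find_miscased_options({
    "connector": "milvus",
    "databasename": "default",      # should be databaseName
    "CollectionName": "people",     # should be collectionName
})
print(fixes)
```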
Operational Tips and Limits
- Sink-only: The Milvus connector currently only supports writing data (sink). It cannot be used as a source or for lookup joins.
- Deletes Require Changelogs: To delete records in Milvus, you must use a source that produces changelog data with DELETE operations.
- Indexing for Queries: To query data immediately after writing, you must first create a vector index on your collection and then load the collection into memory. This is a Milvus requirement.
- Network Accessibility: Ensure that your Ververica deployment's network configuration (e.g., VPC, Security Groups, Firewalls) allows outbound traffic to your Milvus instance's host and port. Additionally, configure your Milvus instance's firewall or security group to allow inbound traffic from your Ververica deployment.
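A basic TCP check can help confirm the network path before debugging the job itself. The sketch below uses only the Python standard library. It assumes you can run Python on a machine in the same network as your deployment, and the host and port shown are placeholders to replace with your own Milvus endpoint. The function name can_connect is hypothetical.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False


HOST = "localhost"  # placeholder: replace with your Milvus endpoint
PORT = 19530        # port used in the examples on this page
print(f"{HOST}:{PORT} reachable: {can_connect(HOST, PORT)}")
```

A successful connection only shows that the port is open from where the check runs. It does not validate credentials or confirm that the service behind the port is Milvus.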
Milvus Setup for Testing
This section shows how to get a Milvus instance running with Docker and how to perform basic operations using the Python client. This is useful for local development and for following the examples in a test environment.
Step 1: Set Up Milvus with Docker
This example uses an Ubuntu 22.04 environment.
- Install Docker and Docker Compose
Connect to your host machine with SSH and run the following commands to install the necessary packages.
sudo apt-get update -y
sudo apt-get install -y ca-certificates curl gnupg lsb-release

sudo install -m 0755 -d /etc/apt/keyrings
curl -fsSL https://download.docker.com/linux/ubuntu/gpg \
  | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
echo \
  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] \
  https://download.docker.com/linux/ubuntu \
  $(. /etc/os-release; echo "$VERSION_CODENAME") stable" \
  | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null

sudo apt-get update -y
sudo apt-get install -y docker-ce docker-ce-cli containerd.io \
  docker-buildx-plugin docker-compose-plugin

sudo systemctl enable --now docker
sudo usermod -aG docker $USER

You may need to log out and log back in for the user group changes to take effect.
- Create a `docker-compose.yaml` File
This file defines the Milvus service and its dependencies (etcd and MinIO).
version: '3.8'

services:
  etcd:
    image: quay.io/coreos/etcd:v3.5.11
    container_name: etcd
    restart: unless-stopped
    command: >
      /usr/local/bin/etcd
      -advertise-client-urls http://etcd:2379
      -listen-client-urls http://0.0.0.0:2379
      -data-dir /etcd-data
    volumes:
      - etcd_data:/etcd-data

  minio:
    image: minio/minio:latest
    container_name: minio
    restart: unless-stopped
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio_data:/data

  milvus:
    image: milvusdb/milvus:v2.4.10
    container_name: milvus
    restart: unless-stopped
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    command: ["milvus", "run", "standalone"]
    depends_on:
      - etcd
      - minio
    ports:
      - "19530:19530"
      - "9091:9091"
    volumes:
      - milvus_data:/var/lib/milvus

volumes:
  etcd_data:
  minio_data:
  milvus_data:

- Start the Services
From the same directory, run the following command.
sudo docker compose up -d

You can verify that the services are running with docker ps.
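Containers can show as running before Milvus is ready to accept connections. As a rough readiness check, the sketch below polls the HTTP health endpoint that Milvus standalone serves on port 9091, which the compose file above publishes. The /healthz path and the localhost URL are assumptions about a default local setup, so adjust them if your deployment differs. The function name milvus_is_healthy is hypothetical.

```python
import http.client
import urllib.request


def milvus_is_healthy(
    url: str = "http://localhost:9091/healthz", timeout: float = 3.0
) -> bool:
    """Return True if the health endpoint answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except (OSError, http.client.HTTPException):
        # URLError and HTTPError are OSError subclasses, so refused
        # connections, timeouts, and non-2xx responses all land here.
        return False


print("healthy" if milvus_is_healthy() else "not reachable yet")
```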
Step 2: Prepare Collection and Query with Python
You can interact with your Milvus instance using the pymilvus client.
- Install the Client
pip install pymilvus

- Run the Python Script
The following script connects to Milvus, creates a collection (if it doesn't exist), creates an index, and loads the collection into memory so it's ready for queries.
from pymilvus import (
    connections, utility, Collection, FieldSchema, CollectionSchema, DataType
)


HOST = "localhost"  # Use your EC2 host IP if running remotely
PORT = "19530"
COLLECTION_NAME = "people_demo"
VECTOR_FIELD = "vec"
DIMENSION = 3

# --- Connect to Milvus ---
connections.connect("default", host=HOST, port=PORT)
print("Connected to Milvus.")

# --- Create collection if it's missing ---
if COLLECTION_NAME not in utility.list_collections():
    schema = CollectionSchema(
        fields=[
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
            FieldSchema(name="name", dtype=DataType.VARCHAR, max_length=255),
            FieldSchema(name="age", dtype=DataType.INT32),
            FieldSchema(name=VECTOR_FIELD, dtype=DataType.FLOAT_VECTOR, dim=DIMENSION),
        ],
        description="People demo collection"
    )
    Collection(name=COLLECTION_NAME, schema=schema)
    print(f"Created collection: {COLLECTION_NAME}")
else:
    print(f"Collection '{COLLECTION_NAME}' already exists.")

col = Collection(COLLECTION_NAME)

# --- Create a vector index if one doesn't exist ---
if not col.has_index():
    print("Creating index on vector field...")
    col.create_index(
        field_name=VECTOR_FIELD,
        index_params={"index_type": "AUTOINDEX", "metric_type": "L2"}
    )
    print("Waiting for index to build...")
    utility.wait_for_index_building_complete(COLLECTION_NAME)
    print("Index is ready.")

# --- Load collection and query ---
print("Loading collection into memory...")
col.load()
print(f"Collection loaded. Number of entities: {col.num_entities}")

# Example: Query for records after inserting from Flink
rows = col.query(expr="id >= 1", output_fields=["id", "name", "age"])
print("Query results:")
print(rows)

Troubleshooting
- "Missing required options...": Verify that all WITH option names use the correct camelCase format (e.g., databaseName).
- "Index not found" or "Collection not loaded": Ensure you have created a vector index on your collection and have called the .load() command before querying.
- "Dimension mismatch": The length of the vector in your Flink ARRAY<FLOAT> data must exactly match the dim specified for the vector field in your Milvus collection schema.