Milvus Connector

Connect Flink SQL jobs running on Ververica to Milvus to persist vector and scalar data with upsert semantics based on a primary key.

Prerequisites

  • A running Milvus 2.4+ deployment with a reachable network address (host and port).
  • An existing database and collection in Milvus. The connector writes to them but does not create them.
  • The collection schema must have:
    • One primary key field of type String (VARCHAR) or Integer (INT64), with AUTO_ID disabled.
    • One or more vector fields (e.g., FloatVector) with a known dimension.
  • Credentials (userName, password) if authentication is enabled on your Milvus instance.

Overview

The Milvus connector for Flink SQL provides a sink to write streaming or batch data into a Milvus collection. It is ideal for real-time AI and vector-search use cases.

  • Upsert Semantics: The sink processes changelog data, performing an upsert for INSERT and UPDATE_AFTER records and a delete for DELETE records.
  • Buffered Writes: Data is buffered and written in batches to optimize for throughput.
  • Streaming or Batch: The connector supports both bounded (batch) and unbounded (streaming) write modes.
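
The upsert semantics described above can be pictured with a small in-memory model. This is only a sketch, not the connector's implementation: a plain dict stands in for the Milvus collection, the RowKind names mirror Flink's changelog row kinds, and apply_changelog is a hypothetical helper. It also assumes that UPDATE_BEFORE records are simply skipped, which the text above does not state.

```python
from enum import Enum


class RowKind(Enum):
    """Changelog row kinds, named after Flink's RowKind values."""
    INSERT = "+I"
    UPDATE_BEFORE = "-U"
    UPDATE_AFTER = "+U"
    DELETE = "-D"


def apply_changelog(collection, records, ignore_delete=False):
    """Apply (kind, row) pairs to a dict keyed by the primary key 'id'.

    `collection` is a plain dict standing in for a Milvus collection.
    """
    for kind, row in records:
        key = row["id"]
        if kind in (RowKind.INSERT, RowKind.UPDATE_AFTER):
            collection[key] = row        # upsert: insert or overwrite by key
        elif kind is RowKind.DELETE and not ignore_delete:
            collection.pop(key, None)    # delete by primary key
        # Assumption: UPDATE_BEFORE (and ignored deletes) change nothing.
    return collection


changelog = [
    (RowKind.INSERT, {"id": 1, "name": "Ada", "age": 36}),
    (RowKind.INSERT, {"id": 2, "name": "Bob", "age": 41}),
    (RowKind.UPDATE_BEFORE, {"id": 1, "name": "Ada", "age": 36}),
    (RowKind.UPDATE_AFTER, {"id": 1, "name": "Ada", "age": 37}),
    (RowKind.DELETE, {"id": 2, "name": "Bob", "age": 41}),
]

state = apply_changelog({}, changelog)
print(state)  # {1: {'id': 1, 'name': 'Ada', 'age': 37}}
```

Passing ignore_delete=True models the sink.ignoreDelete option: the DELETE for id 2 is skipped and both keys remain.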

Syntax and Example

The following example shows how to define a Flink table that sinks to an existing Milvus collection and then insert data into it.

SQL
-- Define a sink into an existing Milvus collection
CREATE TABLE milvus_sink (
  id  BIGINT NOT NULL,
  name STRING,
  age  INT,
  vec  ARRAY<FLOAT>,               -- maps to a FloatVector field in Milvus
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'                  = 'milvus',
  'endpoint'                   = 'milvus-dev.my.company',  -- or IP
  'port'                       = '19530',
  'userName'                   = 'root',
  'password'                   = 'Milvus',
  'databaseName'               = 'default',
  'collectionName'             = 'people',
  -- tune buffering for throughput vs. latency
  'sink.buffer-flush.max-rows' = '200',
  'sink.buffer-flush.interval' = '3s'
);

-- Example bounded source (datagen) and insert
CREATE TEMPORARY TABLE gen (
  id BIGINT NOT NULL,
  name STRING,
  age INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '3',
  'fields.id.kind'  = 'sequence',
  'fields.id.start' = '1',
  'fields.id.end'   = '3'
);

INSERT INTO milvus_sink
SELECT id, name, age,
       ARRAY[CAST(0.1 AS FLOAT), CAST(0.2 AS FLOAT), CAST(0.3 AS FLOAT)] AS vec
FROM gen;

Feeding Deletes

To delete a record in Milvus, the source must be a changelog source (for example, upsert-kafka with a changelog-json format) that can emit DELETE records. For each DELETE record, the sink deletes the entity with the matching primary key, unless sink.ignoreDelete is set to true.

SQL
CREATE TEMPORARY TABLE tombstones_kafka (
  id  BIGINT NOT NULL,
  name STRING,
  age  INT,
  vec  ARRAY<FLOAT>,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'     = 'upsert-kafka',
  'topic'         = 'people-changelog',
  'properties.bootstrap.servers' = 'kafka-broker:9092',
  'key.format'    = 'json',
  'value.format'  = 'changelog-json',
  'value.fields-include' = 'ALL'
);

-- milvus_sink is the sink table defined in the previous example
INSERT INTO milvus_sink
SELECT * FROM tombstones_kafka;

Partitioning

The connector can write all records to one fixed Milvus partition, or route each record by the collection's partition key.

Static Partitioning

To write all data from the sink into a single, fixed partition, specify its name using the partitionName option.

SQL
CREATE TABLE MilvusSink (...) WITH (
  'connector'     = 'milvus',
  'partitionName' = 'my_partition',
  ...
);

Partition Key Routing

If your Milvus collection is configured with a partition key, you can enable automatic routing. The connector will use the value from the corresponding field in your Flink data to route each record to the correct partition.

SQL
CREATE TABLE MilvusSink (...) WITH (
  'connector'            = 'milvus',
  'partitionKey.enabled' = 'true',
  ...
);

WITH Options

| Option | Required | Default | Description |
|---|---|---|---|
| connector | Yes | (none) | Must be 'milvus'. |
| endpoint | Yes | (none) | The hostname or IP address of the Milvus instance. |
| port | No | 19530 | The gRPC port for the Milvus instance. |
| userName | Yes | (none) | The username for Milvus authentication. |
| password | Yes | (none) | The password for Milvus authentication. |
| databaseName | Yes | (none) | The target database name in Milvus. |
| collectionName | Yes | (none) | The target collection name in Milvus. |
| partitionName | No | (none) | The name of a fixed partition to write to. |
| partitionKey.enabled | No | false | Set to true to use the collection’s defined partition key for routing. |
| sink.buffer-flush.max-rows | No | 200 | The maximum number of records to buffer before flushing. 0 disables this. |
| sink.buffer-flush.interval | No | 3s | The maximum time between flushes. 0s disables this. |
| sink.max-retries | No | 3 | The number of times to retry on write errors. |
| sink.parallelism | No | planner | Manually overrides the parallelism of the sink operator. |
| sink.ignoreDelete | No | false | If true, the sink will ignore DELETE records from the changelog source. |
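
How the two sink.buffer-flush options interact can be pictured with a small model. The sketch below is not the connector's code: BufferedWriter, flush_fn, and the injectable clock are hypothetical names. It assumes that a flush fires as soon as either threshold is reached and that 0 disables a threshold, as the option descriptions suggest. A real sink would likely also flush from a timer, whereas this model only checks the interval when a row arrives.

```python
import time


class BufferedWriter:
    """Toy model of a sink that flushes by row count or elapsed time."""

    def __init__(self, flush_fn, max_rows=200, interval_s=3.0, clock=time.monotonic):
        self.flush_fn = flush_fn      # called with a list of buffered rows
        self.max_rows = max_rows      # 0 disables the row-count trigger
        self.interval_s = interval_s  # 0 disables the time trigger
        self.clock = clock            # injectable so the demo is deterministic
        self.buffer = []
        self.last_flush = clock()

    def write(self, row):
        self.buffer.append(row)
        full = self.max_rows > 0 and len(self.buffer) >= self.max_rows
        stale = (self.interval_s > 0
                 and self.clock() - self.last_flush >= self.interval_s)
        if full or stale:
            self.flush()

    def flush(self):
        if self.buffer:
            self.flush_fn(list(self.buffer))
            self.buffer.clear()
        self.last_flush = self.clock()


batches = []
now = [0.0]  # fake clock value, advanced by hand
writer = BufferedWriter(batches.append, max_rows=3, interval_s=3.0,
                        clock=lambda: now[0])
for i in range(4):
    writer.write(i)   # the third write fills the buffer and flushes [0, 1, 2]
now[0] = 5.0
writer.write(4)       # 5 s since the last flush, so [3, 4] is flushed
print(batches)        # [[0, 1, 2], [3, 4]]
```

Under this model, smaller values for either option lower end-to-end latency at the cost of more, smaller write requests.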

Operational Tips and Limits

  • Sink-only: The Milvus connector currently only supports writing data (sink). It cannot be used as a source or for lookup joins.
  • Deletes Require Changelogs: To delete records in Milvus, you must use a source that produces changelog data with DELETE operations.
  • Indexing for Queries: To query data immediately after writing, you must first create a vector index on your collection and then load the collection into memory. This is a Milvus requirement.
  • Network Accessibility: Ensure that your Ververica deployment's network configuration (e.g., VPC, Security Groups, Firewalls) allows outbound traffic to your Milvus instance's host and port. Additionally, configure your Milvus instance's firewall or security group to allow inbound traffic from your Ververica deployment.
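
A quick way to test the network path is to attempt a plain TCP connection to the Milvus host and port. The helper below is a minimal sketch using only the Python standard library; can_reach is a hypothetical name, and the host in the usage comment is the placeholder from the SQL example. It only proves reachability from the machine where it runs, so run it from a host inside the same network as your Ververica deployment.

```python
import socket


def can_reach(host, port, timeout_s=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures.
        return False


# Example usage (placeholder host from the example above):
# can_reach("milvus-dev.my.company", 19530)
```

A True result means the port accepts connections; it does not verify credentials or that the service behind the port is Milvus.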

Milvus Setup for Testing

This section shows how to get a Milvus instance running with Docker and how to perform basic operations using the Python client. This is useful for local development and for following the examples in a test environment.

Step 1: Set Up Milvus with Docker

This example uses an Ubuntu 22.04 environment.

  1. Install Docker and Docker Compose

Connect to your host machine with SSH and run the following commands to install the necessary packages.

BASH
sudo apt-get update -y
sudo apt-get install -y ca-certificates curl gnupg lsb-release

sudo install -m 0755 -d /etc/apt/keyrings
curl -fsSL https://download.docker.com/linux/ubuntu/gpg \
  | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
echo \
  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] \
  https://download.docker.com/linux/ubuntu \
  $(. /etc/os-release; echo "$VERSION_CODENAME") stable" \
  | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null

sudo apt-get update -y
sudo apt-get install -y docker-ce docker-ce-cli containerd.io \
  docker-buildx-plugin docker-compose-plugin

sudo systemctl enable --now docker
sudo usermod -aG docker $USER
  2. Create a `docker-compose.yaml` File

This file defines the Milvus service and its dependencies (etcd and MinIO).

YAML
version: '3.8'

services:
  etcd:
    image: quay.io/coreos/etcd:v3.5.11
    container_name: etcd
    restart: unless-stopped
    command: >
      /usr/local/bin/etcd
      -advertise-client-urls http://etcd:2379
      -listen-client-urls http://0.0.0.0:2379
      -data-dir /etcd-data
    volumes:
      - etcd_data:/etcd-data

  minio:
    image: minio/minio:latest
    container_name: minio
    restart: unless-stopped
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio_data:/data

  milvus:
    image: milvusdb/milvus:v2.4.10
    container_name: milvus
    restart: unless-stopped
    environment:
      ETCD_ENDPOINTS: etcd:2379
      MINIO_ADDRESS: minio:9000
    command: ["milvus", "run", "standalone"]
    depends_on:
      - etcd
      - minio
    ports:
      - "19530:19530"
      - "9091:9091"
    volumes:
      - milvus_data:/var/lib/milvus

volumes:
  etcd_data:
  minio_data:
  milvus_data:
  3. Start the Services

From the directory that contains docker-compose.yaml, run the following command.

BASH
sudo docker compose up -d

You can verify that the services are running with docker ps.
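
A running container is not necessarily a ready service. As a rough readiness check, the sketch below polls the HTTP health endpoint that Milvus standalone is generally understood to expose at /healthz on port 9091 (the port published in the compose file above). Treat the endpoint path as an assumption to confirm against your Milvus version; milvus_is_healthy is a hypothetical helper name.

```python
import urllib.request


def milvus_is_healthy(base_url="http://localhost:9091", timeout_s=3.0):
    """Return True if GET {base_url}/healthz answers with HTTP 200."""
    try:
        with urllib.request.urlopen(f"{base_url}/healthz", timeout=timeout_s) as resp:
            return resp.status == 200
    except OSError:
        # URLError and HTTPError are OSError subclasses, so this also covers
        # refused connections, timeouts, and non-2xx responses.
        return False


# Example usage after `docker compose up -d`:
# print(milvus_is_healthy())
```

If the check returns False shortly after startup, wait a few seconds and retry; the dependencies (etcd and MinIO) need to be up before Milvus reports healthy.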

Step 2: Prepare Collection and Query with Python

You can interact with your Milvus instance using the pymilvus client.

  1. Install the Client
BASH
pip install pymilvus
  2. Run the Python Script

The following script connects to Milvus, creates a collection (if it doesn't exist), creates an index, and loads the collection into memory so it's ready for queries.

PYTHON
from pymilvus import (
    connections, utility, Collection, FieldSchema, CollectionSchema, DataType
)


HOST = "localhost"  # Use your EC2 host IP if running remotely
PORT = "19530"
COLLECTION_NAME = "people_demo"
VECTOR_FIELD = "vec"
DIMENSION = 3

# --- Connect to Milvus ---
connections.connect("default", host=HOST, port=PORT)
print("Connected to Milvus.")

# --- Create collection if it's missing ---
if COLLECTION_NAME not in utility.list_collections():
    schema = CollectionSchema(
        fields=[
            FieldSchema(name="id",   dtype=DataType.INT64, is_primary=True, auto_id=False),
            FieldSchema(name="name", dtype=DataType.VARCHAR, max_length=255),
            FieldSchema(name="age",  dtype=DataType.INT32),
            FieldSchema(name=VECTOR_FIELD, dtype=DataType.FLOAT_VECTOR, dim=DIMENSION),
        ],
        description="People demo collection"
    )
    Collection(name=COLLECTION_NAME, schema=schema)
    print(f"Created collection: {COLLECTION_NAME}")
else:
    print(f"Collection '{COLLECTION_NAME}' already exists.")

col = Collection(COLLECTION_NAME)

# --- Create a vector index if one doesn't exist ---
if not col.has_index():
    print("Creating index on vector field...")
    col.create_index(
        field_name=VECTOR_FIELD,
        index_params={"index_type": "AUTOINDEX", "metric_type": "L2"}
    )
    print("Waiting for index to build...")
    utility.wait_for_index_building_complete(COLLECTION_NAME)
    print("Index is ready.")

# --- Load collection and query ---
print("Loading collection into memory...")
col.load()
print(f"Collection loaded. Number of entities: {col.num_entities}")

# Example: Query for records after inserting from Flink
rows = col.query(expr="id >= 1", output_fields=["id","name","age"])
print("Query results:")
print(rows)

Troubleshooting

  • "Missing required options...": Verify that all WITH option names use the correct camelCase format (e.g., databaseName).
  • "Index not found" or "Collection not loaded": Ensure you have created a vector index on your collection and called load() on it before querying.
  • "Dimension mismatch": The length of the vector in your Flink ARRAY<FLOAT> data must exactly match the dim specified for the vector field in your Milvus collection schema.
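
Two of the errors above can be caught before a job is submitted. The following pre-flight sketch is illustrative only: REQUIRED_OPTIONS is taken from the options marked as required in the WITH Options table, while missing_options and check_vector are hypothetical helpers that run on your side and are not part of the connector.

```python
# Options marked "Required: Yes" in the WITH Options table (names are case-sensitive).
REQUIRED_OPTIONS = {
    "connector", "endpoint", "userName", "password",
    "databaseName", "collectionName",
}


def missing_options(options):
    """Return the required option names absent from `options`, sorted."""
    return sorted(REQUIRED_OPTIONS - set(options))


def check_vector(vec, dim):
    """Raise ValueError unless `vec` has exactly `dim` elements."""
    if len(vec) != dim:
        raise ValueError(
            f"dimension mismatch: got {len(vec)}, collection expects {dim}"
        )
    return vec


# 'username' and 'databasename' are deliberately mis-cased here.
opts = {
    "connector": "milvus",
    "endpoint": "milvus-dev.my.company",
    "username": "root",
    "password": "Milvus",
    "databasename": "default",
    "collectionName": "people",
}
print(missing_options(opts))  # ['databaseName', 'userName']

check_vector([0.1, 0.2, 0.3], dim=3)  # passes: 3 elements for dim=3
```

A mis-cased key shows up as a missing required option, which matches the first symptom in the list above.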