Not available for BYOC.

Fluss (Private Preview)


Prerequisites and Limitations

During the Private Preview phase, be aware of the following configurations and limitations:

  • Non-Production Use: This environment is not meant for production workloads or mission-critical data.
  • Dedicated Workspace: We strongly advise creating a dedicated workspace for your Fluss preview to avoid impacting the stability of your existing projects.
  • Storage Limit: The preview cluster has a limited total disk capacity (for example, 300 GB).
  • Replication: The cluster typically sets a replication factor of 3 for high availability. Each table consumes disk space in proportion to its number of buckets, roughly bucket_number × 6 GB, where bucket_number is the table's 'bucket.num' setting.
  • Security:
    • No ACLs: The provided credentials grant superuser access. Fine-grained Access Control Lists (ACLs) are not enabled.
    • No TLS: External connections to the Fluss cluster are not protected via TLS. Do not transmit sensitive data over external connections.
  • Scanning Limitations: External connections can only scan from the latest offset. Reading from the earliest offset or performing a full snapshot scan is not supported over an external connection.
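As a rough sizing aid, the per-bucket figure above can be turned into a quick budget check. The sketch below is plain Java and assumes the 6 GB-per-bucket rule of thumb and the 300 GB example limit from the list; your actual quota may differ. The input is treated as a total bucket count; how the partitions of a partitioned table add to that count is not stated here, so treat that as an open assumption.

```java
// Rule-of-thumb disk budgeting for the Fluss preview cluster.
// ASSUMPTIONS: ~6 GB per bucket (from the limitations list) and a
// 300 GB cluster limit (the example figure); check your actual quota.
public final class DiskBudget {
    static final long GB_PER_BUCKET = 6;

    // Estimated disk footprint for the given total number of buckets.
    static long estimateGb(long buckets) {
        return buckets * GB_PER_BUCKET;
    }

    // Largest total bucket count that still fits within the given budget.
    static long maxBuckets(long budgetGb) {
        return budgetGb / GB_PER_BUCKET;
    }

    public static void main(String[] args) {
        System.out.println(estimateGb(2));   // 12 (a table with 'bucket.num' = '2')
        System.out.println(maxBuckets(300)); // 50 (buckets that fit in 300 GB)
    }
}
```

Under these assumptions, roughly 50 buckets in total fit in a 300 GB cluster, so keep 'bucket.num' small for preview tables.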

Connection Details

After your workspace is enabled for the preview, you will receive specific connection details. You will need these to configure your Flink jobs and external applications.

Parameter | Description
Internal Fluss URI | Used for VERA jobs running inside Ververica Cloud. Use this for maximum network speed and reduced latency.
External Fluss URI | Used to connect to the Fluss cluster from applications outside Ververica Cloud (e.g., local development).
Username / Password | Authentication credentials required for both internal and external connections.

Quickstart

This guide demonstrates how to create a Fluss catalog, generate test data, and query a primary key table.

1. Create a Fluss Catalog

Run the following SQL in your Ververica Cloud SQL Editor to register the catalog using the Internal URI:

SQL
CREATE CATALOG fluss WITH (
  'type' = 'fluss',
  'bootstrap.servers' = '<YOUR_INTERNAL_FLUSS_URI>',  -- e.g., coordinator-server-0.fluss-vvc...:9127
  'client.security.protocol' = 'SASL',
  'client.security.sasl.mechanism' = 'PLAIN',
  'client.security.sasl.username' = '<YOUR_FLUSS_USERNAME>',
  'client.security.sasl.password' = '<YOUR_FLUSS_PASSWORD>'
);
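If you script catalog registration, for example to switch between the Internal and External URI, the statement can be generated instead of edited by hand. This is a minimal sketch in plain Java that reproduces the options above; the class name CatalogDdl and the environment variable names FLUSS_URI, FLUSS_USERNAME, and FLUSS_PASSWORD are hypothetical. Values are escaped by doubling single quotes, the standard way to embed a quote in a SQL string literal.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that renders the Quickstart CREATE CATALOG statement.
public final class CatalogDdl {

    // Wraps a value in a SQL string literal, doubling any embedded single quote.
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // The catalog name is inserted as-is, so it must be a valid identifier.
    static String createCatalog(String name, String bootstrapServers, String user, String password) {
        // LinkedHashMap keeps the options in the same order as the SQL example.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("type", "fluss");
        options.put("bootstrap.servers", bootstrapServers);
        options.put("client.security.protocol", "SASL");
        options.put("client.security.sasl.mechanism", "PLAIN");
        options.put("client.security.sasl.username", user);
        options.put("client.security.sasl.password", password);

        String body = options.entrySet().stream()
                .map(e -> "  " + quote(e.getKey()) + " = " + quote(e.getValue()))
                .collect(Collectors.joining(",\n"));
        return "CREATE CATALOG " + name + " WITH (\n" + body + "\n);";
    }

    public static void main(String[] args) {
        // Hypothetical environment variables; fall back to the placeholders from this page.
        Map<String, String> env = System.getenv();
        String uri = env.getOrDefault("FLUSS_URI", "<YOUR_INTERNAL_FLUSS_URI>");
        String user = env.getOrDefault("FLUSS_USERNAME", "<YOUR_FLUSS_USERNAME>");
        String password = env.getOrDefault("FLUSS_PASSWORD", "<YOUR_FLUSS_PASSWORD>");
        System.out.println(createCatalog("fluss", uri, user, password));
    }
}
```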

2. Create a Database and Table

Create a database and a partitioned table with a primary key:

SQL
CREATE DATABASE fluss.quickstart;

CREATE TABLE IF NOT EXISTS fluss.quickstart.pk_table (
  c1 BIGINT,
  c2 INT,
  c3 STRING,
  c4 BIGINT,
  c5 INT,
  c6 STRING,
  PRIMARY KEY (c1, c2, c3) NOT ENFORCED
) PARTITIONED BY (c3) WITH (
  'bucket.num' = '2'
);

3. Generate Data

Deploy a temporary data generation job using the faker connector to populate the table:

SQL
CREATE TEMPORARY TABLE fake WITH (
  'connector' = 'faker',
  'rows-per-second' = '1000',
  'fields.c1.expression' = '#{number.numberBetween ''0'',''1000000''}',
  'fields.c2.expression' = '#{number.numberBetween ''0'',''1000''}',
  -- Partition key c3: one of seven date strings, 20260101 through 20260107
  'fields.c3.expression' = '202601#{regexify ''(01|02|03|04|05|06|07)''}',
  'fields.c4.expression' = '#{number.numberBetween ''0'',''1000000''}',
  'fields.c5.expression' = '#{number.numberBetween ''0'',''1000''}',
  'fields.c6.expression' = '#{superhero.name}'
) LIKE fluss.quickstart.pk_table (EXCLUDING ALL);

INSERT INTO fluss.quickstart.pk_table
SELECT * FROM fake;
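The c3 expression combines the fixed prefix 202601 with one of seven two-digit suffixes, so the job writes at most seven distinct partition values. The sketch below (plain Java; the class name PartitionValues is hypothetical) simply lists those values, which can help when checking which partitions show up in pk_table.

```java
import java.util.ArrayList;
import java.util.List;

// Lists the c3 values produced by '202601#{regexify ''(01|02|03|04|05|06|07)''}':
// the literal prefix "202601" followed by one of the suffixes 01 through 07.
public final class PartitionValues {

    static List<String> expected() {
        List<String> values = new ArrayList<>();
        for (int day = 1; day <= 7; day++) {
            // %02d pads the suffix to two digits, e.g. 1 -> "01"
            values.add(String.format("202601%02d", day));
        }
        return values;
    }

    public static void main(String[] args) {
        // Prints [20260101, 20260102, 20260103, 20260104, 20260105, 20260106, 20260107]
        System.out.println(expected());
    }
}
```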

4. Query the Data

After the job is running, you can query the table directly:

SQL
SELECT * FROM fluss.quickstart.pk_table;

For more advanced examples, refer to the official Fluss documentation.

External Connectivity

You can connect to Fluss from external applications (such as a local Java application) using the External URI.

SQL Example (External)

External connections can only scan from the latest offset, so when you query Fluss from Flink SQL outside Ververica Cloud (with the catalog registered using the External URI), set 'scan.startup.mode' to 'latest' with a dynamic table options hint:

SQL
SELECT * FROM fluss.quickstart.pk_table
/*+ OPTIONS('scan.startup.mode'='latest') */;

Java SDK Example

The following Java snippet demonstrates how to configure the client and scan for records using the LogScanner.

Java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.admin.ListOffsetsResult;
import org.apache.fluss.client.admin.OffsetSpec;
import org.apache.fluss.client.table.Table;
import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.client.table.scanner.log.ScanRecords;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;

public class FlussExternalScan {

    // Must match 'bucket.num' in the Quickstart CREATE TABLE statement
    private static final int NUM_BUCKETS = 2;

    public static void main(String[] args) throws Exception {
        // 1. Configure the connection (External URI + SASL/PLAIN credentials)
        Configuration conf = new Configuration();
        conf.setString("bootstrap.servers", "<YOUR_EXTERNAL_FLUSS_URI>"); // e.g. ...ververica.cloud:9128
        conf.setString("client.security.protocol", "SASL");
        conf.setString("client.security.sasl.mechanism", "PLAIN");
        conf.setString("client.security.sasl.username", "<YOUR_FLUSS_USERNAME>");
        conf.setString("client.security.sasl.password", "<YOUR_FLUSS_PASSWORD>");

        TablePath tablePath = TablePath.of("quickstart", "pk_table");
        List<Integer> buckets = new ArrayList<>();
        for (int b = 0; b < NUM_BUCKETS; b++) {
            buckets.add(b);
        }

        // 2. Access the table; all resources are closed when the block exits
        try (Connection conn = ConnectionFactory.createConnection(conf);
                Admin admin = conn.getAdmin();
                Table table = conn.getTable(tablePath);
                LogScanner logScanner = table.newScan().createLogScanner()) {

            // 3. Subscribe to every bucket of every partition at its latest offset.
            //    pk_table is partitioned by c3, so the partition-aware calls are needed.
            //    Partitions created after this point are not subscribed.
            //    For a non-partitioned table, use admin.listOffsets(tablePath, buckets, spec)
            //    and logScanner.subscribe(bucket, offset) instead.
            for (PartitionInfo partition : admin.listPartitionInfos(tablePath).get()) {
                ListOffsetsResult latestOffsets = admin.listOffsets(
                        tablePath, partition.getPartitionName(), buckets, new OffsetSpec.LatestSpec());
                for (int bucket : buckets) {
                    long latest = latestOffsets.bucketResult(bucket).get();
                    logScanner.subscribe(partition.getPartitionId(), bucket, latest);
                }
            }

            // 4. Poll for new records
            while (true) {
                ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
                for (TableBucket bucket : scanRecords.buckets()) {
                    for (ScanRecord record : scanRecords.records(bucket)) {
                        InternalRow row = record.getRow();
                        // Process your row here
                        System.out.println("Read row: " + row);
                    }
                }
            }
        }
    }
}
