Viewing docs for: BYOC (not available for Self-Managed)

Materialized Tables

Supported Version: VERA Engine 4.1 or later. Note that incremental updates require VERA Engine 4.3 or later.

Overview

Materialized tables in Ververica Cloud are stateful representations of data that the system persists and maintains automatically during query execution. They provide a unified way to define data pipelines, which abstracts the complexity of managing separate batch and streaming jobs.

When you define a materialized table, the system automatically manages the underlying Flink jobs required to keep the data fresh based on your defined interval.

Key Characteristics

  • Automated management: The system handles the refresh logic (streaming or batch) based on the FRESHNESS definition.
  • State persistence: Data is persisted in the storage backend (for example, Paimon on S3), which allows for historical queries and efficient lookups.
  • Unified architecture: Simplifies the "Lambda Architecture" by allowing a single definition to handle both historical backfill and real-time updates.
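
As a minimal sketch of such a definition, the statement below declares a materialized table with a one-hour freshness. The table name mt_page_views, the source table page_views, and its columns are hypothetical placeholders and do not appear elsewhere on this page.

```sql
-- Hypothetical names: mt_page_views, page_views, and the selected columns
-- are placeholders for illustration only.
CREATE MATERIALIZED TABLE mt_page_views
FRESHNESS = INTERVAL '1' HOUR   -- Target staleness that the refresh jobs aim to meet.
AS
SELECT
    user_id,
    page_url,
    view_ts
FROM page_views;
```

The single statement carries both the query and the freshness target, so no separate batch or streaming job has to be written by hand.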

Prerequisites

To use materialized tables, ensure your environment meets the following requirements:

  • Engine version: Your workspace must use VERA Engine 4.1 or later. Note that incremental updates require VERA Engine 4.3 or later.
  • Catalog: You must configure an Apache Paimon catalog.
  • Storage access: The platform must have read and write access to the object storage (such as S3 or Azure Blob Storage) used by the catalog.

Use Cases

| Use Case | Benefits |
|---|---|
| Stream-to-table (temporal) joins | Join a stream of events with a slowly changing dimension table materialized in state for efficient lookups. |
| Windowed aggregations | Calculate and persist rolling metrics, such as counts, sums, or averages, over time windows. |
| Deduplication | Maintain a stateful index of unique keys to ensure only the first event per key is processed. |
| CDC handling | Consume database changelogs and materialize the current state of a record. |
| Real-time dashboards | Persist aggregated KPIs in a queryable format for low-latency access. |
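
To make the deduplication use case more concrete, the following sketch keeps only the earliest row per key using the common ROW_NUMBER pattern. The names mt_unique_events, raw_events, event_id, event_ts, and payload are hypothetical, and this is one possible formulation rather than a prescribed one.

```sql
-- Hypothetical source table raw_events(event_id, event_ts, payload).
CREATE MATERIALIZED TABLE mt_unique_events (
    PRIMARY KEY (event_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '5' MINUTE
AS
SELECT
    event_id,
    event_ts,
    payload
FROM (
    SELECT
        event_id,
        event_ts,
        payload,
        -- Rank rows within each key, earliest first.
        ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_ts ASC) AS rn
    FROM raw_events
) ranked
WHERE rn = 1;   -- Keep only the first event per key.
```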

Update Modes

A materialized table supports three update modes:

  • Stream update: The table updates continuously as new data arrives.
  • Full batch update: The engine calculates data for the entire table or partition and overwrites the existing data.
  • Incremental batch update: The engine calculates only the data that has changed since the last update and merges it into the materialized table.

Freshness Behavior

The FRESHNESS value determines whether the table updates in stream or batch mode:

  • Stream mode: Used when freshness is less than 30 minutes.
  • Batch mode: Used when freshness is 30 minutes or more.

In batch mode, the engine automatically determines whether to perform a full or an incremental update. The engine prioritizes incremental updates and performs a full update only when the query or table does not support incremental processing.
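
The threshold can be illustrated with two definitions that differ only in their FRESHNESS value. This is a sketch: the table names mt_orders_stream and mt_orders_batch are hypothetical, and the orders source table is assumed to expose the columns used in the examples on this page.

```sql
-- Freshness below 30 minutes: expected to run in stream mode.
CREATE MATERIALIZED TABLE mt_orders_stream
FRESHNESS = INTERVAL '5' MINUTE
AS
SELECT order_id, status, order_ts
FROM orders;

-- Freshness of 30 minutes or more: expected to run in batch mode.
CREATE MATERIALIZED TABLE mt_orders_batch
FRESHNESS = INTERVAL '30' MINUTE
AS
SELECT order_id, status, order_ts
FROM orders;
```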

Incremental Updates

Incremental updates improve efficiency by processing only changed data.

Conditions for Incremental Updates

The engine uses incremental updates only if the materialized table meets the following conditions:

  • The partition.fields.#.date-formatter parameter is not configured in the materialized table definition.
  • The source table does not have a primary key.
  • The query statement supports incremental updates.
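
The following sketch shows a definition that is intended to satisfy all three conditions. It reads from ods_user_log, the table without a primary key that is created in the workflow later on this page. The name mt_user_log_clean and the filter are hypothetical.

```sql
-- Hypothetical table name; the filter is for illustration only.
CREATE MATERIALIZED TABLE mt_user_log_clean
PARTITIONED BY (ds)
-- No 'partition.fields.ds.date-formatter' option is set (condition 1).
FRESHNESS = INTERVAL '1' HOUR
AS
SELECT
    item_id,
    user_id,
    vtime,
    ds
FROM ods_user_log      -- Source table has no primary key (condition 2).
WHERE user_id > 0;     -- Plain column selection and filter (condition 3).
```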

Supported SQL Patterns

The following table describes the SQL features supported for incremental updates:

| Clause | Support and Limitations |
|---|---|
| SELECT | Supports selecting columns and scalar function expressions, including user-defined functions (UDFs). Aggregate functions are not supported. |
| FROM | Supports table names or subqueries. |
| WITH | Supports common table expressions (CTEs). |
| WHERE | Supports filter conditions with scalar function expressions. Subqueries, such as WHERE [NOT] EXISTS or WHERE [NOT] IN, are not supported. |
| UNION | Only UNION ALL is supported. |
| JOIN | INNER JOIN is supported, but it still reads the full data from both source tables. LEFT/RIGHT/FULL OUTER JOIN is not supported, except for the lateral and lookup join cases below. |
| LATERAL JOIN | [LEFT OUTER] JOIN LATERAL with table function expressions is supported. |
| Lookup Join | Only A [LEFT OUTER] JOIN B FOR SYSTEM_TIME AS OF PROCTIME() is supported. |
| GROUP BY | Not supported. |
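
As a sketch of how these rules combine, the first statement below uses only a CTE, UNION ALL, a scalar function, and a simple filter, while the second uses GROUP BY and would therefore be expected to fall back to a full update. The tables orders_eu and orders_us and both materialized table names are hypothetical.

```sql
-- Uses only patterns listed as supported: WITH, UNION ALL, scalar functions, WHERE.
CREATE MATERIALIZED TABLE mt_all_shipped_orders
FRESHNESS = INTERVAL '1' HOUR
AS
WITH all_orders AS (
    SELECT order_id, status, order_amount FROM orders_eu
    UNION ALL
    SELECT order_id, status, order_amount FROM orders_us
)
SELECT
    order_id,
    UPPER(status) AS status,
    order_amount
FROM all_orders
WHERE status = 'shipped';

-- Uses GROUP BY with an aggregate function, which the table lists as not supported
-- for incremental updates.
CREATE MATERIALIZED TABLE mt_order_totals
FRESHNESS = INTERVAL '1' HOUR
AS
SELECT status, SUM(order_amount) AS total_amount
FROM orders_eu
GROUP BY status;
```

Whether the engine actually chooses an incremental update for the first statement also depends on the conditions listed earlier, such as the source tables having no primary key.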

Examples

Example 1: Process Data Using a Scalar Function

This example uses a scalar function to transform order data.

SQL
CREATE MATERIALIZED TABLE mt_shipped_orders (
    PRIMARY KEY (order_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
SELECT
    order_id,
    COALESCE(customer_id, 'Unknown') AS customer_id,
    CAST(order_amount AS DECIMAL(10, 2)) AS order_amount,
    CASE
        WHEN status = 'shipped' THEN 'Completed'
        WHEN status = 'pending' THEN 'In Progress'
        ELSE 'Unknown'
    END AS order_status,
    DATE_FORMAT(order_ts, 'yyyyMMdd') AS order_date,
    process_notes(notes) AS notes
FROM
    orders
WHERE
    status = 'shipped';

Example 2: Enrich Data Using Lateral and Lookup Joins

This example uses a lateral join to split tags and a lookup join to enrich order data with product information.

SQL
CREATE MATERIALIZED TABLE mt_enriched_orders (
    PRIMARY KEY (order_id, order_tag) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
WITH o AS (
    SELECT
        order_id,
        product_id,
        quantity,
        proc_time,
        e.tag AS order_tag
    FROM
        orders,
        LATERAL TABLE(split_tags(tags, ',')) AS e(tag)
)
SELECT
    o.order_id,
    o.product_id,
    p.product_name,
    p.category,
    o.quantity,
    p.price,
    o.quantity * p.price AS total_amount,
    o.order_tag
FROM o
LEFT JOIN
    product_info FOR SYSTEM_TIME AS OF PROCTIME() AS p
ON
    o.product_id = p.product_id;

Create and Use Materialized Tables

This workflow demonstrates how to build a data pipeline using Paimon and materialized tables. In this scenario, you create ODS (Operational Data Store) tables for user logs and product dimensions, and then build DWD (Data Warehouse Detail) and DWS (Data Warehouse Service) materialized tables to analyze the data.

1. Create a Paimon Catalog

Create a Paimon catalog backed by object storage (S3).

SQL
CREATE CATALOG paimon WITH (
  'type' = 'paimon',
  'metastore' = 'filesystem',
  'warehouse' = 's3a://your-bucket-name/paimon' -- Replace with your S3 bucket
);

USE CATALOG paimon;

2. Create ODS Tables

Create the database and the ODS tables for user logs and products.

SQL
CREATE DATABASE IF NOT EXISTS mt;
USE mt;

-- Create User Log table
CREATE TABLE ods_user_log (
  item_id INT NOT NULL,
  user_id INT NOT NULL,
  vtime TIMESTAMP(6),
  ds VARCHAR(10)
)
PARTITIONED BY(ds)
WITH (
  'bucket' = '4',            -- Set the number of buckets to 4.
  'bucket-key' = 'item_id'   -- Specify the bucket key column.
);

-- Create Product Dimension table
CREATE TABLE ods_dim_product (
  item_id INT NOT NULL,
  title VARCHAR(255),
  pict_url VARCHAR(255),
  brand_id INT,
  seller_id INT,
  PRIMARY KEY(item_id) NOT ENFORCED
) WITH (
  'bucket' = '4',
  'bucket-key' = 'item_id'
);

3. Generate and Load Data

Use the faker connector to generate sample data and load it into the ODS tables.

SQL
-- Generate user log data
CREATE TEMPORARY TABLE `user_log` (
  item_id INT,
  user_id INT,
  vtime TIMESTAMP,
  ds AS DATE_FORMAT(CURRENT_DATE,'yyyyMMdd')
) WITH (
  'connector' = 'faker',
  'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',
  'fields.user_id.expression'='#{number.numberBetween ''0'',''100''}',
  'fields.vtime.expression'='#{date.past ''5'',''HOURS''}',
  'rows-per-second' = '3'
);

-- Generate product data
CREATE TEMPORARY TABLE `dim_product` (
  item_id INT NOT NULL,
  title VARCHAR(255),
  pict_url VARCHAR(255),
  brand_id INT,
  seller_id INT,
  PRIMARY KEY(item_id) NOT ENFORCED
) WITH (
  'connector' = 'faker',
  'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',
  'fields.title.expression'='#{book.title}',
  'fields.pict_url.expression'='#{internet.domainName}',
  'fields.brand_id.expression'='#{number.numberBetween ''1000'',''10000''}',
  'fields.seller_id.expression'='#{number.numberBetween ''1000'',''10000''}',
  'rows-per-second' = '3'
);

-- Insert data into ODS tables
BEGIN STATEMENT SET;
INSERT INTO ods_user_log
SELECT
  item_id,
  user_id,
  vtime,
  CAST(ds AS VARCHAR(10)) AS ds
FROM `user_log`;

INSERT INTO ods_dim_product
SELECT
  item_id,
  title,
  pict_url,
  brand_id,
  seller_id
FROM `dim_product`;
END;

4. Verify ODS Data

Check that the data has been successfully loaded into the Paimon tables.

SQL
SELECT * FROM ods_dim_product LIMIT 10;
SELECT * FROM ods_user_log LIMIT 10;
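
Beyond sampling rows, a couple of aggregate checks can help confirm that the load behaves as expected. The queries below are a sketch that uses only the ODS tables defined above. The column aliases are illustrative, and the counts keep growing while the load job is running.

```sql
-- Row count per partition; ds values should be dates in yyyyMMdd form.
SELECT ds, COUNT(*) AS row_cnt
FROM ods_user_log
GROUP BY ds;

-- item_id is the primary key of the dimension table, so both counts should match.
SELECT
  COUNT(*) AS total_rows,
  COUNT(DISTINCT item_id) AS distinct_items
FROM ods_dim_product;
```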

5. Create Materialized Tables

Create materialized tables to process and aggregate the data.

DWD Table: dwd_user_log_product

This table joins the user log stream with the product dimension table.

SQL
CREATE MATERIALIZED TABLE dwd_user_log_product(
    PRIMARY KEY (item_id) NOT ENFORCED
)
PARTITIONED BY(ds)
WITH (
  'partition.fields.ds.date-formatter' = 'yyyyMMdd'
)
FRESHNESS = INTERVAL '1' HOUR      -- Refresh data every hour.
AS SELECT
  l.ds,
  l.item_id,
  l.user_id,
  l.vtime,
  r.brand_id,
  r.seller_id
FROM ods_user_log l INNER JOIN ods_dim_product r
ON l.item_id = r.item_id;

DWS Table: dws_overall

This table aggregates page views (PV) and unique visitors (UV) per day and per hour. Rows that hold the daily totals have hh set to 'day'.

SQL
CREATE MATERIALIZED TABLE dws_overall(
    PRIMARY KEY(ds, hh) NOT ENFORCED
)
PARTITIONED BY(ds)
WITH (
  'partition.fields.ds.date-formatter' = 'yyyyMMdd'
)
FRESHNESS = INTERVAL '1' HOUR   -- Refresh data every hour.
AS SELECT
    ds,
    COALESCE(hh, 'day') AS hh,
    count(*) AS pv,
    count(distinct user_id) AS uv
FROM (
    SELECT ds, date_format(vtime, 'HH') AS hh, user_id
    FROM dwd_user_log_product
) tmp
GROUP BY GROUPING SETS(ds, (ds, hh));

6. Start the Materialized Table Jobs

  1. Navigate to the Data Lineage view in the console.
  2. Locate your new materialized tables (dwd_user_log_product and dws_overall).
  3. Click Start on the materialized table nodes. You can click Details on the node to verify the status and view the latest job information. If the freshness is 30 minutes or more (batch mode), a corresponding workflow is also created.

7. Backfill or Trigger Updates

To manually refresh data or backfill historical partitions:

  1. In the Data Lineage pane, select the materialized table node (for example, dwd_user_log_product).
  2. In the lower-right corner, click Trigger Update.
  3. In the dialog box, enter the partition value in the ds field (for example, today's date like 20251126).
  4. Select the Cascade checkbox to automatically update downstream materialized tables (like dws_overall).
  5. Click Confirm.

A new workflow instance is created to handle the update.
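
For comparison, Apache Flink's materialized table syntax defines a REFRESH statement that targets a single partition. The sketch below assumes that this statement is also accepted in your Ververica Cloud SQL environment, which this page does not confirm. The console steps above remain the documented path.

```sql
-- Assumption: the open-source Flink REFRESH syntax is available in your environment.
-- Refreshes only the partition ds = '20251126' of the DWD table.
ALTER MATERIALIZED TABLE dwd_user_log_product
REFRESH PARTITION (ds = '20251126');
```

Unlike the Cascade option in the console, this statement refers to one table only, so a downstream table such as dws_overall would need its own refresh.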

8. Change Freshness

You can adjust the freshness interval to switch between batch and streaming modes. For example, lowering the freshness to less than 30 minutes switches the job to streaming mode.

  1. In the Data Lineage view, select the materialized table.
  2. Click Stop to stop the current job.
  3. Modify the table definition to set a lower freshness, such as FRESHNESS = INTERVAL '2' MINUTE.
  4. Click Start to restart the table. The table now runs as a streaming job.
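
A sketch of what the modified definition from step 3 might look like for dwd_user_log_product is shown below. Only the FRESHNESS clause differs from the definition in step 5. How the edited definition is submitted depends on the console and is not shown here.

```sql
-- Same definition as in step 5; only the FRESHNESS value changes.
CREATE MATERIALIZED TABLE dwd_user_log_product(
    PRIMARY KEY (item_id) NOT ENFORCED
)
PARTITIONED BY(ds)
WITH (
  'partition.fields.ds.date-formatter' = 'yyyyMMdd'
)
FRESHNESS = INTERVAL '2' MINUTE    -- Below 30 minutes, so the table runs in stream mode.
AS SELECT
  l.ds,
  l.item_id,
  l.user_id,
  l.vtime,
  r.brand_id,
  r.seller_id
FROM ods_user_log l INNER JOIN ods_dim_product r
ON l.item_id = r.item_id;
```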

9. Query Materialized Tables

You can query the materialized tables directly to verify the results.

SQL
SELECT * FROM dws_overall ORDER BY hh LIMIT 10;
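
Because dws_overall stores daily totals and hourly rows in the same table, it can help to query them separately. The following sketch relies on the 'day' marker produced by COALESCE(hh, 'day') in the table definition. The ds value is an example and should be replaced with a partition that has been loaded.

```sql
-- Daily totals: rows where the hour was rolled up to 'day'.
SELECT ds, pv, uv
FROM dws_overall
WHERE hh = 'day'
ORDER BY ds;

-- Hourly breakdown for a single day.
SELECT hh, pv, uv
FROM dws_overall
WHERE ds = '20251126' AND hh <> 'day'
ORDER BY hh;
```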

Create a Periodic Backfill Workflow

You can create a workflow to schedule periodic backfills for your materialized tables. This allows you to automate data refreshes at specific intervals.

  1. Log on to the Ververica Cloud console.
  2. In the left navigation, click Workflows.
  3. Click Create Workflow.
  4. In the configuration panel, enter the following parameters:
    • Workflow Name: Enter a unique name for the workflow.
    • Description: Enter a description for the workflow.
    • Scheduling Type: Select Periodic Scheduling.
    • Scheduling Rule: Set the schedule using a Cron expression (for example, 0 */5 * * * ? to run every 5 minutes).
    • Scheduling Start Time: Set a valid future start time.
    • Resource Queue: Select the resource queue for the workflow.
  5. Click Create.
  6. In the workflow editor canvas, click the initial node (or add a new Materialized Table node) to edit it:
    • Job: Select the materialized table job you want to schedule (for example, dwd_user_log_product).
    • Node Name: Enter a descriptive name.
  7. Click Save in the node editor panel.
  8. Click Save in the upper-right corner of the workflow page.
  9. In the Workflows list, toggle the Status to Enabled (or click Enable) to activate the schedule. The workflow will now execute automatically according to the defined schedule.

Limitations and Known Issues

Be aware of the following behaviors in the current release:

  • Navigation to job details: In the UI, clicking Materialized Table > Details > Job might lead to a "Not Found" page because the filter might fail to locate the resource context.
  • Missing SQL in job panel: When you view the Latest Job panel, the SQL that defines the materialized table is not currently displayed.
  • Workflow list navigation: The workflow list view does not currently support navigating to a complete list of all scheduled jobs associated with a specific materialized table.
  • Workflow detail view: Navigating to materialized table details from the Workflow view might result in a "Not Found" error.