Prerequisites
- Ensure you already have an Iceberg table that you can sink data to. For additional guidance on creating a table and setting up Iceberg, refer to this quickstart guide on creating an Iceberg table.
- Ensure you have an upstream materialized view or source that you can sink data from.
Syntax
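The statement follows the general CREATE SINK shape. The sketch below is illustrative only; sink_name and sink_from are placeholders, and the connector parameters are described in the tables that follow:

```sql
CREATE SINK [IF NOT EXISTS] sink_name
[FROM sink_from | AS select_query]
WITH (
    connector = 'iceberg',
    type = '<append-only | upsert>'
    -- additional parameters, see the tables below
);
```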
Parameters
Basic parameters
Parameter name | Description |
---|---|
type | Required. Allowed values: append-only and upsert. |
force_append_only | Optional. If true, forces the sink to be append-only, even if it cannot be. |
s3.endpoint | Optional. Endpoint of the S3 service. |
s3.region | Optional. The region where the S3 bucket is hosted. Either s3.endpoint or s3.region must be specified. |
s3.access.key | Required. Access key of the S3 compatible object store. |
s3.secret.key | Required. Secret key of the S3 compatible object store. |
s3.path.style.access | Optional. Determines the access style for S3. If true, use path-style; if false, use virtual-hosted-style. |
enable_config_load | Optional. Controls whether configuration is loaded from environment variables. Setting it to true loads warehouse credentials from environment variables. Only supported in self-hosted environments. |
database.name | Required. The database of the target Iceberg table. |
table.name | Required. The name of the target Iceberg table. |
catalog.name | Conditional. The name of the Iceberg catalog. It can be omitted for storage catalog but required for other catalogs. |
catalog.type | Optional. The catalog type used in this table. Currently, the supported values are storage, rest, hive, jdbc, and glue. If not specified, storage is used. For details, see Catalogs. |
warehouse.path | Conditional. The path of the Iceberg warehouse. It’s required if the catalog.type is not rest. |
catalog.url | Conditional. The URL of the catalog. It is required when catalog.type is not storage. |
primary_key | Conditional. The primary key for an upsert sink. It is only applicable to, and required for, the upsert mode. |
partition_by | Optional. Specify partitioning using column names or transformations. Supported formats include: column, transform(column), transform(n,column), and transform(n, column). Transformations can include functions like bucket or truncate, where n is an optional parameter. Ensure that the specified partition fields exist in the schema. See the sketch after this table for an example. |
commit_checkpoint_interval | Optional. Commit every N checkpoints (N > 0). Default value is 10. The behavior of this field also depends on the sink_decouple setting. The approximate commit interval is time = barrier_interval_ms × checkpoint_frequency × commit_checkpoint_interval, where barrier_interval_ms and checkpoint_frequency are system parameters that define the base checkpointing rate, and commit_checkpoint_interval is configurable in the Iceberg sink. |
commit_retry_num | Optional. The number of times to retry a commit when an Iceberg commit fails. Default is 8. |
create_table_if_not_exists | Optional. When set to true, the sink automatically creates the target Iceberg table if it does not exist. |
catalog.credential | Optional. Credential for accessing the Iceberg catalog, used to exchange for a token in the OAuth2 client credentials flow. Applicable only in the rest catalog. |
catalog.token | Optional. A bearer token for accessing the Iceberg catalog, used for interaction with the server. Applicable only in the rest catalog. |
catalog.oauth2_server_uri | Optional. The oauth2_server_uri for accessing the Iceberg catalog, serving as the token endpoint URI to fetch a token if the rest catalog is not the authorization server. Applicable only in the rest catalog. |
catalog.scope | Optional. Scope for accessing the Iceberg catalog, providing additional scope for OAuth2. Applicable only in the rest catalog. |
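For illustration, the sketch below ties several of these parameters together; the sink name, upstream materialized view, column names, bucket, region, and credentials are all placeholders:

```sql
CREATE SINK iceberg_demo_sink FROM demo_mv
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'id',                      -- required in upsert mode
    partition_by = 'bucket(16,id)',          -- partition transform over an existing column
    commit_checkpoint_interval = 10,         -- commit every 10 checkpoints
    catalog.type = 'storage',
    warehouse.path = 's3://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.region = 'ap-southeast-1',
    s3.access.key = '<access-key>',
    s3.secret.key = '<secret-key>',
    database.name = 'dev',
    table.name = 'demo_table'
);
```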
Use GCS as object storage for Iceberg
Added in v2.3.0.
RisingWave supports using GCS as the object storage for Iceberg when the catalog type is storage or rest. To enable it, configure the following specific parameters:
Parameter name | Description |
---|---|
warehouse.path | Specifies the Google Cloud Storage path. |
gcs.credential | Base64-encoded credential key obtained from the GCS service account key JSON file. To get this JSON file, refer to the GCS documentation. |
Example
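A minimal sketch, assuming a gs:// warehouse path and a storage catalog; the bucket, path, and credential are placeholders:

```sql
CREATE SINK gcs_iceberg_sink FROM demo_mv
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    catalog.type = 'storage',
    warehouse.path = 'gs://my-gcs-bucket/path/to/warehouse',
    gcs.credential = '<base64-encoded-service-account-key>',
    database.name = 'dev',
    table.name = 'demo_table'
);
```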
Use Azure Blob as object storage for Iceberg
Added in v2.4.0.
RisingWave supports using Azure Blob Storage as the object storage for Iceberg when the catalog type is storage or rest. To enable it, configure the following specific parameters:
Parameter name | Description |
---|---|
warehouse.path | Specifies the Azure Blob Storage path. |
azblob.account_name | The Azure Storage account name used to authenticate access. |
azblob.account_key | The Azure Storage account key associated with the account name, used for authentication. |
azblob.endpoint_url | The full endpoint URL of the Azure Blob service. |
Example
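A minimal sketch, assuming an azblob:// warehouse path and a storage catalog; the account name, key, container, and path are placeholders:

```sql
CREATE SINK azblob_iceberg_sink FROM demo_mv
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    catalog.type = 'storage',
    warehouse.path = 'azblob://my-container/path/to/warehouse',
    azblob.account_name = '<storage-account-name>',
    azblob.account_key = '<storage-account-key>',
    azblob.endpoint_url = 'https://<storage-account-name>.blob.core.windows.net',
    database.name = 'dev',
    table.name = 'demo_table'
);
```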
Use Amazon S3 Tables with the Iceberg sink
You can configure the RisingWave Iceberg sink connector to use Amazon S3 Tables as its catalog. This setup allows RisingWave to sink data into Iceberg tables managed by the AWS-native S3 Tables catalog service. To achieve this, specify the rest catalog type within your CREATE SINK statement and include the necessary parameters for SigV4 authentication against the S3 Tables REST API.
Required REST Catalog Parameters for S3 Tables:
Parameter name | Description | Value for S3 Tables |
---|---|---|
catalog.rest.signing_region | The AWS region for signing REST catalog requests. | e.g., us-east-1 |
catalog.rest.signing_name | The service name for signing REST catalog requests. | s3tables |
catalog.rest.sigv4_enabled | Enables SigV4 signing for REST catalog requests. Set to true . | true |
Example
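An illustrative sketch of such a statement. The catalog endpoint of the form https://s3tables.<your-region>.amazonaws.com/iceberg and the table-bucket ARN used as warehouse.path are assumptions to verify against your AWS setup; all bracketed values are placeholders:

```sql
CREATE SINK s3_tables_sink FROM source_table
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'id',    -- placeholder key column
    catalog.type = 'rest',
    catalog.url = 'https://s3tables.<your-region>.amazonaws.com/iceberg',
    catalog.rest.signing_region = '<your-region>',
    catalog.rest.signing_name = 's3tables',
    catalog.rest.sigv4_enabled = 'true',
    warehouse.path = 'arn:aws:s3tables:<your-region>:<your-account-id>:bucket/<your-bucket-name>',
    s3.region = '<your-region>',
    s3.access.key = '<your-access-key>',
    s3.secret.key = '<your-secret-key>',
    database.name = '<your-database-name>',
    table.name = '<your-table-name>'
);
```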
Replace placeholder values with your specific AWS account, region, bucket, database, table names, and credentials. This sink writes data from source_table into the specified Iceberg table (<your-table-name>) within the <your-database-name> database, using Amazon S3 Tables to manage the table's metadata.
Data type mapping
RisingWave converts its data types to and from Iceberg data types according to the following mapping table:
RisingWave Type | Iceberg Type |
---|---|
boolean | boolean |
int | int |
smallint | int |
bigint | long |
real | float |
float | float |
double | double |
varchar | string |
date | date |
timestamptz | timestamptz |
timestamp | timestamp |
map | map |
array | list |
struct | struct |
jsonb | string |
Catalog
Iceberg supports these types of catalogs:
Storage catalog
The Storage catalog stores all metadata in the underlying file system, such as Hadoop or S3. Currently, we only support S3 as the underlying file system.
Example
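A minimal sketch using the storage catalog; the bucket, region, and credentials are placeholders:

```sql
CREATE SINK storage_catalog_sink FROM demo_mv
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    catalog.type = 'storage',
    warehouse.path = 's3://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.region = 'ap-southeast-1',
    s3.access.key = '<access-key>',
    s3.secret.key = '<secret-key>',
    database.name = 'dev',
    table.name = 'demo_table'
);
```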
REST catalog
RisingWave supports the REST catalog, which acts as a proxy to other catalogs such as Hive, JDBC, and Nessie. This is the recommended approach for using RisingWave with Iceberg tables.
Example
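A minimal sketch using a REST catalog; the catalog URL, bucket, and credentials are placeholders:

```sql
CREATE SINK rest_catalog_sink FROM demo_mv
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'id',
    catalog.type = 'rest',
    catalog.name = 'demo',
    catalog.url = 'http://rest-catalog-host:8181',
    warehouse.path = 's3://my-iceberg-bucket/path/to/warehouse',   -- optional when catalog.type is rest
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.region = 'ap-southeast-1',
    s3.access.key = '<access-key>',
    s3.secret.key = '<secret-key>',
    database.name = 'dev',
    table.name = 'demo_table'
);
```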
Hive catalog
RisingWave supports the Hive catalog. You need to set catalog.type to hive to use it.
Example
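A minimal sketch using a Hive Metastore catalog; the thrift URI, bucket, and credentials are placeholders:

```sql
CREATE SINK hive_catalog_sink FROM demo_mv
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    catalog.type = 'hive',
    catalog.name = 'demo',
    catalog.url = 'thrift://metastore-host:9083',
    warehouse.path = 's3://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.region = 'ap-southeast-1',
    s3.access.key = '<access-key>',
    s3.secret.key = '<secret-key>',
    database.name = 'dev',
    table.name = 'demo_table'
);
```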
JDBC catalog
RisingWave supports the JDBC catalog.
Example
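A sketch using a JDBC catalog backed by PostgreSQL. The JDBC URL and credentials are placeholders, and the catalog.jdbc.user / catalog.jdbc.password parameter names are assumptions not listed in the table above:

```sql
CREATE SINK jdbc_catalog_sink FROM demo_mv
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    catalog.type = 'jdbc',
    catalog.name = 'demo',
    catalog.url = 'jdbc:postgresql://postgres-host:5432/iceberg_catalog',
    catalog.jdbc.user = '<db-user>',          -- assumed parameter name
    catalog.jdbc.password = '<db-password>',  -- assumed parameter name
    warehouse.path = 's3://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.region = 'ap-southeast-1',
    s3.access.key = '<access-key>',
    s3.secret.key = '<secret-key>',
    database.name = 'dev',
    table.name = 'demo_table'
);
```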
Glue catalog
PREMIUM EDITION FEATURE
This is a Premium Edition feature. All Premium Edition features are available out of the box without additional cost on RisingWave Cloud. For self-hosted deployments, users need to purchase a license key to access this feature. To purchase a license key, please contact the sales team at sales@risingwave-labs.com.
For a full list of Premium Edition features, see RisingWave Premium Edition.
Example
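A minimal sketch using the Glue catalog; the region, bucket, and credentials are placeholders:

```sql
CREATE SINK glue_catalog_sink FROM demo_mv
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'id',
    catalog.type = 'glue',
    catalog.name = 'demo',
    warehouse.path = 's3://my-iceberg-bucket/path/to/warehouse',
    s3.region = 'ap-southeast-1',
    s3.access.key = '<access-key>',
    s3.secret.key = '<secret-key>',
    database.name = 'dev',
    table.name = 'demo_table'
);
```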
Iceberg table format
Currently, RisingWave only supports Iceberg tables in format v2.
Exactly-once delivery
RisingWave provides exactly-once delivery semantics for Iceberg sinks. This guarantees that each data event is processed once and only once, even in the presence of failures such as retries or restarts. This level of delivery assurance is essential in scenarios where duplicate records can lead to incorrect analytics or data corruption in downstream systems. Exactly-once delivery is achieved through a two-phase commit protocol involving a pre-commit phase and a commit phase. Iceberg's commit operations are idempotent, which allows RisingWave to safely retry failed transactions without introducing duplicates.
By default, exactly-once semantics is disabled. To enable it for an Iceberg sink, include is_exactly_once = 'true' in the WITH clause of the sink definition. Note that enabling this option introduces additional coordination overhead due to metadata pre-commit, which may impact sink performance in high-throughput workloads.
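An illustrative sketch showing where the option goes; all other values are placeholders:

```sql
CREATE SINK exactly_once_sink FROM demo_mv
WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'id',
    is_exactly_once = 'true',   -- enables two-phase commit with metadata pre-commit
    catalog.type = 'storage',
    warehouse.path = 's3://my-iceberg-bucket/path/to/warehouse',
    s3.region = 'ap-southeast-1',
    s3.access.key = '<access-key>',
    s3.secret.key = '<secret-key>',
    database.name = 'dev',
    table.name = 'demo_table'
);
```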
Examples
This section includes several examples that you can use if you want to quickly experiment with sinking data to Iceberg.
Create an Iceberg table (if you do not already have one)
Set create_table_if_not_exists to true to automatically create an Iceberg table.
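A minimal sketch; the sink creates dev.table in the warehouse if it does not already exist, and the storage values are placeholders:

```sql
CREATE SINK auto_create_sink FROM demo_mv
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    create_table_if_not_exists = 'true',   -- create the target table if missing
    catalog.type = 'storage',
    warehouse.path = 's3://my-iceberg-bucket/path/to/warehouse',
    s3.region = 'ap-southeast-1',
    s3.access.key = '<access-key>',
    s3.secret.key = '<secret-key>',
    database.name = 'dev',
    table.name = 'table'
);
```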
Alternatively, use Spark to create a table. For example, the following spark-sql command creates an Iceberg table named table under the database dev in AWS S3. The table is in an S3 bucket named my-iceberg-bucket in region ap-southeast-1 and under the path path/to/warehouse. The table has the property format-version=2, so it supports the upsert option. There should be a folder named s3://my-iceberg-bucket/path/to/warehouse/dev/table/metadata.
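A sketch of the Spark SQL statement such a command would run, assuming the spark-sql session is already configured with an Iceberg catalog named demo that points at s3://my-iceberg-bucket/path/to/warehouse; the column names are illustrative:

```sql
CREATE TABLE demo.dev.`table` (
    seq_id    BIGINT,
    user_id   BIGINT,
    user_name STRING
)
USING iceberg
TBLPROPERTIES ('format-version' = '2');
```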
Note that only S3-compatible object stores are supported, such as AWS S3 or MinIO.
Create an upstream materialized view or source
The following query creates an append-only source. For more details on creating a source, see CREATE SOURCE.
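A minimal sketch of such a source using the datagen connector; the column names and generation rates are illustrative:

```sql
CREATE SOURCE s1_source (
    seq_id bigint,
    user_id bigint,
    user_name varchar
) WITH (
    connector = 'datagen',
    fields.seq_id.kind = 'sequence',
    fields.seq_id.start = '1',
    fields.seq_id.end = '10000000',
    fields.user_id.kind = 'random',
    fields.user_id.min = '1',
    fields.user_id.max = '10000000',
    fields.user_name.kind = 'random',
    fields.user_name.length = '10',
    datagen.rows.per.second = '10000'
) FORMAT PLAIN ENCODE JSON;
```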
Append-only sink from append-only source
If you have an append-only source and want to create an append-only sink, set type = append-only in the CREATE SINK SQL query.
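A minimal sketch; s1_source is the source created above, and the warehouse, region, and credentials are placeholders:

```sql
CREATE SINK s1_sink FROM s1_source
WITH (
    connector = 'iceberg',
    type = 'append-only',
    warehouse.path = 's3://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.region = 'ap-southeast-1',
    s3.access.key = '<access-key>',
    s3.secret.key = '<secret-key>',
    database.name = 'dev',
    table.name = 'table'
);
```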
Append-only sink from upsert source
If you have an upsert source and want to create an append-only sink, set type = append-only and force_append_only = true. This will ignore delete messages in the upstream and turn upstream update messages into insert messages.
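A minimal sketch; upsert_mv stands in for your upsert source or materialized view, and the storage values are placeholders:

```sql
CREATE SINK s1_force_append_sink FROM upsert_mv
WITH (
    connector = 'iceberg',
    type = 'append-only',
    force_append_only = 'true',
    warehouse.path = 's3://my-iceberg-bucket/path/to/warehouse',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
    s3.region = 'ap-southeast-1',
    s3.access.key = '<access-key>',
    s3.secret.key = '<secret-key>',
    database.name = 'dev',
    table.name = 'table'
);
```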