swapna267 opened a new pull request, #15279:
URL: https://github.com/apache/iceberg/pull/15279
This PR adds SQL table connector support for the dynamic Iceberg sink.
Two new configuration options are added to `FlinkCreateTableOptions`:
- `use-dynamic-iceberg-sink` (boolean): enables or disables the dynamic sink
- `dynamic-record-generator-impl` (string): fully qualified class name of the `DynamicTableRecordGenerator` implementation
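To illustrate how the two options could interact, here is a minimal, self-contained sketch of resolving a generator class from the table options by reflection. It is not the code in this PR: `RecordGeneratorSketch`, `MyGeneratorSketch`, and `GeneratorLoaderSketch` are hypothetical names, and the sketch assumes that a missing flag means "disabled" and that the implementation has a no-arg constructor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the generator interface; not the PR's real type. */
interface RecordGeneratorSketch {
  String name();
}

/** Hypothetical implementation that the option value would point at. */
class MyGeneratorSketch implements RecordGeneratorSketch {
  public MyGeneratorSketch() {}

  @Override
  public String name() {
    return "my-generator";
  }
}

final class GeneratorLoaderSketch {
  // Option keys as named in this PR description.
  static final String USE_DYNAMIC_SINK = "use-dynamic-iceberg-sink";
  static final String GENERATOR_IMPL = "dynamic-record-generator-impl";

  private GeneratorLoaderSketch() {}

  /** Returns the configured generator, or empty when the dynamic sink is off. */
  static Optional<RecordGeneratorSketch> load(Map<String, String> options) {
    // Assumption: a missing flag means the dynamic sink is disabled.
    if (!Boolean.parseBoolean(options.getOrDefault(USE_DYNAMIC_SINK, "false"))) {
      return Optional.empty();
    }
    String className = options.get(GENERATOR_IMPL);
    if (className == null || className.trim().isEmpty()) {
      throw new IllegalArgumentException(
          GENERATOR_IMPL + " must be set when " + USE_DYNAMIC_SINK + " is true");
    }
    try {
      // Assumption: the implementation exposes an accessible no-arg constructor.
      Class<? extends RecordGeneratorSketch> clazz =
          Class.forName(className.trim()).asSubclass(RecordGeneratorSketch.class);
      return Optional.of(clazz.getDeclaredConstructor().newInstance());
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalStateException("Cannot instantiate " + className, e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put(USE_DYNAMIC_SINK, "true");
    options.put(GENERATOR_IMPL, MyGeneratorSketch.class.getName());
    System.out.println(load(options).map(RecordGeneratorSketch::name).orElse("disabled"));
  }
}
```

Failing fast when the flag is on but no class name is given keeps a misconfigured table from silently falling back to the regular sink; whether the PR behaves this way is not stated here.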
Example SQL:
```
CREATE TABLE dynamic_sink_table (
  id BIGINT,
  data STRING,
  database_name STRING,
  table_name STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-type' = 'hadoop',
  'catalog-name' = 'my_catalog',
  'warehouse' = 's3://my-warehouse/',
  'use-dynamic-iceberg-sink' = 'true',
  'dynamic-record-generator-impl' = 'com.example.MyDynamicRecordGenerator',
  'table.props.write.format.default' = 'parquet',
  'table.props.write.target-file-size-bytes' = '134217728'
);
-- Insert data that will be routed to different tables based on
-- database_name and table_name
INSERT INTO dynamic_sink_table VALUES
  (1, 'record1', 'sales', 'orders'),
  (2, 'record2', 'sales', 'customers'),
  (3, 'record3', 'inventory', 'products');
```
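The routing implied by the example can be sketched in plain Java, without any Flink or Iceberg dependencies. This is only an illustration of grouping rows by `database_name` and `table_name`; `RoutingSketch` and its `Row` class are hypothetical, and the `database.table` key format is an assumption rather than something this PR specifies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RoutingSketch {
  /** Hypothetical row shape mirroring the four columns of the example table. */
  static final class Row {
    final long id;
    final String data;
    final String databaseName;
    final String tableName;

    Row(long id, String data, String databaseName, String tableName) {
      this.id = id;
      this.data = data;
      this.databaseName = databaseName;
      this.tableName = tableName;
    }
  }

  private RoutingSketch() {}

  /** Assumption: the target table is identified as "database.table". */
  static String targetTable(Row row) {
    return row.databaseName + "." + row.tableName;
  }

  /** Groups rows by target table, preserving first-seen table order. */
  static Map<String, List<Row>> route(List<Row> rows) {
    Map<String, List<Row>> byTable = new LinkedHashMap<>();
    for (Row row : rows) {
      byTable.computeIfAbsent(targetTable(row), key -> new ArrayList<>()).add(row);
    }
    return byTable;
  }

  public static void main(String[] args) {
    List<Row> rows = Arrays.asList(
        new Row(1, "record1", "sales", "orders"),
        new Row(2, "record2", "sales", "customers"),
        new Row(3, "record3", "inventory", "products"));
    // Each of the three example rows ends up under a different target table.
    System.out.println(route(rows).keySet());
  }
}
```

With the three rows from the `INSERT` above, the sketch yields three groups: `sales.orders`, `sales.customers`, and `inventory.products`.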
I plan to provide a CustomVariantToDynamicRecordGenerator that handles a
Flink VARIANT column and generates records of different schemas, each landing
in a table with the corresponding schema.
That will be added in a separate PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]