amogh-jahagirdar commented on code in PR #8555:
URL: https://github.com/apache/iceberg/pull/8555#discussion_r1324554391
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -148,19 +153,24 @@ public void initializeState(StateInitializationContext
context) throws Exception
// Open the table loader and load the table.
this.tableLoader.open();
- this.table = tableLoader.loadTable();
- this.committerMetrics = new IcebergFilesCommitterMetrics(super.metrics,
table.name());
+ Table initTable = tableLoader.loadTable();
+ if (reloadIntervalMs > 0) {
+ this.tableSupplier = new ReloadingTableSupplier(initTable, tableLoader,
reloadIntervalMs);
+ } else {
+ this.tableSupplier = () -> initTable;
+ }
+ this.committerMetrics = new IcebergFilesCommitterMetrics(super.metrics,
initTable.name());
Review Comment:
Nit spacing, newline after the else block
##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -369,4 +369,25 @@ public void testOverrideWriteConfigWithUnknownFileFormat()
{
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid file format: UNRECOGNIZED");
}
+
+ @Test
+ public void testWriteRowReloadEnabled() throws Exception {
+ List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2,
"world"), Row.of(3, "foo"));
+ DataStream<RowData> dataStream =
+ env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+ .map(CONVERTER::toInternal,
FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+ FlinkSink.forRowData(dataStream)
+ .table(table)
+ .tableLoader(tableLoader)
+ .writeParallelism(parallelism)
+ .reloadIntervalMs(1000)
Review Comment:
If I understand right, internally a reloading table supplier will be created
which actually performs the periodic load table. I guess this test is just
making sure everything works as expected when this is specified. If there's a
simple way for passing some sort of mocked table loader and verifying it gets
called multiple times that's a stronger test
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]