stevenzwu commented on code in PR #8555:
URL: https://github.com/apache/iceberg/pull/8555#discussion_r1329395164
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java:
##########
@@ -142,4 +160,35 @@ public String toString() {
.toString();
}
}
+
+ class NoOpTableLoader implements TableLoader {
Review Comment:
Would `StaticTableLoader` or `ImmutableTableLoader` be a more accurate name?
This is effectively a cached loader that never expires.
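A rough sketch of the behavior the name would describe, using simplified
stand-in types rather than the real Iceberg interfaces (`SimpleTable`,
`SimpleTableLoader`, and this `StaticTableLoader` are hypothetical names for
illustration only):

```java
// Simplified stand-ins; NOT the real org.apache.iceberg types.
interface SimpleTable {
  String name();
}

interface SimpleTableLoader {
  void open();

  SimpleTable loadTable();
}

// Hypothetical loader that always hands back the instance it was built with.
final class StaticTableLoader implements SimpleTableLoader {
  private final SimpleTable table;

  StaticTableLoader(SimpleTable table) {
    this.table = table;
  }

  @Override
  public void open() {
    // Nothing to open: no catalog connection sits behind this loader.
  }

  @Override
  public SimpleTable loadTable() {
    // Same instance on every call; it is never refreshed.
    return table;
  }
}
```

Since the loader never goes back to a catalog, "static" or "immutable"
describes it better than "no-op": it still returns a usable table.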
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java:
##########
@@ -57,13 +61,24 @@ public void open() {
// Initialize the task writer factory.
this.taskWriterFactory.initialize(subTaskId, attemptId);
+ // Refresh the table if needed.
+ this.tableLoader.open();
+ if (this.taskWriterFactory instanceof RowDataTaskWriterFactory) {
Review Comment:
Could we check `tableLoader instanceof CachingTableLoader` here instead?
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -266,6 +268,20 @@ public Builder upsert(boolean enabled) {
return this;
}
+ /**
+ * Sets the table loader used to refresh the table instance in {@link IcebergStreamWriter}. If
+ * not specified then the default behavior is to not refresh the table, and the initial table
+ * instance initialized is used for the lifetime of the job.
+ *
+ * @param newTableLoader the table loader to use to refresh the table in writer subtasks
+ * @return {@link Builder} to connect the iceberg table.
+ */
+ @Experimental
+ public Builder writeTableLoader(TableLoader newTableLoader) {
Review Comment:
I'm wondering whether we need a separate `writeTableLoader`. @bryanck, you
mentioned that the committer can have the same problem. Note that `TableLoader`
already has a `clone` method for other reasons, so the existing loader can be
cloned.
If we reuse the same table loader, we need a way to decide whether the loaded
table should be refreshed. Should we check the loader's class type, e.g.
`instanceof CacheTableLoader`/`RefreshingTableLoader`? That way we would also
have one less public API change.
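A minimal sketch of the type-check idea, with simplified stand-in classes and
not the actual Flink sink code (`Loader`, `RefreshingLoader`, `FixedLoader`,
and `Writer` are hypothetical, and a `String` stands in for the table):

```java
// Simplified stand-in for a table loader; a String stands in for the table.
interface Loader {
  String loadTable();
}

// Hypothetical loader type that signals "reloading may return a newer table".
final class RefreshingLoader implements Loader {
  private int version = 0;

  @Override
  public String loadTable() {
    version++;
    return "table@v" + version;
  }
}

// Hypothetical loader type that always returns the same table.
final class FixedLoader implements Loader {
  @Override
  public String loadTable() {
    return "table@v0";
  }
}

// Stand-in for a writer operator that holds a table and a loader.
final class Writer {
  private final Loader loader;
  private String table;

  Writer(Loader loader, String initialTable) {
    this.loader = loader;
    this.table = initialTable;
  }

  void open() {
    // Reload only when the loader's type says a refresh is meaningful.
    if (loader instanceof RefreshingLoader) {
      table = loader.loadTable();
    }
  }

  String table() {
    return table;
  }
}
```

With this shape the sink builder needs no extra setter: passing a refreshing
loader opts in, and any other loader keeps the initial table instance.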
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -237,15 +237,20 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
// For step#4, we don't need to commit iceberg table again because in step#3 we've committed all
// the files,
// Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
- if (checkpointId > maxCommittedCheckpointId) {
- LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
- commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId);
- this.maxCommittedCheckpointId = checkpointId;
- } else {
- LOG.info(
- "Skipping committing checkpoint {}. {} is already committed.",
- checkpointId,
- maxCommittedCheckpointId);
+ try {
+ if (checkpointId > maxCommittedCheckpointId) {
+ LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
+ commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId);
+ this.maxCommittedCheckpointId = checkpointId;
+ } else {
+ LOG.info(
+ "Skipping committing checkpoint {}. {} is already committed.",
+ checkpointId,
+ maxCommittedCheckpointId);
+ }
+ } finally {
Review Comment:
This doesn't have to be in a `finally` block, right? If the commit fails, the
job will restart anyway.
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java:
##########
@@ -105,10 +104,17 @@ public RowDataTaskWriterFactory(
}
}
+ public void setTable(Table table) {
Review Comment:
I guess this is the public API change that is unavoidable with the approach
in this PR? Maybe mark it as `@Experimental` too.
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java:
##########
@@ -38,6 +38,8 @@ public interface TableLoader extends Closeable, Serializable, Cloneable {
void open();
+ boolean isOpen();
Review Comment:
Is `isOpen()` necessary?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]