stevenzwu commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1142850082
##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java:
##########
@@ -291,7 +294,25 @@ public DataStream<RowData> build() {
if (env.getMaxParallelism() > 0) {
parallelism = Math.min(parallelism, env.getMaxParallelism());
}
- return env.createInput(format, typeInfo).setParallelism(parallelism);
+
+ DataStreamSource<RowData> source =
Review Comment:
This covers one scenario. There are two other scenarios.
1) Using `FlinkInputFormat` directly, e.g. in `StreamingReaderOperator`:
```
private void processSplits() throws IOException {
  FlinkInputSplit split = splits.poll();
  if (split == null) {
    currentSplitState = SplitState.IDLE;
    return;
  }

  format.open(split);
  try {
    RowData nextElement = null;
    while (!format.reachedEnd()) {
      nextElement = format.nextRecord(nextElement);
      sourceContext.collect(nextElement);
    }
  } finally {
    currentSplitState = SplitState.IDLE;
    format.close();
  }

  // Re-schedule to process the next split.
  enqueueProcessSplits();
}
```
2) The new Flink FLIP-27 `IcebergSource`. Here is an example from
`IcebergTableSource` that shows how users can construct the DataStream. We can
fix it in `IcebergTableSource`, but we can't control whether users' own code
adds the filter to the `DataStream`. Note that the FLIP-27 source is the future
Flink source.
```
private DataStreamSource<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
  SplitAssignerType assignerType =
      readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
  IcebergSource<RowData> source =
      IcebergSource.forRowData()
          .tableLoader(loader)
          .assignerFactory(assignerType.factory())
          .properties(properties)
          .project(getProjectedSchema())
          .limit(limit)
          .filters(filters)
          .flinkConfig(readableConfig)
          .build();
  DataStreamSource stream =
      env.fromSource(
          source,
          WatermarkStrategy.noWatermarks(),
          source.name(),
          TypeInformation.of(RowData.class));
  return stream;
}
```
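To illustrate the concern: a filter attached to the `DataStream` only helps callers who go through that one builder, while a filter applied where records are read would reach every caller. Below is a minimal sketch of that second idea in plain Java, with no Flink or Iceberg dependency. `FilteringReaderSketch`, `FilteringIterator`, and `drain` are hypothetical names made up for this example and are not existing Iceberg or Flink APIs; the integer rows stand in for `RowData`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

// Hypothetical illustration only: none of these names exist in Iceberg or Flink.
final class FilteringReaderSketch {

  /** Wraps a record iterator and drops rows that fail the row-level predicate. */
  static final class FilteringIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    private final Predicate<T> rowFilter;
    private T next;
    private boolean hasNext;

    FilteringIterator(Iterator<T> delegate, Predicate<T> rowFilter) {
      this.delegate = delegate;
      this.rowFilter = rowFilter;
      advance();
    }

    // Moves to the next row that passes the filter, if there is one.
    private void advance() {
      hasNext = false;
      next = null;
      while (delegate.hasNext()) {
        T candidate = delegate.next();
        if (rowFilter.test(candidate)) {
          next = candidate;
          hasNext = true;
          return;
        }
      }
    }

    @Override
    public boolean hasNext() {
      return hasNext;
    }

    @Override
    public T next() {
      if (!hasNext) {
        throw new NoSuchElementException();
      }
      T result = next;
      advance();
      return result;
    }
  }

  /** Stand-in for any caller that drains the reader directly, like the loop in scenario 1. */
  static <T> List<T> drain(Iterator<T> reader) {
    List<T> out = new ArrayList<>();
    while (reader.hasNext()) {
      out.add(reader.next());
    }
    return out;
  }

  public static void main(String[] args) {
    // Rows of a file that survived file-level pruning but still holds non-matching rows.
    List<Integer> fileRows = Arrays.asList(1, 5, 10, 15, 20);
    Iterator<Integer> reader =
        new FilteringIterator<Integer>(fileRows.iterator(), v -> v >= 10);
    System.out.println(drain(reader)); // prints [10, 15, 20]
  }
}
```

Because the predicate lives inside the iterator, the caller in `drain` needs no knowledge of the filter, which is the property a `DataStream`-level filter cannot give to code we do not control. Whether the real reader stack has a suitable place for such a wrapper is an assumption this sketch does not verify.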