snleee commented on code in PR #10045: URL: https://github.com/apache/pinot/pull/10045#discussion_r1061952606
##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java:
##########
@@ -151,4 +148,18 @@ private void flush()
       LOG.info("Pinot segment uploaded to {}", segmentURI);
     });
   }
+
+  @Override
+  public List<GenericRow> snapshotState(long checkpointId, long timestamp) {

Review Comment:
   @bobby-richard After giving this some thought, I would like to understand what we are trying to achieve here. If I understand correctly, we want a way to recover the job after a failure by using checkpoints.

   The simplest way of handling fault tolerance is to rely on something like the following:
   1. When `flush()` gets called, we update some state (e.g. checkpoint = x) to indicate "all data up to `checkpoint` has been flushed to Pinot". (In Kafka, the checkpoint can be the offset.)
   2. If the Flink job fails in the middle, we start consuming from `x+1`.

   The above needs to store only one piece of information as part of the checkpoint. The challenge is that this information gets updated in `flush() - PinotFlinkSink`, while the rewind to `x+1` has to happen in the source.

   Do you think the above mechanism is feasible on Flink? In general, I think it is inefficient to store all the rows for the checkpoint in the sink, since we can rewind data from the source.

   @walterddr Can you also chime in, since you have experience with Flink?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

For additional commands, e-mail: commits-h...@pinot.apache.org
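
To make the two-step mechanism proposed in the review comment concrete, here is a minimal sketch in plain Java. It deliberately uses no Flink or Pinot APIs: `OffsetCheckpointSketch`, `OffsetTrackingSink`, and the in-memory `pinot` list are hypothetical names invented for illustration, and the offsets simply stand in for something like Kafka offsets. It only shows the idea that the checkpoint can hold a single offset `x` while the source replays from `x+1`. It does not answer the open question of how the sink would hand that offset back to the source in Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OffsetCheckpointSketch {

  /** Hypothetical sink: buffers rows and checkpoints only the last flushed offset. */
  static class OffsetTrackingSink {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> pinot; // stand-in for the external store (assumption)
    private long lastBufferedOffset = -1;
    private long flushedOffset = -1; // "checkpoint = x" from the comment

    OffsetTrackingSink(int batchSize, List<String> pinot) {
      this.batchSize = batchSize;
      this.pinot = pinot;
    }

    void invoke(long offset, String row) {
      buffer.add(row);
      lastBufferedOffset = offset;
      if (buffer.size() >= batchSize) {
        flush();
      }
    }

    void flush() {
      pinot.addAll(buffer); // "upload" the buffered rows
      buffer.clear();
      flushedOffset = lastBufferedOffset; // step 1: record what has been flushed
    }

    /** The checkpoint is one number, not the buffered rows. */
    long snapshotState() {
      return flushedOffset;
    }
  }

  /** Feeds rows with offsets in [from, source.size()) into the sink. */
  static void consume(List<String> source, long from, OffsetTrackingSink sink) {
    for (long offset = from; offset < source.size(); offset++) {
      sink.invoke(offset, source.get((int) offset));
    }
  }

  public static void main(String[] args) {
    List<String> source = Arrays.asList("r0", "r1", "r2", "r3", "r4");
    List<String> pinot = new ArrayList<>();

    OffsetTrackingSink sink = new OffsetTrackingSink(2, pinot);
    consume(source, 0, sink);
    long checkpoint = sink.snapshotState(); // r4 is still buffered, so x = 3

    // Simulated failure: the sink and its in-memory buffer are lost.
    OffsetTrackingSink restored = new OffsetTrackingSink(2, pinot);
    consume(source, checkpoint + 1, restored); // step 2: resume from x+1
    restored.flush();

    System.out.println(pinot); // prints [r0, r1, r2, r3, r4]
  }
}
```

Note that the sketch assumes the offset is checkpointed atomically with the flush. If the job failed after the upload but before the offset was persisted, the replay would upload those rows a second time, so a real implementation would still need to handle duplicates.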