snleee commented on code in PR #10045:
URL: https://github.com/apache/pinot/pull/10045#discussion_r1061952606
##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java:
##########

@@ -151,4 +148,18 @@ private void flush()
       LOG.info("Pinot segment uploaded to {}", segmentURI);
     });
   }
+
+  @Override
+  public List<GenericRow> snapshotState(long checkpointId, long timestamp) {

Review Comment:
   @bobby-richard After giving it a bit of thought, I would like to understand what we are trying to achieve here. If I understand correctly, we want a way to recover the job after a failure by taking checkpoints. The simplest way of handling fault tolerance is to rely on something like the following:
   1. When `flush()` gets called, we update some state (e.g. `checkpoint = x`) to indicate that all data up to `checkpoint` has been flushed to Pinot. (In Kafka, the checkpoint can be the offset.)
   2. If the Flink job fails in the middle, we resume consuming from `x + 1`.

   I think it's inefficient to store all the rows as the checkpoint, because we only need to store the offset that we flushed last time (a rough sketch is included below). Do you think the above mechanism is feasible in Flink?

   @walterddr Can you also chime in, since you have experience with Flink?
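For illustration, here is a minimal sketch of the offset-based approach described above, using Flink's `CheckpointedFunction` interface so that each checkpoint stores a single offset rather than the full row buffer. The class and member names (`OffsetTrackingPinotSink`, `_lastFlushedOffset`) are hypothetical and not part of this PR, and how the offset is obtained depends on the upstream source:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.pinot.spi.data.readers.GenericRow;

// Hypothetical sketch, not the PR's implementation: checkpoint one offset
// instead of snapshotting every buffered GenericRow.
public class OffsetTrackingPinotSink extends RichSinkFunction<GenericRow>
    implements CheckpointedFunction {
  private transient ListState<Long> _checkpointedOffset; // operator state
  private long _lastFlushedOffset = -1L;                 // advanced by flush()

  @Override
  public void invoke(GenericRow row, Context context) throws Exception {
    // Buffer the row; flush() uploads a segment once the buffer is full.
  }

  private void flush() {
    // ... build and upload the Pinot segment as the PR already does ...
    // Then record how far we have durably written, e.g. a Kafka offset:
    // _lastFlushedOffset = <offset of the last row in the uploaded segment>;
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Store a single long per checkpoint instead of all buffered rows.
    _checkpointedOffset.clear();
    _checkpointedOffset.add(_lastFlushedOffset);
  }

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    _checkpointedOffset = context.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("lastFlushedOffset", Long.class));
    if (context.isRestored()) {
      for (Long offset : _checkpointedOffset.get()) {
        // On recovery, the source would resume from _lastFlushedOffset + 1.
        _lastFlushedOffset = offset;
      }
    }
  }
}
```

With this shape, only a single long travels through each checkpoint regardless of how many rows are buffered; the remaining work on recovery is repositioning the source (e.g. the Kafka consumer) to `_lastFlushedOffset + 1`.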