snleee commented on code in PR #10045:
URL: https://github.com/apache/pinot/pull/10045#discussion_r1061952606


##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java:
##########
@@ -151,4 +148,18 @@ private void flush()
       LOG.info("Pinot segment uploaded to {}", segmentURI);
     });
   }
+
+  @Override
+  public List<GenericRow> snapshotState(long checkpointId, long timestamp) {

Review Comment:
   @bobby-richard  After giving a bit of thoughts, I would like to understand 
what we are trying to achieve here.  If I understand it correctly, we want to 
get the way of recovering the job in case of the failure by having a 
checkpoints. The simplest way of handling fault tolerance is to depend on 
something like the following:
   
   1. when flush() gets called, we update some state (e.g. checkpoint = x) to 
indicate "all data up to `checkpoint` has been flushed to Pinot. (In Kafka, the 
checkpoint can be the offset).
   2. if the flink job fails in the middle, we start to consume from `x+1`.
   
   I think that it's inefficient to store all the rows as the checkpoint 
because we simply need to store the `last offset that we flushed in the last 
time`. Do you think that the above mechanism is feasible on Flink?
   
   @walterddr Can you also chime in since you have the experience with the 
Flink?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to