singhpk234 commented on issue #10156:
URL: https://github.com/apache/iceberg/issues/10156#issuecomment-2297093657

   @cccs-jc no, I wasn't; I tried this unit test:
   
   ```
     @TestTemplate
     public void testResumingStreamReadFromCheckpointWithStreamFromTimestamp() throws Exception {
       File writerCheckpointFolder = temp.resolve("writer-checkpoint-folder").toFile();
       File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
       File output = temp.resolve("junit").toFile();

       DataStreamWriter<Row> querySource =
           spark
               .readStream()
               .format("iceberg")
               .load(tableName)
               .writeStream()
               .option("checkpointLocation", writerCheckpoint.toString())
               .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, System.currentTimeMillis())
               .format("parquet")
               .queryName("checkpoint_test")
               .option("path", output.getPath());

       StreamingQuery startQuery = querySource.start();
       startQuery.processAllAvailable();
       startQuery.stop();

       List<SimpleRecord> expected = Lists.newArrayList();
       for (List<List<SimpleRecord>> expectedCheckpoint :
           TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
         // New data was added while the stream was down
         appendDataAsMultipleSnapshots(expectedCheckpoint);
         expected.addAll(
             Lists.newArrayList(Iterables.concat(Iterables.concat(expectedCheckpoint))));

         // Stream starts up again from the checkpoint, reads the newly added data, and shuts down
         StreamingQuery restartedQuery = querySource.start();
         restartedQuery.processAllAvailable();
         restartedQuery.stop();

         // Read data added by the stream
         List<SimpleRecord> actual =
             spark.read().load(output.getPath()).as(Encoders.bean(SimpleRecord.class)).collectAsList();
         assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
       }
     }
   ```
   
   
   I think this may be because I am reading with the same Spark session. When you kill the job, how exactly do you do it? Could you elaborate a bit more?
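
   For clarity, here is a minimal sketch of what I mean by resuming from the checkpoint in a brand-new Spark session rather than reusing the same one. This is not the actual test or patch: the table name, checkpoint/output paths, `master`, and `appName` are placeholders, and it assumes a session where the Iceberg catalog is already configured.

   ```
   import org.apache.iceberg.spark.SparkReadOptions;
   import org.apache.spark.sql.SparkSession;
   import org.apache.spark.sql.streaming.StreamingQuery;

   public class ResumeFromCheckpointInFreshSession {

     public static void main(String[] args) throws Exception {
       // Placeholders -- adjust to your environment.
       String tableName = "local.db.events";
       String checkpointDir = "/tmp/checkpoint-resume-test/checkpoint";
       String outputDir = "/tmp/checkpoint-resume-test/output";

       // First run: start streaming from "now" and checkpoint the offsets.
       SparkSession firstSession =
           SparkSession.builder().master("local[2]").appName("first-run").getOrCreate();
       StreamingQuery firstRun =
           firstSession
               .readStream()
               .format("iceberg")
               .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, System.currentTimeMillis())
               .load(tableName)
               .writeStream()
               .format("parquet")
               .option("checkpointLocation", checkpointDir)
               .option("path", outputDir)
               .start();
       firstRun.processAllAvailable();
       firstRun.stop();

       // Simulate killing the job: tear down the whole session so nothing survives in memory.
       firstSession.stop();

       // New snapshots could be committed to the table here, while no stream is running.

       // Second run: a brand-new session resumes purely from the checkpoint on disk.
       SparkSession secondSession =
           SparkSession.builder().master("local[2]").appName("resumed-run").getOrCreate();
       StreamingQuery resumed =
           secondSession
               .readStream()
               .format("iceberg")
               .load(tableName)
               .writeStream()
               .format("parquet")
               .option("checkpointLocation", checkpointDir) // same checkpoint as the first run
               .option("path", outputDir)
               .start();
       resumed.processAllAvailable();
       resumed.stop();
       secondSession.stop();
     }
   }
   ```

   If killing the job on your side is closer to tearing down the whole session like this (rather than just stopping the query, as my single-session unit test does), that difference might explain why I cannot reproduce it.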
   
   Can you please apply this patch and test? Also see this explanation if you are starting a new Spark session: https://github.com/apache/iceberg/pull/4473#issuecomment-1086892995
   
   If it fixes your case, I will open a PR for the same.
   
   
   

