stevenzwu commented on code in PR #9547:
URL: https://github.com/apache/iceberg/pull/9547#discussion_r1468280113


##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java:
##########
@@ -97,6 +97,31 @@ public void clean() {
     super.clean();
   }
 
+  private void insertRows(String partition, String branch, Table table, Row... 
rows)
+      throws IOException {
+    GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, 
temporaryDirectory);
+
+    GenericRecord gRecord = GenericRecord.create(table.schema());
+    List<Record> records = Lists.newArrayList();
+    for (Row row : rows) {
+      records.add(
+          gRecord.copy(
+              "id", row.getField(0),
+              "data", row.getField(1),
+              "dt", row.getField(2)));
+    }
+
+    if (partition != null) {
+      appender.appendToTable(TestHelpers.Row.of(partition, 0), records);

Review Comment:
   we could also append to a table with both partition and branch set



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java:
##########
@@ -205,20 +234,132 @@ public void testConsumeFromBeginning() throws Exception {
   }
 
   @TestTemplate
-  public void testConsumeFilesWithBranch() throws Exception {
+  /**
+   * Insert records on the main branch. Then, insert in a named branch. Reads 
from the main branch
+   * and assert that the only records from main are returned
+   */
+  public void testConsumeFilesFromMainOnlyWithBranch() throws Exception {
     sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
     Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    // Produce two snapshots on main branch
     Row row1 = Row.of(1, "aaa", "2021-01-01");
     Row row2 = Row.of(2, "bbb", "2021-01-01");
+
     insertRows(table, row1, row2);
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName).commit();
 
-    Assertions.assertThatThrownBy(
-            () ->
-                exec(
-                    "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s', 'branch'='b1')*/",
-                    TABLE))
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot scan table using ref b1 configured for streaming 
reader yet");
+    // insert on the 'b1' branch
+    Row row3 = Row.of(3, "ccc", "2021-01-01");
+    Row row4 = Row.of(4, "ddd", "2021-01-01");
+
+    insertRowsInBranch(branchName, table, row3, row4);
+
+    // read from main
+    TableResult result =
+        exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s')*/", TABLE);
+
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      // the start snapshot(row2) is exclusive.
+      assertRows(ImmutableList.of(row1, row2), iterator);
+
+      Row row5 = Row.of(5, "eee", "2021-01-01");
+      Row row6 = Row.of(6, "fff", "2021-01-01");
+      insertRows(table, row5, row6);
+      assertRows(ImmutableList.of(row5, row6), iterator);
+
+      Row row7 = Row.of(7, "ggg", "2021-01-01");
+      insertRows(table, row7);
+      assertRows(ImmutableList.of(row7), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+  @TestTemplate
+  /**
+   * Insert records on the main branch. Then insert record on named branch. 
Then select from the
+   * named branch and assert all the records are returned
+   */
+  public void testConsumeFilesFromBranch() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    // Produce two snapshots on main branch
+    Row row1 = Row.of(1, "aaa", "2021-01-01");
+    Row row2 = Row.of(2, "bbb", "2021-01-01");
+
+    insertRows(table, row1, row2);
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // read from main
+    TableResult result =
+        exec(
+            "SELECT * FROM %s  /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s', 'branch'='%s')*/ ",
+            TABLE, branchName);
+
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      assertRows(ImmutableList.of(row1, row2), iterator);
+      // insert on the 'b1' branch
+      Row row3 = Row.of(3, "ccc", "2021-01-01");
+      Row row4 = Row.of(4, "ddd", "2021-01-01");
+      insertRowsInBranch(branchName, table, row3, row4);
+      assertRows(ImmutableList.of(row3, row4), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+  @TestTemplate
+  /**
+   * Insert records on branch b1. Then insert record on b2. Then select from 
each branch and assert
+   * the correct records are returned
+   */
+  public void testConsumeFilesFromTwoBranches() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    String branch1 = "b1";
+    String branch2 = "b2";
+    table.manageSnapshots().createBranch(branch1).commit();
+    table.manageSnapshots().createBranch(branch2).commit();
+
+    // Produce two snapshots on main branch
+    Row row1B1 = Row.of(1, "b1", "2021-01-01");

Review Comment:
   nit: the style is to use full names for readability, so `row1Branch1`.



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java:
##########
@@ -97,6 +97,31 @@ public void clean() {
     super.clean();
   }
 
+  private void insertRows(String partition, String branch, Table table, Row... 
rows)

Review Comment:
   We can consolidate this with the method below to reduce code duplication.



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java:
##########
@@ -97,6 +97,31 @@ public void clean() {
     super.clean();
   }
 
+  private void insertRows(String partition, String branch, Table table, Row... 
rows)
+      throws IOException {
+    GenericAppenderHelper appender = new GenericAppenderHelper(table, FORMAT, 
temporaryDirectory);
+
+    GenericRecord gRecord = GenericRecord.create(table.schema());
+    List<Record> records = Lists.newArrayList();
+    for (Row row : rows) {
+      records.add(
+          gRecord.copy(
+              "id", row.getField(0),
+              "data", row.getField(1),
+              "dt", row.getField(2)));
+    }
+
+    if (partition != null) {
+      appender.appendToTable(TestHelpers.Row.of(partition, 0), records);

Review Comment:
   See if `appendToTable` can take the `MAIN_BRANCH`; then we can set the 
branch name outside this partition if-else block.



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java:
##########
@@ -370,6 +370,91 @@ public void testSpecificSnapshotTimestamp() throws 
Exception {
     }
   }
 
+  @Test
+  public void testReadingFromBranch() throws Exception {
+    String branch = "b1";
+    GenericAppenderHelper dataAppender =
+        new GenericAppenderHelper(tableResource.table(), FileFormat.PARQUET, 
TEMPORARY_FOLDER);
+
+    List<Record> batchBase =
+        RandomGenericData.generate(tableResource.table().schema(), 2, 
randomSeed.incrementAndGet());
+    dataAppender.appendToTable(batchBase);
+
+    // create branch
+    tableResource
+        .table()
+        .manageSnapshots()
+        .createBranch(branch, 
tableResource.table().currentSnapshot().snapshotId())
+        .commit();
+
+    // snapshot1 to branch
+    List<Record> batch1 =
+        RandomGenericData.generate(tableResource.table().schema(), 2, 
randomSeed.incrementAndGet());
+    dataAppender.appendToTable(branch, batch1);
+
+    // snapshot2 to branch
+    List<Record> batch2 =
+        RandomGenericData.generate(tableResource.table().schema(), 2, 
randomSeed.incrementAndGet());
+    dataAppender.appendToTable(branch, batch2);
+
+    List<Record> branchExpectedRecords = Lists.newArrayList();
+    branchExpectedRecords.addAll(batchBase);
+    branchExpectedRecords.addAll(batch1);
+    branchExpectedRecords.addAll(batch2);
+    // reads from branch: it should contain the first snapshot (before the 
branch creation) followed
+    // by the next 2 snapshots added
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            .monitorInterval(Duration.ofMillis(10L))
+            
.startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+            .useBranch(branch)
+            .build();
+
+    try (CloseableIterator<Row> iter =
+        
createStream(scanContext).executeAndCollect(getClass().getSimpleName())) {
+      List<Row> resultMain = waitForResult(iter, 6);
+      TestHelpers.assertRecords(resultMain, branchExpectedRecords, 
tableResource.table().schema());
+
+      // snapshot3 to branch
+      List<Record> batch3 =
+          RandomGenericData.generate(
+              tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(branch, batch3);
+
+      List<Record> batch4 =
+          RandomGenericData.generate(
+              tableResource.table().schema(), 2, randomSeed.incrementAndGet());
+      dataAppender.appendToTable(branch, batch4);
+
+      List<Row> result3 = waitForResult(iter, 2);

Review Comment:
   nit: should we interleave this, i.e., move it before line 425?



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java:
##########
@@ -205,20 +234,132 @@ public void testConsumeFromBeginning() throws Exception {
   }
 
   @TestTemplate
-  public void testConsumeFilesWithBranch() throws Exception {
+  /**
+   * Insert records on the main branch. Then, insert in a named branch. Reads 
from the main branch
+   * and assert that the only records from main are returned
+   */
+  public void testConsumeFilesFromMainOnlyWithBranch() throws Exception {
     sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
     Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    // Produce two snapshots on main branch
     Row row1 = Row.of(1, "aaa", "2021-01-01");
     Row row2 = Row.of(2, "bbb", "2021-01-01");
+
     insertRows(table, row1, row2);
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName).commit();
 
-    Assertions.assertThatThrownBy(
-            () ->
-                exec(
-                    "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s', 'branch'='b1')*/",
-                    TABLE))
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot scan table using ref b1 configured for streaming 
reader yet");
+    // insert on the 'b1' branch
+    Row row3 = Row.of(3, "ccc", "2021-01-01");
+    Row row4 = Row.of(4, "ddd", "2021-01-01");
+
+    insertRowsInBranch(branchName, table, row3, row4);
+
+    // read from main
+    TableResult result =
+        exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s')*/", TABLE);
+
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      // the start snapshot(row2) is exclusive.
+      assertRows(ImmutableList.of(row1, row2), iterator);
+
+      Row row5 = Row.of(5, "eee", "2021-01-01");
+      Row row6 = Row.of(6, "fff", "2021-01-01");
+      insertRows(table, row5, row6);
+      assertRows(ImmutableList.of(row5, row6), iterator);
+
+      Row row7 = Row.of(7, "ggg", "2021-01-01");
+      insertRows(table, row7);
+      assertRows(ImmutableList.of(row7), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+  @TestTemplate
+  /**
+   * Insert records on the main branch. Then insert record on named branch. 
Then select from the
+   * named branch and assert all the records are returned

Review Comment:
   comments need to be updated



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java:
##########
@@ -205,20 +234,132 @@ public void testConsumeFromBeginning() throws Exception {
   }
 
   @TestTemplate
-  public void testConsumeFilesWithBranch() throws Exception {
+  /**
+   * Insert records on the main branch. Then, insert in a named branch. Reads 
from the main branch
+   * and assert that the only records from main are returned
+   */
+  public void testConsumeFilesFromMainOnlyWithBranch() throws Exception {

Review Comment:
   The current name is a bit awkward. What about `ConsumeFilesFromMainBranch`?



##########
flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java:
##########
@@ -205,20 +234,132 @@ public void testConsumeFromBeginning() throws Exception {
   }
 
   @TestTemplate
-  public void testConsumeFilesWithBranch() throws Exception {
+  /**
+   * Insert records on the main branch. Then, insert in a named branch. Reads 
from the main branch
+   * and assert that the only records from main are returned
+   */
+  public void testConsumeFilesFromMainOnlyWithBranch() throws Exception {
     sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
     Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    // Produce two snapshots on main branch
     Row row1 = Row.of(1, "aaa", "2021-01-01");
     Row row2 = Row.of(2, "bbb", "2021-01-01");
+
     insertRows(table, row1, row2);
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName).commit();
 
-    Assertions.assertThatThrownBy(
-            () ->
-                exec(
-                    "SELECT * FROM %s /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s', 'branch'='b1')*/",
-                    TABLE))
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot scan table using ref b1 configured for streaming 
reader yet");
+    // insert on the 'b1' branch
+    Row row3 = Row.of(3, "ccc", "2021-01-01");
+    Row row4 = Row.of(4, "ddd", "2021-01-01");
+
+    insertRowsInBranch(branchName, table, row3, row4);
+
+    // read from main
+    TableResult result =
+        exec("SELECT * FROM %s /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s')*/", TABLE);
+
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      // the start snapshot(row2) is exclusive.
+      assertRows(ImmutableList.of(row1, row2), iterator);
+
+      Row row5 = Row.of(5, "eee", "2021-01-01");
+      Row row6 = Row.of(6, "fff", "2021-01-01");
+      insertRows(table, row5, row6);
+      assertRows(ImmutableList.of(row5, row6), iterator);
+
+      Row row7 = Row.of(7, "ggg", "2021-01-01");
+      insertRows(table, row7);
+      assertRows(ImmutableList.of(row7), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+  @TestTemplate
+  /**
+   * Insert records on the main branch. Then insert record on named branch. 
Then select from the
+   * named branch and assert all the records are returned
+   */
+  public void testConsumeFilesFromBranch() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    // Produce two snapshots on main branch
+    Row row1 = Row.of(1, "aaa", "2021-01-01");
+    Row row2 = Row.of(2, "bbb", "2021-01-01");
+
+    insertRows(table, row1, row2);
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName).commit();
+
+    // read from main
+    TableResult result =
+        exec(
+            "SELECT * FROM %s  /*+ OPTIONS('streaming'='true', 
'monitor-interval'='1s', 'branch'='%s')*/ ",
+            TABLE, branchName);
+
+    try (CloseableIterator<Row> iterator = result.collect()) {
+      assertRows(ImmutableList.of(row1, row2), iterator);
+      // insert on the 'b1' branch
+      Row row3 = Row.of(3, "ccc", "2021-01-01");
+      Row row4 = Row.of(4, "ddd", "2021-01-01");
+      insertRowsInBranch(branchName, table, row3, row4);
+      assertRows(ImmutableList.of(row3, row4), iterator);
+    }
+    result.getJobClient().ifPresent(JobClient::cancel);
+  }
+
+  @TestTemplate
+  /**
+   * Insert records on branch b1. Then insert record on b2. Then select from 
each branch and assert
+   * the correct records are returned
+   */
+  public void testConsumeFilesFromTwoBranches() throws Exception {
+    sql("CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR)", TABLE);
+    Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE));
+
+    String branch1 = "b1";
+    String branch2 = "b2";
+    table.manageSnapshots().createBranch(branch1).commit();
+    table.manageSnapshots().createBranch(branch2).commit();
+
+    // Produce two snapshots on main branch
+    Row row1B1 = Row.of(1, "b1", "2021-01-01");
+    Row row2B1 = Row.of(2, "b1", "2021-01-01");
+
+    Row row1B2 = Row.of(2, "b2", "2021-01-01");
+    Row row2B2 = Row.of(3, "b3", "2021-01-01");
+
+    insertRowsInBranch(branch1, table, row1B1, row2B1);
+    insertRowsInBranch(branch2, table, row1B2, row2B2);
+
+    // read from main

Review Comment:
   This comment needs to be updated or, better, removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to