nastra commented on code in PR #5984:
URL: https://github.com/apache/iceberg/pull/5984#discussion_r1178698478
##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java:
##########
@@ -50,6 +51,131 @@ public void testFromSnapshotInclusive() {
Assert.assertEquals(3, Iterables.size(scanWithToSnapshot.planFiles()));
}
+ @Test
+ public void testFromSnapshotInclusiveWithRef() {
+ table.newFastAppend().appendFile(FILE_A).commit();
+ long snapshotAId = table.currentSnapshot().snapshotId();
+
+ String branchName = "b1";
+ String tagSnapshotAName = "t1";
+ table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
+ table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();
+
+ String tagSnapshotBName = "t2";
+ table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+ long snapshotBId = table.currentSnapshot().snapshotId();
+ table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit();
+ table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
+ long snapshotCId = table.currentSnapshot().snapshotId();
Review Comment:
`snapshotCId` seems to be unused
##########
core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java:
##########
@@ -34,6 +34,17 @@ protected BaseIncrementalScan(Table table, Schema schema,
TableScanContext conte
protected abstract CloseableIterable<T> doPlanFiles(
Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
+ @Override
+ public ThisT fromSnapshotInclusive(String ref) {
+ SnapshotRef snapshotRef = table().refs().get(ref);
+ Preconditions.checkArgument(snapshotRef != null, "Cannot find ref %s",
ref);
+ Preconditions.checkArgument(snapshotRef.isTag(), "Ref %s is not a tag",
ref);
+
+ TableScanContext newContext =
context().fromSnapshotIdInclusive(snapshotRef.snapshotId());
+
+ return newRefinedScan(table(), schema(), newContext);
Review Comment:
should we rather call `fromSnapshotInclusive(snapshotRef.snapshotId())` here
to be consistent with other methods?
##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java:
##########
@@ -100,6 +226,39 @@ public void testToSnapshot() {
Assert.assertEquals(2, Iterables.size(scan.planFiles()));
}
+ @Test
+ public void testToSnapshotWithRef() {
+ table.newFastAppend().appendFile(FILE_A).commit();
+ long snapshotAId = table.currentSnapshot().snapshotId();
Review Comment:
`snapshotAId` seems to be unused
##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java:
##########
@@ -50,6 +51,131 @@ public void testFromSnapshotInclusive() {
Assert.assertEquals(3, Iterables.size(scanWithToSnapshot.planFiles()));
}
+ @Test
+ public void testFromSnapshotInclusiveWithRef() {
+ table.newFastAppend().appendFile(FILE_A).commit();
+ long snapshotAId = table.currentSnapshot().snapshotId();
+
+ String branchName = "b1";
+ String tagSnapshotAName = "t1";
+ table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
+ table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();
+
+ String tagSnapshotBName = "t2";
+ table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+ long snapshotBId = table.currentSnapshot().snapshotId();
+ table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit();
+ table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
+ long snapshotCId = table.currentSnapshot().snapshotId();
+
+ IncrementalAppendScan scan =
newScan().fromSnapshotInclusive(tagSnapshotAName);
+ Assert.assertEquals(5, Iterables.size(scan.planFiles()));
Review Comment:
Using AssertJ's `assertThat(...).hasSize(...)` is typically preferable because
it prints the content if the assertion ever fails (see the example below, where
I changed the expected size to 4):
```
Expected size: 4 but was: 5 in:
[BaseFileScanTask{file=/path/to/data-b.parquet,
partition_data=PartitionData{data_bucket=1}, residual=true},
BaseFileScanTask{file=/path/to/data-a.parquet,
partition_data=PartitionData{data_bucket=0}, residual=true},
BaseFileScanTask{file=/path/to/data-c.parquet,
partition_data=PartitionData{data_bucket=2}, residual=true},
BaseFileScanTask{file=/path/to/data-b.parquet,
partition_data=PartitionData{data_bucket=1}, residual=true},
BaseFileScanTask{file=/path/to/data-c.parquet,
partition_data=PartitionData{data_bucket=2}, residual=true}]
```
Would be good to apply this across the newly added tests.
##########
core/src/main/java/org/apache/iceberg/TableScanContext.java:
##########
@@ -110,7 +114,30 @@ TableScanContext useSnapshotId(Long scanSnapshotId) {
toSnapshotId,
planExecutor,
fromSnapshotInclusive,
- metricsReporter);
+ metricsReporter,
+ branch);
+ }
+
+ String branch() {
+ return branch;
+ }
+
+ TableScanContext useBranch(String newBranch) {
Review Comment:
once you rebase, all you need to add to `TableScanContext` is:
```
@Nullable
public abstract String ref();
```
and then a `useRef()` shortcut method if required
```
TableScanContext useRef(String ref) {
return ImmutableTableScanContext.builder().from(this).ref(ref).build();
}
```
##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java:
##########
@@ -50,6 +51,131 @@ public void testFromSnapshotInclusive() {
Assert.assertEquals(3, Iterables.size(scanWithToSnapshot.planFiles()));
}
+ @Test
+ public void testFromSnapshotInclusiveWithRef() {
+ table.newFastAppend().appendFile(FILE_A).commit();
+ long snapshotAId = table.currentSnapshot().snapshotId();
+
+ String branchName = "b1";
+ String tagSnapshotAName = "t1";
+ table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
+ table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();
+
+ String tagSnapshotBName = "t2";
+ table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+ long snapshotBId = table.currentSnapshot().snapshotId();
+ table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit();
+ table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
+ long snapshotCId = table.currentSnapshot().snapshotId();
+
+ IncrementalAppendScan scan =
newScan().fromSnapshotInclusive(tagSnapshotAName);
+ Assert.assertEquals(5, Iterables.size(scan.planFiles()));
+
+ IncrementalAppendScan scan3 =
+
newScan().fromSnapshotInclusive(tagSnapshotAName).toSnapshot(tagSnapshotBName);
+ Assert.assertEquals(3, Iterables.size(scan3.planFiles()));
+
+ Assertions.assertThatThrownBy(() ->
newScan().fromSnapshotInclusive(branchName))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(String.format("Ref %s is not a tag", branchName));
+
+ Assertions.assertThatThrownBy(() ->
newScan().fromSnapshotInclusive("notExistTag"))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot find ref");
+ }
+
+ @Test
+ public void testFromSnapshotExclusiveWithRef() {
+ table.newFastAppend().appendFile(FILE_A).commit();
+ long snapshotAId = table.currentSnapshot().snapshotId();
+
+ String branchName = "b1";
+ String tagSnapshotAName = "t1";
+ table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
+ table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();
+
+ String tagSnapshotBName = "t2";
+ table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+ long snapshotBId = table.currentSnapshot().snapshotId();
+ table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit();
+ table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
+ long snapshotCId = table.currentSnapshot().snapshotId();
Review Comment:
`snapshotCId` seems to be unused
##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java:
##########
@@ -50,6 +51,131 @@ public void testFromSnapshotInclusive() {
Assert.assertEquals(3, Iterables.size(scanWithToSnapshot.planFiles()));
}
+ @Test
+ public void testFromSnapshotInclusiveWithRef() {
+ table.newFastAppend().appendFile(FILE_A).commit();
+ long snapshotAId = table.currentSnapshot().snapshotId();
+
+ String branchName = "b1";
+ String tagSnapshotAName = "t1";
+ table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
+ table.manageSnapshots().createTag(tagSnapshotAName, snapshotAId).commit();
+
+ String tagSnapshotBName = "t2";
+ table.newFastAppend().appendFile(FILE_B).appendFile(FILE_B).commit();
+ long snapshotBId = table.currentSnapshot().snapshotId();
+ table.manageSnapshots().createTag(tagSnapshotBName, snapshotBId).commit();
+ table.newFastAppend().appendFile(FILE_C).appendFile(FILE_C).commit();
+ long snapshotCId = table.currentSnapshot().snapshotId();
+
+ IncrementalAppendScan scan =
newScan().fromSnapshotInclusive(tagSnapshotAName);
+ Assert.assertEquals(5, Iterables.size(scan.planFiles()));
Review Comment:
```suggestion
Assertions.assertThat(scan.planFiles()).hasSize(5);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]