stevenzwu commented on code in PR #16408:
URL: https://github.com/apache/iceberg/pull/16408#discussion_r3344395954
##########
core/src/main/java/org/apache/iceberg/ManifestInfoStruct.java:
##########
@@ -339,7 +339,7 @@ Builder dv(ByteBuffer buffer) {
Builder dvCardinality(long cardinality) {
Preconditions.checkArgument(
- cardinality > 0, "Invalid DV cardinality: %s (must be positive)",
cardinality);
+ cardinality >= 0, "Invalid DV cardinality: %s (must be >= 0)",
cardinality);
Review Comment:
I am actually wondering whether it should stay strictly positive. Does it
make sense to add an inline manifest DV with zero bits set? We could just leave
`dv` and `dvCardinality` as null in that case.
##########
core/src/main/java/org/apache/iceberg/ManifestInfoStruct.java:
##########
@@ -252,121 +252,140 @@ public String toString() {
.add("replaced_rows_count", replacedRowsCount)
.add("min_sequence_number", minSequenceNumber)
.add("dv", dv == null ? "null" : "(binary)")
- .add("dv_cardinality", dvCardinality == null ? "null" : dvCardinality)
+ .add("dv_cardinality", dvCardinality)
.toString();
}
static class Builder {
- private int addedFilesCount = -1;
- private int existingFilesCount = -1;
- private int deletedFilesCount = -1;
- private int replacedFilesCount = -1;
- private long addedRowsCount = -1L;
- private long existingRowsCount = -1L;
- private long deletedRowsCount = -1L;
- private long replacedRowsCount = -1L;
- private long minSequenceNumber = -1L;
+ private Integer addedFilesCount = null;
+ private Integer existingFilesCount = null;
+ private Integer deletedFilesCount = null;
+ private Integer replacedFilesCount = null;
+ private Long addedRowsCount = null;
+ private Long existingRowsCount = null;
+ private Long deletedRowsCount = null;
+ private Long replacedRowsCount = null;
+ private Long minSequenceNumber = null;
private byte[] dv = null;
private Long dvCardinality = null;
Builder addedFilesCount(int count) {
+ Preconditions.checkArgument(
+ count >= 0, "Invalid added files count: %s (must be >= 0)", count);
this.addedFilesCount = count;
return this;
}
Builder existingFilesCount(int count) {
+ Preconditions.checkArgument(
+ count >= 0, "Invalid existing files count: %s (must be >= 0)",
count);
this.existingFilesCount = count;
return this;
}
Builder deletedFilesCount(int count) {
+ Preconditions.checkArgument(
+ count >= 0, "Invalid deleted files count: %s (must be >= 0)", count);
this.deletedFilesCount = count;
return this;
}
Builder replacedFilesCount(int count) {
+ Preconditions.checkArgument(
+ count >= 0, "Invalid replaced files count: %s (must be >= 0)",
count);
this.replacedFilesCount = count;
return this;
}
Builder addedRowsCount(long count) {
+ Preconditions.checkArgument(count >= 0, "Invalid added rows count: %s
(must be >= 0)", count);
this.addedRowsCount = count;
return this;
}
Builder existingRowsCount(long count) {
+ Preconditions.checkArgument(
+ count >= 0, "Invalid existing rows count: %s (must be >= 0)", count);
this.existingRowsCount = count;
return this;
}
Builder deletedRowsCount(long count) {
+ Preconditions.checkArgument(
+ count >= 0, "Invalid deleted rows count: %s (must be >= 0)", count);
this.deletedRowsCount = count;
return this;
}
Builder replacedRowsCount(long count) {
+ Preconditions.checkArgument(
+ count >= 0, "Invalid replaced rows count: %s (must be >= 0)", count);
this.replacedRowsCount = count;
return this;
}
Builder minSequenceNumber(long sequenceNumber) {
+ Preconditions.checkArgument(
+ sequenceNumber >= 0, "Invalid min sequence number: %s (must be >=
0)", sequenceNumber);
this.minSequenceNumber = sequenceNumber;
return this;
}
Builder dv(ByteBuffer buffer) {
- this.dv = buffer != null ? ByteBuffers.toByteArray(buffer) : null;
- return this;
- }
-
- Builder dv(byte[] buffer) {
- this.dv = buffer;
+ Preconditions.checkArgument(buffer != null, "Invalid DV: null");
+ this.dv = ByteBuffers.toByteArray(buffer);
return this;
}
- Builder dvCardinality(Long cardinality) {
+ Builder dvCardinality(long cardinality) {
+ Preconditions.checkArgument(
+ cardinality >= 0, "Invalid DV cardinality: %s (must be >= 0)",
cardinality);
this.dvCardinality = cardinality;
return this;
}
ManifestInfoStruct build() {
Preconditions.checkArgument(
- addedFilesCount >= 0, "Invalid added files count: %s (must be >=
0)", addedFilesCount);
+ addedFilesCount != null, "Missing required value: added files
count");
Preconditions.checkArgument(
- existingFilesCount >= 0,
- "Invalid existing files count: %s (must be >= 0)",
- existingFilesCount);
+ existingFilesCount != null, "Missing required value: existing files
count");
Preconditions.checkArgument(
- deletedFilesCount >= 0,
- "Invalid deleted files count: %s (must be >= 0)",
- deletedFilesCount);
+ deletedFilesCount != null, "Missing required value: deleted files
count");
Preconditions.checkArgument(
- replacedFilesCount >= 0,
- "Invalid replaced files count: %s (must be >= 0)",
- replacedFilesCount);
+ replacedFilesCount != null, "Missing required value: replaced files
count");
+ Preconditions.checkArgument(
+ addedRowsCount != null, "Missing required value: added rows count");
+ Preconditions.checkArgument(
+ existingRowsCount != null, "Missing required value: existing rows
count");
+ Preconditions.checkArgument(
+ deletedRowsCount != null, "Missing required value: deleted rows
count");
Preconditions.checkArgument(
- addedRowsCount >= 0, "Invalid added rows count: %s (must be >= 0)",
addedRowsCount);
+ replacedRowsCount != null, "Missing required value: replaced rows
count");
Preconditions.checkArgument(
- existingRowsCount >= 0,
- "Invalid existing rows count: %s (must be >= 0)",
- existingRowsCount);
+ minSequenceNumber != null, "Missing required value: min sequence
number");
Preconditions.checkArgument(
- deletedRowsCount >= 0, "Invalid deleted rows count: %s (must be >=
0)", deletedRowsCount);
+ addedRowsCount == 0 || addedFilesCount > 0,
Review Comment:
This kind of composite validation can allow some invalid combinations, e.g.
`addedRowsCount < 0 && addedFilesCount > 0`.
The previous validations checked each field individually (e.g. `addedRowsCount >=
0`), which would prevent the above anomaly. But individual validation can also
miss some conditions, like `addedRowsCount > 0 && addedFilesCount == 0`.
Maybe the check should require that either both are zero or both are
positive:
```
(addedRowsCount == 0 && addedFilesCount == 0) || (addedRowsCount > 0 &&
addedFilesCount > 0)
```
##########
core/src/main/java/org/apache/iceberg/TrackingBuilder.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.nio.ByteBuffer;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.ByteBuffers;
+
+class TrackingBuilder {
+ private final EntryStatus status;
+ private final Long snapshotId;
+ private final Long dataSequenceNumber;
+ private final Long fileSequenceNumber;
+ private final Long firstRowId;
+ // ID of the snapshot in which the new Tracking instance will be committed.
+ private final long newSnapshotId;
+ private Long dvSnapshotId;
+ private byte[] deletedPositions;
+ private byte[] replacedPositions;
+
+ /**
+ * Creates a builder for a newly added file.
+ *
+ * @param newSnapshotId the snapshot ID in which the new tracking instance
will be committed
+ */
+ static TrackingBuilder added(long newSnapshotId) {
+ return new TrackingBuilder(newSnapshotId);
+ }
+
+ /**
+ * Creates a builder for a tracking row derived from {@code source}.
+ *
+ * @param source source tracking from a manifest entry
+ * @param newSnapshotId the snapshot ID in which the new tracking instance
will be committed
+ */
+ static TrackingBuilder builder(Tracking source, long newSnapshotId) {
+ return new TrackingBuilder(source, newSnapshotId);
+ }
+
+ /**
+ * Returns a DELETED tracking row derived from {@code source}.
+ *
+ * @param source source tracking from a manifest entry
+ * @param newSnapshotId the snapshot ID in which the new tracking instance
will be committed
+ */
+ static Tracking delete(Tracking source, long newSnapshotId) {
+ return terminal(EntryStatus.DELETED, source, newSnapshotId);
+ }
+
+ /**
+ * Returns a REPLACED tracking row derived from {@code source}.
+ *
+ * @param source source tracking from a manifest entry
+ * @param newSnapshotId the snapshot ID in which the new tracking instance
will be committed
+ */
+ static Tracking replace(Tracking source, long newSnapshotId) {
+ return terminal(EntryStatus.REPLACED, source, newSnapshotId);
+ }
+
+ private TrackingBuilder(long newSnapshotId) {
+ this.status = EntryStatus.ADDED;
+ this.snapshotId = newSnapshotId;
+ this.newSnapshotId = newSnapshotId;
+ this.dataSequenceNumber = null;
+ this.fileSequenceNumber = null;
+ this.firstRowId = null;
+ this.dvSnapshotId = null;
+ this.deletedPositions = null;
+ this.replacedPositions = null;
+ }
+
+ private TrackingBuilder(Tracking source, long newSnapshotId) {
+ validateSource(source);
+ checkStatus(source.status(), EntryStatus.EXISTING);
+ this.status = EntryStatus.EXISTING;
+ this.snapshotId = source.snapshotId();
+ this.newSnapshotId = newSnapshotId;
+ this.dataSequenceNumber = source.dataSequenceNumber();
+ this.fileSequenceNumber = source.fileSequenceNumber();
+ this.firstRowId = source.firstRowId();
+ this.dvSnapshotId = source.dvSnapshotId();
+ this.deletedPositions = null;
+ this.replacedPositions = null;
+ }
+
+ /** Indicates that the DV has been updated for the new Tracking. */
+ TrackingBuilder dvUpdated() {
+ // DV applies to data files; deleted/replaced positions apply to manifest
files
+ Preconditions.checkState(
+ deletedPositions == null && replacedPositions == null,
+ "Cannot mark DV updated on a manifest entry (deleted/replaced
positions are set)");
+ this.dvSnapshotId = newSnapshotId;
+ return this;
+ }
+
+ TrackingBuilder deletedPositions(ByteBuffer positions) {
+ Preconditions.checkState(
+ status == EntryStatus.EXISTING, "Cannot set deleted positions on %s
entry", status);
+ // DV applies to data files; deleted positions apply to manifest files
+ Preconditions.checkState(
+ dvSnapshotId == null,
+ "Cannot set deleted positions on a data file entry (DV snapshot ID is
set)");
+ this.deletedPositions = ByteBuffers.toByteArray(positions);
+ return this;
+ }
+
+ TrackingBuilder replacedPositions(ByteBuffer positions) {
+ Preconditions.checkState(
+ status == EntryStatus.EXISTING, "Cannot set replaced positions on %s
entry", status);
+ // DV applies to data files; replaced positions apply to manifest files
+ Preconditions.checkState(
+ dvSnapshotId == null,
+ "Cannot set replaced positions on a data file entry (DV snapshot ID is
set)");
+ this.replacedPositions = ByteBuffers.toByteArray(positions);
+ return this;
+ }
+
+ Tracking build() {
+ return new TrackingStruct(
+ status,
+ snapshotId,
+ dataSequenceNumber,
+ fileSequenceNumber,
+ dvSnapshotId,
+ firstRowId,
+ deletedPositions,
+ replacedPositions);
+ }
+
+ private static Tracking terminal(EntryStatus to, Tracking source, long
newSnapshotId) {
+ validateSource(source);
+ checkStatus(source.status(), to);
+ return new TrackingStruct(
+ to,
+ newSnapshotId,
+ source.dataSequenceNumber(),
+ source.fileSequenceNumber(),
+ source.dvSnapshotId(),
+ source.firstRowId(),
+ null,
+ null);
+ }
+
+ private static void validateSource(Tracking source) {
+ Preconditions.checkArgument(source != null, "Invalid source tracking:
null");
+ Preconditions.checkArgument(
+ source.dataSequenceNumber() != null,
+ "Invalid tracking source: data sequence number is null");
+ Preconditions.checkArgument(
+ source.fileSequenceNumber() != null,
+ "Invalid tracking source: file sequence number is null");
+ }
+
+ private static void checkStatus(EntryStatus from, EntryStatus to) {
Review Comment:
nit: `validateStatusTransition` would make the purpose clearer.
##########
core/src/main/java/org/apache/iceberg/TrackingBuilder.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.nio.ByteBuffer;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.ByteBuffers;
+
+class TrackingBuilder {
+ private final EntryStatus status;
+ private final Long snapshotId;
+ private final Long dataSequenceNumber;
+ private final Long fileSequenceNumber;
+ private final Long firstRowId;
+ // ID of the snapshot in which the new Tracking instance will be committed.
+ private final long newSnapshotId;
+ private Long dvSnapshotId;
+ private byte[] deletedPositions;
+ private byte[] replacedPositions;
+
+ /**
+ * Creates a builder for a newly added file.
+ *
+ * @param snapshotId the snapshot ID in which the new tracking instance will
be committed
+ */
+ static TrackingBuilder added(long snapshotId) {
Review Comment:
I actually didn't realize that some of these methods (like `delete` and `replace`) return `Tracking`
directly. Should we just call this method `builder(long newSnapshotId)`? The
Javadoc can explain that it creates a builder for a new `added` entry.
@anoopj it seems that Ryan was suggesting `deleted` and `replaced` for the
other two methods.
```
static Tracking delete(Tracking source, long newSnapshotId)
static Tracking replace(Tracking source, long newSnapshotId)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]