amogh-jahagirdar commented on code in PR #12928: URL: https://github.com/apache/iceberg/pull/12928#discussion_r2066591870
########## arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java: ########## @@ -567,6 +606,146 @@ public void close() { } } + private static final class RowIdVectorReader extends VectorizedArrowReader { + private static final Field ROW_ID_ARROW_FIELD = ArrowSchemaUtil.convert(MetadataColumns.ROW_ID); + + private final long firstRowId; + private final VectorizedReader<VectorHolder> idReader; + private final VectorizedReader<VectorHolder> posReader; + private NullabilityHolder nulls; + + private RowIdVectorReader(long firstRowId, VectorizedArrowReader idReader) { + this.firstRowId = firstRowId; + this.idReader = idReader; + this.posReader = new PositionVectorReader(true); + } + + @Override + public VectorHolder read(VectorHolder reuse, int numValsToRead) { + FieldVector positions = posReader.read(null, numValsToRead).vector(); + VectorHolder ids = idReader.read(null, numValsToRead); + BigIntVector vec = newVector(numValsToRead); + ArrowBuf dataBuffer = vec.getDataBuffer(); + for (int i = 0; i < numValsToRead; i += 1) { + long bufferOffset = (long) i * Long.BYTES; + if (ids.nullabilityHolder().isNullAt(i) == 1) { + long rowId = firstRowId + (Long) positions.getObject(i); + dataBuffer.setLong(bufferOffset, rowId); + } else { + long materializedRowId = ids.vector().getDataBuffer().getLong(bufferOffset); + dataBuffer.setLong(bufferOffset, materializedRowId); + } + } + + vec.setValueCount(numValsToRead); + + return new VectorHolder.RowIdVectorHolder(vec, MetadataColumns.ROW_ID, nulls); + } + + @Override + public void setRowGroupInfo( + PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) { + idReader.setRowGroupInfo(source, metadata); + posReader.setRowGroupInfo(source, metadata); + } + + @Override + public void setBatchSize(int batchSize) { + if (nulls == null || nulls.size() < batchSize) { + this.nulls = newNullabilityHolder(batchSize); + } + + idReader.setBatchSize(batchSize); + posReader.setBatchSize(batchSize); + } + + 
@Override + public void close() { + // don't close vectors as they are not owned by readers + } + + private static BigIntVector newVector(int valueCount) { + BigIntVector vector = + (BigIntVector) ROW_ID_ARROW_FIELD.createVector(ArrowAllocation.rootAllocator()); + vector.allocateNew(valueCount); + return vector; + } + + private static NullabilityHolder newNullabilityHolder(int size) { + NullabilityHolder nullabilityHolder = new NullabilityHolder(size); + nullabilityHolder.setNotNulls(0, size); + return nullabilityHolder; + } + } + + private static final class LastUpdatedSeqVectorReader extends VectorizedArrowReader { + private static final Field LAST_UPDATED_SEQ = + ArrowSchemaUtil.convert(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER); + + private final long lastUpdatedSeq; + private final VectorizedReader<VectorHolder> seqReader; + private NullabilityHolder nulls; + + private LastUpdatedSeqVectorReader( + long lastUpdatedSeq, VectorizedReader<VectorHolder> seqReader) { + this.lastUpdatedSeq = lastUpdatedSeq; + this.seqReader = seqReader == null ? 
nulls() : seqReader; + } + + @Override + public VectorHolder read(VectorHolder reuse, int numValsToRead) { + VectorHolder seqNumbers = seqReader.read(null, numValsToRead); + BigIntVector vec = newVector(numValsToRead); + ArrowBuf dataBuffer = vec.getDataBuffer(); + for (int i = 0; i < numValsToRead; i += 1) { + long bufferOffset = (long) i * Long.BYTES; + if (seqNumbers.nullabilityHolder().isNullAt(i) == 1) { + dataBuffer.setLong(bufferOffset, lastUpdatedSeq); + } else { + long materializedRowId = seqNumbers.vector().getDataBuffer().getLong(bufferOffset); + dataBuffer.setLong(bufferOffset, materializedRowId); + } + } + + vec.setValueCount(numValsToRead); + + return new VectorHolder.RowIdVectorHolder(vec, MetadataColumns.ROW_ID, nulls); + } + + @Override + public void setRowGroupInfo( + PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) { + seqReader.setRowGroupInfo(source, metadata); + } + + @Override + public void setBatchSize(int batchSize) { + if (nulls == null || nulls.size() < batchSize) { + this.nulls = newNullabilityHolder(batchSize); + } + + seqReader.setBatchSize(batchSize); + } + + @Override + public void close() { + // don't close vectors as they are not owned by readers + } + + private static BigIntVector newVector(int valueCount) { + BigIntVector vector = + (BigIntVector) LAST_UPDATED_SEQ.createVector(ArrowAllocation.rootAllocator()); + vector.allocateNew(valueCount); + return vector; + } + + private static NullabilityHolder newNullabilityHolder(int size) { + NullabilityHolder nullabilityHolder = new NullabilityHolder(size); + nullabilityHolder.setNotNulls(0, size); + return nullabilityHolder; + } Review Comment: This `newNullabilityHolder` helper doesn't need to be duplicated (it is identical to the one in `RowIdVectorReader`); move it out into a shared helper. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org