amogh-jahagirdar commented on code in PR #12928:
URL: https://github.com/apache/iceberg/pull/12928#discussion_r2106437659
########## arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java: ##########
@@ -567,6 +608,164 @@ public void close() {
     }
   }
 
+  private static final class RowIdVectorReader extends VectorizedArrowReader {
+    private static final Field ROW_ID_ARROW_FIELD =
+        ArrowSchemaUtil.convert(MetadataColumns.ROW_ID);
+
+    private final long firstRowId;
+    private final VectorizedReader<VectorHolder> idReader;
+    private final VectorizedReader<VectorHolder> posReader;
+    private NullabilityHolder nulls;
+
+    private RowIdVectorReader(long firstRowId, VectorizedArrowReader idReader) {
+      this.firstRowId = firstRowId;
+      this.idReader = idReader != null ? idReader : nulls();
+      this.posReader = new PositionVectorReader(true);
+    }
+
+    @Override
+    public VectorHolder read(VectorHolder reuse, int numValsToRead) {
+      FieldVector positions = null;
+      FieldVector ids = null;
+
+      try {
+        positions = posReader.read(null, numValsToRead).vector();
+        VectorHolder idsHolder = idReader.read(null, numValsToRead);
+        ids = idsHolder.vector();
+        ArrowVectorAccessor<?, String, ?, ?> idsAccessor =
+            ids == null ? null : ArrowVectorAccessors.getVectorAccessor(idsHolder);
+
+        BigIntVector rowIds = allocateBigIntVector(ROW_ID_ARROW_FIELD, numValsToRead);
+        ArrowBuf dataBuffer = rowIds.getDataBuffer();
+        for (int i = 0; i < numValsToRead; i += 1) {
+          long bufferOffset = (long) i * Long.BYTES;
+          if (idsAccessor == null || isNull(idsHolder, i)) {
+            long rowId = firstRowId + (Long) positions.getObject(i);
+            dataBuffer.setLong(bufferOffset, rowId);
+          } else {
+            long materializedRowId = idsAccessor.getLong(i);
+            dataBuffer.setLong(bufferOffset, materializedRowId);
+          }
+        }
+
+        rowIds.setValueCount(numValsToRead);
+        return VectorHolder.vectorHolder(rowIds, MetadataColumns.ROW_ID, nulls);
+      } finally {
+        if (positions != null) {
+          positions.close();
+        }
+
+        if (ids != null) {
+          ids.close();
+        }
+      }
+    }
+
+    @Override
+    public void setRowGroupInfo(
+        PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
+      idReader.setRowGroupInfo(source, metadata);
+      posReader.setRowGroupInfo(source, metadata);
+    }
+
+    @Override
+    public void setBatchSize(int batchSize) {
+      if (nulls == null || nulls.size() < batchSize) {
+        this.nulls = newNullabilityHolder(batchSize);
+      }
+
+      idReader.setBatchSize(batchSize);
+      posReader.setBatchSize(batchSize);
+    }
+
+    @Override
+    public void close() {
+      // don't close vectors as they are not owned by readers

Review Comment:
   I made this comment a bit more specific to say "result vectors". I previously copy/pasted it from the other reader, but I think the intent is that it only applies to the result vectors; let me know what you think.

   I believe we can and should safely close the intermediate vectors used for the calculations on the read path. For instance, the position vectors used to compute row IDs when the materialized value is null, and the vectors produced by the underlying materialized row ID reader, can safely be closed after reading a batch because they are scoped to `read`. After that they are not needed externally, so it makes sense to close them as soon as we know they are no longer needed. What cannot be closed are the vectors holding the reader's results (e.g. the `BigIntVector rowIds = allocateBigIntVector(...)`), since closing those would free their contents before external consumers could use them.
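   To make the ownership split concrete, here is a minimal sketch of how `close()` could look under that reasoning. It is an illustration, not necessarily what the PR implements: it assumes the delegate `idReader` and `posReader` release their own resources in their `close()` implementations, and it deliberately leaves the `BigIntVector` results returned by `read` alone, since their lifecycle belongs to the consumer of the returned `VectorHolder`.

    @Override
    public void close() {
      // don't close result vectors as they are not owned by readers;
      // the consumer of the returned VectorHolder is responsible for releasing them
      idReader.close();
      posReader.close();
    }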