shubham19may commented on code in PR #14499:
URL: https://github.com/apache/iceberg/pull/14499#discussion_r2498351892
##########
arrow/src/main/java/org/apache/iceberg/arrow/ArrowAllocation.java:
##########
@@ -21,14 +21,29 @@
import org.apache.arrow.memory.RootAllocator;
public class ArrowAllocation {
+ private static final String ALLOCATION_MANAGER_TYPE_PROPERTY =
+ "arrow.memory.allocation.manager.type";
+
static {
- ROOT_ALLOCATOR = new RootAllocator(Long.MAX_VALUE);
+ if (System.getProperty(ALLOCATION_MANAGER_TYPE_PROPERTY) == null) {
+ System.setProperty(ALLOCATION_MANAGER_TYPE_PROPERTY, "Netty");
+ }
Review Comment:
Done.
##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java:
##########
@@ -227,6 +228,69 @@ protected WriteSupport<InternalRow> getWriteSupport(Configuration configuration)
}
}
+ @Test
+ public void testTimestampMillisProducedBySparkIsReadCorrectly() throws IOException {
+ String outputFilePath =
+ String.format("%s/%s", temp.toAbsolutePath(), "parquet_timestamp_millis.parquet");
+ HadoopOutputFile outputFile =
+ HadoopOutputFile.fromPath(
+ new org.apache.hadoop.fs.Path(outputFilePath), new Configuration());
+
+ Schema schema = new Schema(required(1, "event_time", Types.TimestampType.withZone()));
+
+ StructType sparkSchema =
+ new StructType(
+ new StructField[] {
+ new StructField("event_time", DataTypes.TimestampType, false, Metadata.empty())
+ });
+
+ List<InternalRow> originalRows = Lists.newArrayList(RandomData.generateSpark(schema, 10, 0L));
+ List<InternalRow> rows = Lists.newArrayList();
+ for (InternalRow row : originalRows) {
+ long timestampMicros = row.getLong(0);
+ long timestampMillis = (timestampMicros / 1000) * 1000;
+ rows.add(
+ new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(
+ new Object[] {timestampMillis}));
+ }
+
+ try (ParquetWriter<InternalRow> writer =
+ new NativeSparkWriterBuilder(outputFile)
+ .set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
+ .set("spark.sql.parquet.writeLegacyFormat", "false")
+ .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
+ .set("spark.sql.parquet.fieldId.write.enabled", "true")
+ .build()) {
+ for (InternalRow row : rows) {
+ writer.write(row);
+ }
+ }
+
+ InputFile parquetInputFile = Files.localInput(outputFilePath);
+ Table timestampMillisTable = tableFromInputFile(parquetInputFile, schema);
+
+ int totalRowsRead = 0;
+ try (VectorizedTableScanIterable vectorizedReader =
+ new VectorizedTableScanIterable(timestampMillisTable.newScan(), 1024, false)) {
+
+ for (org.apache.iceberg.arrow.vectorized.ColumnarBatch batch : vectorizedReader) {
+ org.apache.arrow.vector.VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors();
+
+ org.apache.arrow.vector.FieldVector eventTimeVector = root.getVector("event_time");
+ assertThat(eventTimeVector).isNotNull();
+
+ assertThat(eventTimeVector).isInstanceOf(org.apache.arrow.vector.BigIntVector.class);
Review Comment:
Done.
Also, in VectorizedParquetDefinitionLevelReader.java, the Arrow validity buffer
was not being set correctly for non-null values (the bit should be 1 when the
value is not null). Fixed that.
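To illustrate the validity-bit convention mentioned above, here is a minimal sketch in plain Java. It is not the actual change in VectorizedParquetDefinitionLevelReader.java and does not use the Arrow API; the class `ValidityBitmapSketch` and its methods are hypothetical names introduced only for this illustration. It assumes Arrow's documented layout, where slot `i` lives in byte `i / 8` at bit `i % 8` (least-significant bit first) and a set bit means the value is present.

```java
public class ValidityBitmapSketch {

    // Mark slot `index` as non-null by setting its bit to 1.
    public static void setValid(byte[] validity, int index) {
        validity[index >> 3] |= (byte) (1 << (index & 7));
    }

    // Mark slot `index` as null by clearing its bit to 0.
    public static void setNull(byte[] validity, int index) {
        validity[index >> 3] &= (byte) ~(1 << (index & 7));
    }

    // A slot is valid (non-null) when its bit is 1.
    public static boolean isValid(byte[] validity, int index) {
        return (validity[index >> 3] & (1 << (index & 7))) != 0;
    }

    // Build a validity bitmap for boxed values: 1 for non-null, 0 for null.
    public static byte[] buildValidity(Long[] values) {
        // A fresh byte[] is zero-filled, so every slot starts out as null.
        byte[] validity = new byte[(values.length + 7) / 8];
        for (int i = 0; i < values.length; i++) {
            if (values[i] != null) {
                setValid(validity, i);
            }
        }
        return validity;
    }

    public static void main(String[] args) {
        Long[] values = {1_000L, null, 3_000L};
        byte[] validity = buildValidity(values);
        for (int i = 0; i < values.length; i++) {
            System.out.println(i + " -> " + (isValid(validity, i) ? "valid" : "null"));
        }
    }
}
```

If the bit for a non-null slot is left at 0, a reader that honors the bitmap would treat the slot as null even though the data buffer holds a value, which is the kind of mismatch described in the comment.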
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]