Freedomfirebody commented on issue #9431: URL: https://github.com/apache/iceberg/issues/9431#issuecomment-1959092029
I accept the answer to question 1 but disagree with it: it should support `OffsetDateTime`, since that is the type returned when reading the records back. The answer to question 2 does not seem to be correct, though. After trying different schemas, I found that the problem happens again when `day("storage_time")` is added to the `PartitionSpec`:

```java
PartitionSpec spec = PartitionSpec.builderFor(schema)
    .day("storage_time")
    .identity("id")
    .build();
```

If I remove `day("storage_time")` and generate data under the same conditions, the records are read back correctly. I am now tracing the code and checking the stored data, and I have found something suspicious: `partitions.lower_bound` and `partitions.upper_bound` are not populated. I am not sure whether there is an issue with my test program, but as soon as I remove `day()` everything is fine. Here are the full code and the `snap-*.avro` data of my table:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class Main {
    public static void main(String[] args) {
        String warehousePath = "/warehouse";
        String path = "table";
        Configuration conf = getConf();
        TableIdentifier identifier = TableIdentifier.of(Namespace.empty(), path);
        try (HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, warehousePath)) {
            System.out.println(hadoopCatalog.listTables(Namespace.of(".")));
            // create table
            create(hadoopCatalog, identifier);
            // generate data
            gen(hadoopCatalog, identifier);
            // read data
            read(hadoopCatalog, identifier);
            // drop table
            drop(hadoopCatalog, identifier);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static Configuration getConf() {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "s3a://");
        conf.set("fs.s3a.endpoint", "http://localhost:9000/");
        conf.set("fs.s3a.access.key", "");
        conf.set("fs.s3a.secret.key", "");
        conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        conf.set("fs.s3a.path.style.access", "true");
        conf.set("fs.s3a.connection.ssl.enabled", "false");
        return conf;
    }

    public static void drop(HadoopCatalog hadoopCatalog, TableIdentifier identifier) {
        hadoopCatalog.dropTable(identifier);
    }

    public static void create(HadoopCatalog hadoopCatalog, TableIdentifier identifier) {
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.required(2, "storage_time", Types.TimestampType.withZone()),
            Types.NestedField.required(3, "message", Types.StringType.get()),
            Types.NestedField.optional(4, "call_stack", Types.ListType.ofRequired(5, Types.StringType.get()))
        );
        PartitionSpec spec = PartitionSpec.builderFor(schema)
            .bucket("storage_time", 10)
            .day("storage_time")
            .build();
        Map<String, String> properties = new HashMap<>();
        Table table = hadoopCatalog.createTable(identifier, schema, spec, properties);
    }

    public static void read(HadoopCatalog hadoopCatalog, TableIdentifier identifier) {
        Table table = hadoopCatalog.loadTable(identifier);
        try (CloseableIterable<Record> result = IcebergGenerics.read(table)
                .where(Expressions.lessThan("storage_time",
                    OffsetDateTime.of(LocalDateTime.now(), ZoneOffset.of("+7")).toString()))
                .build()) {
            for (Record r : result) {
                System.out.println("=========================");
                System.out.println(r);
            }
            System.out.println("=========================");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void gen(HadoopCatalog hadoopCatalog, TableIdentifier identifier) {
        Table table = hadoopCatalog.loadTable(identifier);
        String filepath = table.location() + "/" + UUID.randomUUID();
        try (FileIO io = table.io()) {
            OutputFile file = io.newOutputFile(filepath);
            final PartitionKey partitionKey = new PartitionKey(table.spec(), table.spec().schema());
            DataWriter<GenericRecord> dataWriter = Parquet.writeData(file)
                .schema(table.schema())
                .withSpec(table.spec())
                .withPartition(partitionKey)
                .createWriterFunc(GenericParquetWriter::buildWriter)
                .build();
            for (int i = 0, sized = i + 10, day = sized / 3; i < sized; i++) {
                ArrayList<String> list = new ArrayList<>();
                for (int ind = 0, size = i % 3; ind < size; ind++) {
                    list.add(String.valueOf(ind));
                }
                GenericRecord record = GenericRecord.create(table.schema());
                record.setField("id", i);
                record.setField("storage_time",
                    OffsetDateTime.of(LocalDateTime.now().plusDays(i - day), ZoneOffset.of("+7")));
                record.setField("message", String.valueOf(i));
                record.setField("call_stack", list);
                dataWriter.write(record);
            }
            dataWriter.close();
            DataFile dataFile = dataWriter.toDataFile();
            table.newAppend().appendFile(dataFile).commit();
            table.refresh();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```
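As a side note, this is roughly how I would compute the partition tuple that `bucket()`/`day()` should produce for each record generated in `gen()` above, to compare against what ends up in the manifest. It is only a sketch (the `ExpectedPartitionDump` name is made up); my understanding is that `InternalRecordWrapper` is needed because the generic records carry `OffsetDateTime` values while the transforms operate on Iceberg's internal representation:

```java
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;

class ExpectedPartitionDump {
    static void dump(Table table, Iterable<Record> records) {
        PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
        // The wrapper converts OffsetDateTime (and similar) field values into the
        // internal representation expected by the partition transforms.
        InternalRecordWrapper wrapper = new InternalRecordWrapper(table.schema().asStruct());
        for (Record record : records) {
            partitionKey.partition(wrapper.wrap(record)); // recompute the tuple for this record
            System.out.println(record.getField("id") + " -> " + partitionKey);
        }
    }
}
```

Since the generated `storage_time` values span roughly ten different days, I would expect the day part of the tuple to vary across records.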
avro data (from the `snap-*.avro` manifest list):

```json
[
  {
    "manifest_path": "/warehouse/table/metadata/332a49c4-c8b3-4e0e-9c61-d04d59438255-m0.avro",
    "manifest_length": 6468,
    "partition_spec_id": 0,
    "added_snapshot_id": 7055246188063471896,
    "added_data_files_count": 1,
    "existing_data_files_count": 0,
    "deleted_data_files_count": 0,
    "partitions": [
      {
        "contains_null": true,
        "contains_nan": false,
        "lower_bound": null,
        "upper_bound": null
      },
      {
        "contains_null": true,
        "contains_nan": false,
        "lower_bound": null,
        "upper_bound": null
      }
    ],
    "added_rows_count": 10,
    "existing_rows_count": 0,
    "deleted_rows_count": 0
  }
]
```
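For completeness, the same summaries can also be read back through the Java API instead of dumping the Avro file directly. Again just a sketch (the `PartitionSummaryDump` name is made up, and it assumes an Iceberg version where `Snapshot.allManifests(FileIO)` is available):

```java
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

class PartitionSummaryDump {
    static void dump(Table table) {
        // One summary entry per partition field, in spec order (bucket, then day here).
        Types.StructType partitionType = table.spec().partitionType();
        for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
            if (manifest.partitions() == null) {
                continue;
            }
            for (int i = 0; i < manifest.partitions().size(); i++) {
                ManifestFile.PartitionFieldSummary summary = manifest.partitions().get(i);
                Type fieldType = partitionType.fields().get(i).type();
                Object lower = summary.lowerBound() == null
                    ? null : Conversions.fromByteBuffer(fieldType, summary.lowerBound());
                Object upper = summary.upperBound() == null
                    ? null : Conversions.fromByteBuffer(fieldType, summary.upperBound());
                System.out.printf("%s: contains_null=%s lower_bound=%s upper_bound=%s%n",
                    partitionType.fields().get(i).name(), summary.containsNull(), lower, upper);
            }
        }
    }
}
```

If the bounds really are missing, this should print `null` for `lower_bound`/`upper_bound`, matching the dump above.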