amogh-jahagirdar commented on code in PR #6571:
URL: https://github.com/apache/iceberg/pull/6571#discussion_r1095384483
##########
docs/java-api.md:
##########
@@ -147,6 +147,69 @@
 t.newAppend().appendFile(data).commit();
 t.commitTransaction();
 ```
+### WriteData

Review Comment:
   WriteData -> Writing Data

##########
data/src/test/java/org/apache/iceberg/data/TestGenericTaskWriter.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.*;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestGenericTaskWriter {
+  @Rule public final TemporaryFolder temp = new TemporaryFolder();
+
+  private final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "level", Types.StringType.get()),
+          Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
+          Types.NestedField.required(3, "message", Types.StringType.get()),
+          Types.NestedField.optional(
+              4, "call_stack", Types.ListType.ofRequired(5, Types.StringType.get())));
+
+  private final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).hour("event_time").identity("level").build();
+  private Table table;
+  private List<Record> testRecords = Lists.newArrayList();
+  private final String testFileFormat;
+
+  @Parameterized.Parameters(name = "FileFormat = {0}")
+  public static Object[][] parameters() {
+    return new Object[][] {{"avro"}, {"orc"}, {"parquet"}};
+  }
+
+  public TestGenericTaskWriter(String fileFormat) throws IOException {
+    this.testFileFormat = fileFormat;
+  }
+
+  @Before
+  public void createTable() throws IOException {
+    File testWareHouse = temp.newFolder();
+    if (testWareHouse.exists()) {
+      Assert.assertTrue(testWareHouse.delete());
+    }

Review Comment:
   Nit style: newlines after if/else blocks
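For concreteness, the nit presumably asks for a blank line after the closing brace before the next statement; this is the same code from the diff (including the `Catalog` line quoted further below), reformatted only:

```java
File testWareHouse = temp.newFolder();
if (testWareHouse.exists()) {
  Assert.assertTrue(testWareHouse.delete());
}

Catalog catalog = new HadoopCatalog(new Configuration(), testWareHouse.getPath());
```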
##########
data/src/test/java/org/apache/iceberg/data/TestGenericTaskWriter.java:
##########
@@ -0,0 +1,143 @@
+  // test write and java API write data code demo
+  private void writeRecords() throws IOException {
+    GenericAppenderFactory appenderFactory =
+        new GenericAppenderFactory(table.schema(), table.spec());
+    int partitionId = 1, taskId = 1;
+    FileFormat fileFormat =
+        FileFormat.valueOf(
+            table.properties().getOrDefault("write.format.default", "parquet").toUpperCase());
+    OutputFileFactory outputFileFactory =
+        OutputFileFactory.builderFor(table, partitionId, taskId).format(fileFormat).build();
+    // TaskWriter write records into file. (the same is ok for unpartition table)
+    long targetFileSizeInBytes = 50L * 1024 * 1024;
+    GenericTaskWriter<Record> genericTaskWriter =
+        new GenericTaskWriter(
+            table.spec(),
+            fileFormat,
+            appenderFactory,
+            outputFileFactory,
+            table.io(),
+            targetFileSizeInBytes);
+
+    GenericRecord genericRecord = GenericRecord.create(table.schema());
+    // assume write 1000 records
+    for (int i = 0; i < 1000; i++) {
+      GenericRecord record = genericRecord.copy();
+      record.setField("level", i % 6 == 0 ? "error" : "info");
+      record.setField("event_time", OffsetDateTime.now());
+      record.setField("message", "Iceberg is a great table format");
+      record.setField("call_stack", Collections.singletonList("NullPointerException"));
+      genericTaskWriter.write(record);
+      // just for test, remove from doc code
+      this.testRecords.add(record);
+    }
+    // after the data file is written above,
+    // the written data file is submitted to the metadata of the table through Table API.
+    AppendFiles appendFiles = table.newAppend();
+    for (DataFile dataFile : genericTaskWriter.dataFiles()) {
+      appendFiles.appendFile(dataFile);
+    }
+    // submit data file.

Review Comment:
   Same as above, I don't think we need this inline comment.
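The quote cuts off at the nit-picked comment; presumably a commit call follows. A possible version of this tail using the comment wording suggested later in this review (the trailing `appendFiles.commit()` is an assumption here, inferred from the Table API rather than shown in the quote):

```java
// Call the AppendFiles API on each of the data files
AppendFiles appendFiles = table.newAppend();
for (DataFile dataFile : genericTaskWriter.dataFiles()) {
  appendFiles.appendFile(dataFile);
}

// Commit the AppendFiles operation
appendFiles.commit();
```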
##########
data/src/test/java/org/apache/iceberg/data/TestGenericTaskWriter.java:
##########
@@ -0,0 +1,143 @@
+    GenericRecord genericRecord = GenericRecord.create(table.schema());
+    // assume write 1000 records
+    for (int i = 0; i < 1000; i++) {
+      GenericRecord record = genericRecord.copy();
+      record.setField("level", i % 6 == 0 ? "error" : "info");
+      record.setField("event_time", OffsetDateTime.now());
+      record.setField("message", "Iceberg is a great table format");
+      record.setField("call_stack", Collections.singletonList("NullPointerException"));
+      genericTaskWriter.write(record);
+      // just for test, remove from doc code

Review Comment:
   IMO I don't think we need this inline comment

##########
docs/java-api.md:
##########
@@ -147,6 +147,69 @@
+### WriteData
+
+The java API can write data into iceberg table.
+
+First write data to the data file, then submit the data file, the data you write will take effect in the table.
+
+For example, add 1000 rows of data to the table.

Review Comment:
   I think we can just say "The following example will demonstrate how to write data files into Iceberg tables". The other details can just be inline comments imo.

##########
docs/java-api.md:
##########
@@ -147,6 +147,69 @@
+The structure of this table is the same as the demo table in java-api-quickstart
+
+```java
+/**
+ * Schema schema = new Schema(
+ *     Types.NestedField.required(1, "level", Types.StringType.get()),
+ *     Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
+ *     Types.NestedField.required(3, "message", Types.StringType.get()),
+ *     Types.NestedField.optional(4, "call_stack", Types.ListType.ofRequired(5, Types.StringType.get()))
+ * );
+ * PartitionSpec spec = PartitionSpec.builderFor(schema)
+ *     .hour("event_time")
+ *     .identity("level")
+ *     .build();
+ */
+
+GenericAppenderFactory appenderFactory =
+    new GenericAppenderFactory(table.schema(), table.spec());
+int partitionId = 1, taskId = 1;
+FileFormat fileFormat =
+    FileFormat.valueOf(
+        table.properties().getOrDefault("write.format.default", "parquet").toUpperCase());
+OutputFileFactory outputFileFactory =
+    OutputFileFactory.builderFor(table, partitionId, taskId).format(fileFormat).build();

Review Comment:
   Nit for readability: I think this needs more spacing; it's a bit dense to read.
##########
docs/java-api.md:
##########
@@ -147,6 +147,69 @@
+GenericRecord genericRecord = GenericRecord.create(table.schema());
+// assume write 1000 records

Review Comment:
   Either `write 1000 records` or we may just leave it off. Since it's an example I think it's fine to keep it.
"error" : "info"); + record.setField("event_time", OffsetDateTime.now()); + record.setField("message", "Iceberg is a great table format"); + record.setField("call_stack", Collections.singletonList("NullPointerException")); + genericTaskWriter.write(record); +} Review Comment: Nit: newline after the loop ########## data/src/main/java/org/apache/iceberg/data/GenericTaskWriter.java: ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import java.io.IOException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.io.BaseTaskWriter; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.PartitionedFanoutWriter; +import org.apache.iceberg.io.UnpartitionedWriter; + +public class GenericTaskWriter<T extends StructLike> extends BaseTaskWriter<T> { Review Comment: I understand this wraps the writer depending on if it's partitioned or not but since this is public just want to make sure it's really needed. Going through the code it seems like every engine (Spark/Flink etc) implements their own writer and specifies their own partitioning. But effectively they all follow the same principle which is implement PartitionWriter<EnginesRowRepresentation> which applies the partition function on the row representation. The logic across the engine also looks duplicated with the same if(partition){use the partitioned writer) else {unpartitioned writer} so I think it's nice this PR has a generic writer which could be leverage. So this change makes sense to me. Maybe for partitioned writes we also allow passing in a custom partition function but we don't need to add that until we see a need. ########## docs/java-api.md: ########## @@ -147,6 +147,69 @@ t.newAppend().appendFile(data).commit(); t.commitTransaction(); ``` +### WriteData + +The java API can write data into iceberg table. + +First write data to the data file, then submit the data file, the data you write will take effect in the table. + +For example, add 1000 rows of data to the table. 
##########
docs/java-api.md:
##########
@@ -147,6 +147,69 @@
+// after the data file is written above,
+// the written data file is submitted to the metadata of the table through Table API.
+AppendFiles appendFiles = table.newAppend();
+for (DataFile dataFile : genericTaskWriter.dataFiles()) {
+  appendFiles.appendFile(dataFile);
+}

Review Comment:
   I think the inline comment could be something like `// Call the AppendFiles API on each of the data files`, and then instead of `submit data file` we could say `Commit the AppendFiles operation`.

##########
docs/java-api.md:
##########
@@ -147,6 +147,69 @@
+### WriteData
+
+The java API can write data into iceberg table.

Review Comment:
   "The Java API can be used to write data into Iceberg tables"

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org