stevenzwu commented on code in PR #7039:
URL: https://github.com/apache/iceberg/pull/7039#discussion_r1161374565


##########
docs/flink-configuration.md:
##########
@@ -148,13 +148,14 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
 ...
 ```
 
-| Flink option           | Default                                    | Description                                                  |
-| ---------------------- | ------------------------------------------ | ------------------------------------------------------------ |
-| write-format           | Table write.format.default                 | File format to use for this write operation; parquet, avro, or orc |
-| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes          |
-| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled                  |
-| overwrite-enabled      | false                                      | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. |
-| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode               |
-| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write      |
-| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
-| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write |
+| Flink option           | Default                                                               | Description                                                                                                                        |
+|------------------------|-----------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|
+| write-format           | Table write.format.default                                            | File format to use for this write operation; parquet, avro, or orc                                                                |
+| target-file-size-bytes | As per table property                                                 | Overrides this table's write.target-file-size-bytes                                                                               |
+| upsert-enabled         | Table write.upsert.enabled                                            | Overrides this table's write.upsert.enabled                                                                                       |
+| overwrite-enabled      | false                                                                 | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream.                        |
+| distribution-mode      | Table write.distribution-mode                                         | Overrides this table's write.distribution-mode                                                                                    |
+| compression-codec      | Table write.(fileformat).compression-codec                            | Overrides this table's compression codec for this write                                                                           |
+| compression-level      | Table write.(fileformat).compression-level                            | Overrides this table's compression level for Parquet and Avro tables for this write                                               |
+| compression-strategy   | Table write.orc.compression-strategy                                  | Overrides this table's compression strategy for ORC tables for this write                                                         |
+| write-parallelism      | The same as the parallel of the input operator to the write operator | Configuring the write parallel number for iceberg stream writer. By default, it is the same as the parallel of the input operator. |

Review Comment:
   The same as the parallel of the input operator to the write operator
   -->
   Upstream operator parallelism
   
   Configuring the write parallel number for iceberg stream writer. By default, it is the same as the parallel of the input operator.
   -->
   Overrides the writer parallelism
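
   For context, a minimal sketch of how the new option would be exercised from SQL, following the OPTIONS-hint pattern shown earlier in flink-configuration.md. The table names `sample` and `src` and the value `'4'` are placeholders, not from this PR:

   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class WriteParallelismHintExample {
     public static void main(String[] args) {
       TableEnvironment tEnv =
           TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
       // Assumes an Iceberg sink table `sample` and a source table `src` are
       // already registered in the catalog. Without the hint, the Iceberg stream
       // writer inherits the upstream operator's parallelism; the hint overrides
       // it for this statement only.
       tEnv.executeSql(
           "INSERT INTO sample /*+ OPTIONS('write-parallelism'='4') */ SELECT * FROM src");
     }
   }
   ```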



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java:
##########
@@ -178,6 +183,49 @@ public void testOverwriteTable() throws Exception {
         icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
   }
 
+  @Test
+  public void testWriteParallelism() throws Exception {
+    String tableName = "test_write_parallelism";
+
+    List<Row> dataSet =
+        IntStream.range(1, 1000)
+            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    try {
+      sql("CREATE TABLE %s(id INT, data VARCHAR)", tableName);

Review Comment:
   why can't we reuse the table created in the `before` method?
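
   A rough sketch of the suggested reuse, assuming the fixture's table name is exposed through a field such as `TABLE_NAME` (hypothetical; the actual field in `TestFlinkTableSink` may be named differently):

   ```java
   // Write into the table created in `before` instead of creating and
   // dropping a dedicated test_write_parallelism table:
   String insertSQL =
       String.format(
           "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
           TABLE_NAME, SOURCE_TABLE);
   // The planner translation and the parallelism assertion that follow in the
   // test would remain unchanged.
   ```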



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java:
##########
@@ -178,6 +183,49 @@ public void testOverwriteTable() throws Exception {
         icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
   }
 
+  @Test
+  public void testWriteParallelism() throws Exception {
+    String tableName = "test_write_parallelism";
+
+    List<Row> dataSet =
+        IntStream.range(1, 1000)
+            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    try {
+      sql("CREATE TABLE %s(id INT, data VARCHAR)", tableName);
+
+      PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
+      String insertSQL =
+          String.format(
+              "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
+              tableName, SOURCE_TABLE);
+      ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
+      Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
+      Transformation<?> committer = dummySink.getInputs().get(0);
+      Transformation<?> writer = committer.getInputs().get(0);
+
+      Assert.assertEquals("Should have the expected 1 parallelism.", 1, writer.getParallelism());
+
+      writer

Review Comment:
   why are we not doing `get(0)` for the upstream source operator?
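
   For illustration, the test walks the chain sink -> committer -> writer via `getInputs().get(0)`; one more step of the same pattern would reach the upstream operator. A hedged sketch of what that extra step could look like (the assertion assumes the harness's default parallelism is not 1, which may not hold):

   ```java
   // Step one more level up the transformation chain, the same way the
   // committer and writer were reached above.
   Transformation<?> upstream = writer.getInputs().get(0);
   // The hint should pin only the writer; the upstream operator keeps the
   // parallelism assigned by the environment.
   Assert.assertNotEquals(
       "Upstream parallelism should not be overridden by write-parallelism",
       1,
       upstream.getParallelism());
   ```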



##########
docs/flink-configuration.md:
##########
@@ -148,13 +148,14 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
 ...
 ```
 
-| Flink option           | Default                                    | Description                                                  |
-| ---------------------- | ------------------------------------------ | ------------------------------------------------------------ |
-| write-format           | Table write.format.default                 | File format to use for this write operation; parquet, avro, or orc |
-| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes          |
-| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled                  |
-| overwrite-enabled      | false                                      | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. |
-| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode               |
-| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write      |
-| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
-| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write |
+| Flink option           | Default                                                               | Description                                                                                                                        |
+|------------------------|-----------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|
+| write-format           | Table write.format.default                                            | File format to use for this write operation; parquet, avro, or orc                                                                |
+| target-file-size-bytes | As per table property                                                 | Overrides this table's write.target-file-size-bytes                                                                               |
+| upsert-enabled         | Table write.upsert.enabled                                            | Overrides this table's write.upsert.enabled                                                                                       |
+| overwrite-enabled      | false                                                                 | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream.                        |
+| distribution-mode      | Table write.distribution-mode                                         | Overrides this table's write.distribution-mode                                                                                    |
+| compression-codec      | Table write.(fileformat).compression-codec                            | Overrides this table's compression codec for this write                                                                           |
+| compression-level      | Table write.(fileformat).compression-level                            | Overrides this table's compression level for Parquet and Avro tables for this write                                               |
+| compression-strategy   | Table write.orc.compression-strategy                                  | Overrides this table's compression strategy for ORC tables for this write                                                         |
+| write-parallelism      | The same as the parallel of the input operator to the write operator | Configuring the write parallel number for iceberg stream writer. By default, it is the same as the parallel of the input operator. |

Review Comment:
   Then maybe we can revert the table width change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

