swapna267 commented on code in PR #13878:
URL: https://github.com/apache/iceberg/pull/13878#discussion_r2289493607


##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java:
##########
@@ -431,4 +435,36 @@ public void testConsumeFromStartTag() throws Exception {
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("START_SNAPSHOT_ID and START_TAG cannot both be set.");
   }
+
+  @TestTemplate
+  void testWithParallelismWithProps() {
+    int customScanParallelism = defaultJobParallelism + 1;
+    sql(
+        "CREATE TABLE %s (id INT, data VARCHAR, dt VARCHAR) WITH 
('scan.parallelism'='%s')",
+        TABLE, customScanParallelism);
+
+    final org.apache.flink.table.api.Table table =
+        getTableEnv().sqlQuery(String.format("select * from %s", TABLE));
+    final String explain = table.explain(ExplainDetail.JSON_EXECUTION_PLAN);
+    final String expectedPhysicalExecutionPlanFragment =
+        "\"parallelism\" : " + customScanParallelism;

Review Comment:
   There is no explicit api that we can use to extract it, so had to fall back 
to this option. 
   
   This is the explain output. As we have only Iceberg source configured in 
this SQL, we can expect parallelism to be only once and for the source node.
   
   ```
   == Abstract Syntax Tree ==
   LogicalProject(id=[$0], data=[$1], dt=[$2])
   +- LogicalTableScan(table=[[testhive, db, test_table]], hints=[[[OPTIONS 
inheritPath:[] options:{streaming=true, scan.parallelism=5}]]])
   
   == Optimized Physical Plan ==
   TableSourceScan(table=[[testhive, db, test_table]], fields=[id, data, dt], 
hints=[[[OPTIONS options:{streaming=true, scan.parallelism=5}]]])
   
   == Optimized Execution Plan ==
   TableSourceScan(table=[[testhive, db, test_table]], fields=[id, data, dt], 
hints=[[[OPTIONS options:{streaming=true, scan.parallelism=5}]]])
   
   == Physical Execution Plan ==
   {
     "nodes" : [ {
       "id" : 1,
       "type" : "Source: test_table[1]",
       "pact" : "Data Source",
       "contents" : "[1]:TableSourceScan(table=[[testhive, db, test_table]], 
fields=[id, data, dt], hints=[[[OPTIONS options:{streaming=true, 
scan.parallelism=5}]]])",
       "parallelism" : 5
     } ]
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to