Crossoverrr opened a new issue, #7769:
URL: https://github.com/apache/iceberg/issues/7769

   ### Apache Iceberg version
   
   0.13.0
   
   ### Query engine
   
   Flink
   
   ### Please describe the bug 🐞
   
   How do I enable the Iceberg UPSERT operation with Iceberg 0.13.0 and Flink 1.13.2?
   The dynamicOptions object is empty in PlannerBase#translateToRel (line 200),
   even though I have already set table.dynamic-table-options.enabled = 'true',
   write.upsert.enabled = true and format-version = 2.
   Please help me with this. I have been stuck on this problem for a long time
   and have tried many things, but nothing worked.
   The code is:
   `public static void main(String[] args)
               throws Exception
       {
           try {
               // set up the streaming execution environment
               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
               StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
   
               org.apache.flink.configuration.Configuration flinkConf = tblEnv.getConfig().getConfiguration();
               flinkConf.setBoolean("table.dynamic-table-options.enabled", true);
   
               env.enableCheckpointing(1000);
   
               tblEnv.executeSql("create catalog " + catalogName + " with ('type' = 'iceberg', 'catalog-type'='hadoop', 'warehouse'='" + BASE_PATH + "')");
   
               Map<String, String> props = new HashMap<>();
               props.put(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());
               props.put(TableProperties.UPSERT_ENABLED, "true");
               props.put(TableProperties.FORMAT_VERSION, "2");
               /*tblEnv.executeSql(String.format("CREATE TABLE " + catalogName + "." + databaseName + ".%s (id BIGINT,\n" +
                               "market_ad_id BIGINT,\n" +
                               "approval_notes STRING,\n" +
                               "network_id BIGINT,\n" +
                               "created_at STRING,\n" +
                               "updated_at STRING,\n" +
                               "external_approval_notes STRING, " +
                               "PRIMARY KEY(id) NOT ENFORCED) " +
                               " WITH %s",
                       sinkTable, toWithClause(props)));*/
   
               tblEnv.executeSql("insert into " + catalogName + "." + databaseName + "." + sinkTable +
                       " values (1,1,'11',1,'1','1','1') ");
   
               tblEnv.executeSql("select * from " + catalogName + "." + databaseName + "." + sinkTable + " ").print();
           }
           catch (Exception e) {
               e.printStackTrace();
           }
       }`
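
The commented-out CREATE TABLE statement calls `toWithClause(props)`, which is not shown above. For anyone trying to reproduce this, here is a minimal sketch of what such a helper might look like. It assumes the helper only renders the property map as a Flink SQL property list; the class name `WithClauseSketch` and the quote escaping are my own additions for illustration and are not taken from the original program.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class WithClauseSketch {

    // Hypothetical helper (assumed behavior): renders table properties as a
    // parenthesized Flink SQL property list, e.g. ('k1'='v1','k2'='v2').
    static String toWithClause(Map<String, String> props) {
        StringJoiner joiner = new StringJoiner(",", "(", ")");
        for (Map.Entry<String, String> entry : props.entrySet()) {
            joiner.add("'" + escape(entry.getKey()) + "'='" + escape(entry.getValue()) + "'");
        }
        return joiner.toString();
    }

    // SQL string literals escape a single quote by doubling it.
    private static String escape(String value) {
        return value.replace("'", "''");
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the rendered clause is stable.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("write.format.default", "PARQUET"); // TableProperties.DEFAULT_FILE_FORMAT
        props.put("write.upsert.enabled", "true");    // TableProperties.UPSERT_ENABLED
        props.put("format-version", "2");             // TableProperties.FORMAT_VERSION
        System.out.println("WITH " + toWithClause(props));
    }
}
```

With these three properties the sketch produces a clause that can be substituted for the `%s` after `WITH` in the CREATE TABLE statement. An empty map would render as `()`, so the caller should skip the WITH clause in that case.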
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

