vshinde-medacist opened a new issue, #31275: URL: https://github.com/apache/doris/issues/31275
Hi Team, Seeking advice on below. We are conducting a PoC and are in the process of evaluating Flink and Doris integration using the below versions and dependencies. - Flink-1.18.1 - Doris-2.0.4 - All the dependencies have been added to POM file Doris is up and running successfully (set up by following Quick Start from official Doris documentation). Able to access FE using - `http://10.0.2.15:8030` Below is the PoC code snippet (Java maven project) being used to read the CSV file as Flink FileSystem source and insert it into Doris table. ```java import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.table.catalog.*; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; public class FlinkDorisIntegration { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointStorage("file:///home/user/Documents/app_data/checkpoint/"); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // Flink FileSystem CSV format source table in default in-memory catalog tableEnv.executeSql("create table IF NOT EXISTS `default_catalog`.`default_database`.`fs_src`(station string, txdate string, txtime string, txseq string)" + " with (\n" + "'connector' = 'filesystem',\n" + "'path' = 'file:///home/user/Documents/app_data/source/',\n" + "'format' = 'csv',\n" + "'csv.ignore-parse-errors' = 'true',\n" + "'csv.allow-comments' = 'true',\n" + "'source.monitor-interval' = '1s'\n" + ");"); // Create Doris Catalog (Not able to execute this, throwing an exception as illegalargumentexception ) tableEnv.executeSql("CREATE CATALOG demo_catalog WITH('type' = 'jdbc', 'default-database' = 'db_test', 'username' = 'root', 'password' = '', 'base-url' = 'jdbc:mysql://10.0.2.15:9030')"); // Switch to Doris Catalog tableEnv.executeSql("USE CATALOG demo_catalog;"); //Create Doris Sink table tableEnv.executeSql("CREATE TABLE IF NOT EXISTS db_test.flink_doris_sink (station varchar, txdate varchar, txtime varchar, txseq varchar) " + " with (\n" + "'connector' = 'doris',\n" + "'fenodes' = '10.0.2.15:8030',\n" + "'table.identifier' = 'db_test.flink_doris_sink',\n" + "'username' = 'root',\n" + "'password' = '',\n" + "'sink.label-prefix' = 'doris_label'\n" + ");"); // Insert into Doris table tableEnv.executeSql("INSERT INTO demo_catalog.db_test.flink_doris_sink " + " (station, txdate, txtime, txseq)" + " SELECT " + " station, txdate, txtime, txseq " + " FROM `default_catalog`.`default_database`.fs_src; "); env.execute(); } } ```` To access Doris table, I need to switch to Doris catalog but getting `illegalargumentexception` exception while trying to create Doris catalog from Flink. **Can someone please help with the below:** - Is the approach used in the code snippet the right way to access Doris (create catalog, tables and insert)? - How to create Doris catalog/table from Flink and access it (any sample code or documentation would be helpful) - Any prerequisites have been missed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org