vshinde-medacist opened a new issue, #31275:
URL: https://github.com/apache/doris/issues/31275

   Hi Team, 
   
   Seeking advice on the problem described below.
   
   We are running a PoC to evaluate Flink and Doris integration, using the 
following versions and dependencies:
   
   - Flink-1.18.1
   - Doris-2.0.4
   - All the dependencies have been added to the POM file
   
   Doris is up and running (set up by following the Quick Start in the 
official Doris documentation). The FE is reachable at `http://10.0.2.15:8030`.
   
   Below is the PoC code (a Java Maven project) that reads a CSV file through 
a Flink FileSystem source and inserts it into a Doris table.
   
   ```java
   import org.apache.flink.streaming.api.CheckpointingMode;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.api.Table;
   import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
   import org.apache.flink.types.Row;
   import org.apache.flink.table.catalog.*;
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;
   
   public class FlinkDorisIntegration {
       public static void main(String[] args) throws Exception {
   
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.enableCheckpointing(1000);
           env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
           env.getCheckpointConfig().setCheckpointStorage("file:///home/user/Documents/app_data/checkpoint/");
           StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
   
           // Flink FileSystem CSV format source table in default in-memory catalog
           tableEnv.executeSql("create table IF NOT EXISTS `default_catalog`.`default_database`.`fs_src`(station string, txdate string, txtime string, txseq string)" +
                   " with (\n" +
                   "'connector' = 'filesystem',\n" +
                   "'path'      = 'file:///home/user/Documents/app_data/source/',\n" +
                   "'format'    = 'csv',\n" +
                   "'csv.ignore-parse-errors' = 'true',\n" +
                   "'csv.allow-comments' = 'true',\n" +
                   "'source.monitor-interval' = '1s'\n" +
                   ");");
   
           // Create Doris Catalog (Not able to execute this, throwing an exception as illegalargumentexception)
           tableEnv.executeSql("CREATE CATALOG demo_catalog WITH('type' = 'jdbc', 'default-database' = 'db_test', 'username' = 'root', 'password' = '', 'base-url' = 'jdbc:mysql://10.0.2.15:9030')");
   
           // Switch to Doris Catalog
           tableEnv.executeSql("USE CATALOG demo_catalog;");
   
           // Create Doris Sink table
           tableEnv.executeSql("CREATE TABLE IF NOT EXISTS db_test.flink_doris_sink (station varchar, txdate varchar, txtime varchar, txseq varchar) " +
                   " with (\n" +
                   "'connector' = 'doris',\n" +
                   "'fenodes'      = '10.0.2.15:8030',\n" +
                   "'table.identifier'    = 'db_test.flink_doris_sink',\n" +
                   "'username' = 'root',\n" +
                   "'password' = '',\n" +
                   "'sink.label-prefix' = 'doris_label'\n" +
                   ");");
   
           // Insert into Doris table
           tableEnv.executeSql("INSERT INTO demo_catalog.db_test.flink_doris_sink " +
                   " (station, txdate, txtime, txseq)" +
                   " SELECT " +
                   " station, txdate, txtime, txseq " +
                   " FROM `default_catalog`.`default_database`.fs_src; ");
   
           env.execute();
       }
   }
   ```
   
   To access the Doris table, I need to switch to the Doris catalog, but I get 
an `IllegalArgumentException` when trying to create the Doris catalog from 
Flink. 
   
   **Can someone please help with the following:**
   - Is the approach used in the code snippet the right way to access Doris 
(create catalog, tables and insert)?
   - How can a Doris catalog/table be created from Flink and then accessed? 
(Any sample code or documentation would be helpful.)
   - Have any prerequisites been missed?
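   
   For comparison, one variant that might be worth trying is to skip the 
catalog step and register the Doris sink directly in Flink's default in-memory 
catalog, reusing the same `'connector' = 'doris'` options as above. The sketch 
below only assembles that DDL string in plain Java, so it runs without Flink on 
the classpath. It is untested against a real cluster and rests on assumptions: 
the class and method names (`DorisSinkDdl`, `sinkDdl`) are hypothetical, the 
Doris table `db_test.flink_doris_sink` is assumed to already exist on the Doris 
side, and the connector jar is assumed to be among the project dependencies.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.StringJoiner;
   
   // Hypothetical helper (not part of Flink or Doris): builds the sink DDL
   // for Flink's default catalog instead of a separate JDBC catalog.
   class DorisSinkDdl {
   
       static String sinkDdl(String feNodes, String tableIdentifier,
                             String username, String password, String labelPrefix) {
           // Option keys are the ones already used in the snippet above.
           Map<String, String> options = new LinkedHashMap<>();
           options.put("connector", "doris");
           options.put("fenodes", feNodes);
           options.put("table.identifier", tableIdentifier);
           options.put("username", username);
           options.put("password", password);
           options.put("sink.label-prefix", labelPrefix);
   
           // Render the options as 'key' = 'value' pairs.
           // Note: values are not escaped, so they must not contain single quotes.
           StringJoiner with = new StringJoiner(", ", " WITH (", ")");
           options.forEach((key, value) -> with.add("'" + key + "' = '" + value + "'"));
   
           return "CREATE TABLE IF NOT EXISTS "
                   + "`default_catalog`.`default_database`.`flink_doris_sink` "
                   + "(station STRING, txdate STRING, txtime STRING, txseq STRING)"
                   + with;
       }
   
       public static void main(String[] args) {
           // Values taken from the PoC setup described above.
           System.out.println(sinkDdl("10.0.2.15:8030", "db_test.flink_doris_sink",
                   "root", "", "doris_label"));
       }
   }
   ```
   
   If this variant is viable, the resulting string would be passed to 
`tableEnv.executeSql(...)` in place of the `CREATE CATALOG` and `USE CATALOG` 
statements, and the `INSERT INTO` would target 
`default_catalog.default_database.flink_doris_sink`.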
   
   
   
   

