This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 7285212b [Chore](test) improve test example for doc (#513)
7285212b is described below

commit 7285212b836ba166d0a9fc7ebefacc52948a8073
Author: wudi <676366...@qq.com>
AuthorDate: Mon Nov 18 14:40:40 2024 +0800

    [Chore](test) improve test example for doc (#513)
---
 .../doris/flink/example/DorisSinkExample.java      | 58 ++++++++++++++-----
 .../flink/example/DorisSinkExampleRowData.java     | 45 ++++++---------
 .../flink/example/DorisSinkMultiTableExample.java  | 55 +++++++++---------
 .../doris/flink/example/DorisSourceDataStream.java | 67 ++++++++++++++++++++--
 4 files changed, 152 insertions(+), 73 deletions(-)

diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExample.java
index 35ef73fd..bcb16965 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExample.java
@@ -40,6 +40,49 @@ import java.util.Properties;
 public class DorisSinkExample {
 
     public static void main(String[] args) throws Exception {
+        JSONFormatWrite();
+    }
+
+    public static void JSONFormatWrite() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(30000);
+        DorisSink.Builder<String> builder = DorisSink.builder();
+
+        DorisOptions dorisOptions =
+                DorisOptions.builder()
+                        .setFenodes("127.0.0.1:8030")
+                        .setTableIdentifier("test.student")
+                        .setUsername("root")
+                        .setPassword("")
+                        .build();
+
+        Properties properties = new Properties();
+        properties.setProperty("read_json_by_line", "true");
+        properties.setProperty("format", "json");
+
+        DorisExecutionOptions executionOptions =
+                DorisExecutionOptions.builder()
+                        .setLabelPrefix("label-doris")
+                        .setDeletable(false)
+                        .setBatchMode(true)
+                        .setStreamLoadProp(properties)
+                        .build();
+
+        builder.setDorisReadOptions(DorisReadOptions.builder().build())
+                .setDorisExecutionOptions(executionOptions)
+                .setSerializer(new SimpleStringSerializer())
+                .setDorisOptions(dorisOptions);
+
+        List<String> data = new ArrayList<>();
+        data.add("{\"id\":3,\"name\":\"Michael\",\"age\":28}");
+        data.add("{\"id\":4,\"name\":\"David\",\"age\":38}");
+
+        env.fromCollection(data).sinkTo(builder.build());
+        env.execute("doris test");
+    }
+
+    public static void CSVFormatWrite() throws Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -49,24 +92,13 @@ public class DorisSinkExample {
                         
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 
Time.milliseconds(30000)));
         DorisSink.Builder<String> builder = DorisSink.builder();
-        final DorisReadOptions.Builder readOptionBuilder = 
DorisReadOptions.builder();
-        readOptionBuilder
-                .setDeserializeArrowAsync(false)
-                .setDeserializeQueueSize(64)
-                .setExecMemLimit(2147483648L)
-                .setRequestQueryTimeoutS(3600)
-                .setRequestBatchSize(1000)
-                .setRequestConnectTimeoutMs(10000)
-                .setRequestReadTimeoutMs(10000)
-                .setRequestRetries(3)
-                .setRequestTabletSize(1024 * 1024);
         Properties properties = new Properties();
         properties.setProperty("column_separator", ",");
         properties.setProperty("line_delimiter", "\n");
         properties.setProperty("format", "csv");
         DorisOptions.Builder dorisBuilder = DorisOptions.builder();
         dorisBuilder
-                .setFenodes("127.0.0.1:8040")
+                .setFenodes("127.0.0.1:8030")
                 .setTableIdentifier("db.table")
                 .setUsername("test")
                 .setPassword("test");
@@ -77,7 +109,7 @@ public class DorisSinkExample {
                 .setBufferSize(8 * 1024)
                 .setBufferCount(3);
 
-        builder.setDorisReadOptions(readOptionBuilder.build())
+        builder.setDorisReadOptions(DorisReadOptions.builder().build())
                 .setDorisExecutionOptions(executionBuilder.build())
                 .setSerializer(new SimpleStringSerializer())
                 .setDorisOptions(dorisBuilder.build());
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExampleRowData.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExampleRowData.java
index 8037e2ea..ae6aa593 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExampleRowData.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExampleRowData.java
@@ -17,13 +17,8 @@
 
 package org.apache.doris.flink.example;
 
-import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
@@ -45,40 +40,37 @@ public class DorisSinkExampleRowData {
 
     public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         env.enableCheckpointing(10000);
         env.setParallelism(1);
-        env.getCheckpointConfig()
-                .enableExternalizedCheckpoints(
-                        
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 
Time.milliseconds(30000)));
+
         DorisSink.Builder<RowData> builder = DorisSink.builder();
 
         Properties properties = new Properties();
         properties.setProperty("column_separator", ",");
         properties.setProperty("line_delimiter", "\n");
-        // properties.setProperty("read_json_by_line", "true");
-        // properties.setProperty("format", "json");
+        properties.setProperty("format", "csv");
         DorisOptions.Builder dorisBuilder = DorisOptions.builder();
         dorisBuilder
                 .setFenodes("127.0.0.1:8030")
-                .setTableIdentifier("db.tbl")
+                .setTableIdentifier("test.students")
                 .setUsername("root")
                 .setPassword("");
         DorisExecutionOptions.Builder executionBuilder = 
DorisExecutionOptions.builder();
-        
executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties);
+        executionBuilder
+                .setLabelPrefix(UUID.randomUUID().toString())
+                .setDeletable(false)
+                .setStreamLoadProp(properties);
 
         // flink rowdata‘s schema
-        String[] fields = {"name", "age"};
-        DataType[] types = {DataTypes.VARCHAR(256), DataTypes.INT()};
+        String[] fields = {"id", "name", "age"};
+        DataType[] types = {DataTypes.INT(), DataTypes.VARCHAR(256), 
DataTypes.INT()};
 
         builder.setDorisExecutionOptions(executionBuilder.build())
                 .setSerializer(
                         RowDataSerializer.builder() // serialize according to rowdata
-                                .setType(LoadConstants.CSV) // 
.setType(LoadConstants.CSV)
+                                .setType(LoadConstants.CSV)
                                 .setFieldDelimiter(",")
-                                .setFieldNames(fields) // 
.setFieldDelimiter(",")
+                                .setFieldNames(fields)
                                 .setFieldType(types)
                                 .build())
                 .setDorisOptions(dorisBuilder.build());
@@ -91,16 +83,17 @@ public class DorisSinkExampleRowData {
                                     @Override
                                     public void flatMap(String s, 
Collector<RowData> out)
                                             throws Exception {
-                                        GenericRowData genericRowData = new 
GenericRowData(2);
+                                        GenericRowData genericRowData = new 
GenericRowData(3);
+                                        genericRowData.setField(0, 1);
                                         genericRowData.setField(
-                                                0, 
StringData.fromString("beijing"));
-                                        genericRowData.setField(1, 123);
+                                                1, 
StringData.fromString("Michael"));
+                                        genericRowData.setField(2, 18);
                                         out.collect(genericRowData);
 
-                                        GenericRowData genericRowData2 = new 
GenericRowData(2);
-                                        genericRowData2.setField(
-                                                0, 
StringData.fromString("shanghai"));
-                                        genericRowData2.setField(1, 1234);
+                                        GenericRowData genericRowData2 = new 
GenericRowData(3);
+                                        genericRowData2.setField(0, 2);
+                                        genericRowData2.setField(1, 
StringData.fromString("David"));
+                                        genericRowData2.setField(2, 38);
                                         out.collect(genericRowData2);
                                     }
                                 });
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkMultiTableExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkMultiTableExample.java
index feff8b32..2aeb3956 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkMultiTableExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkMultiTableExample.java
@@ -17,38 +17,26 @@
 
 package org.apache.doris.flink.example;
 
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.sink.batch.DorisBatchSink;
+import org.apache.doris.flink.sink.DorisSink;
 import org.apache.doris.flink.sink.batch.RecordWithMeta;
 import org.apache.doris.flink.sink.writer.serializer.RecordWithMetaSerializer;
 
+import java.util.Arrays;
 import java.util.Properties;
-import java.util.UUID;
 
 public class DorisSinkMultiTableExample {
     public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
+        env.enableCheckpointing(15000);
 
-        DorisBatchSink.Builder<RecordWithMeta> builder = 
DorisBatchSink.builder();
-        final DorisReadOptions.Builder readOptionBuilder = 
DorisReadOptions.builder();
-
-        readOptionBuilder
-                .setDeserializeArrowAsync(false)
-                .setDeserializeQueueSize(64)
-                .setExecMemLimit(2147483648L)
-                .setRequestQueryTimeoutS(3600)
-                .setRequestBatchSize(1000)
-                .setRequestConnectTimeoutMs(10000)
-                .setRequestReadTimeoutMs(10000)
-                .setRequestRetries(3)
-                .setRequestTabletSize(1024 * 1024);
-
+        DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
         Properties properties = new Properties();
         properties.setProperty("column_separator", ",");
         properties.setProperty("line_delimiter", "\n");
@@ -56,7 +44,7 @@ public class DorisSinkMultiTableExample {
         DorisOptions.Builder dorisBuilder = DorisOptions.builder();
         dorisBuilder
                 .setFenodes("127.0.0.1:8030")
-                .setTableIdentifier("test.test_flink_tmp")
+                .setTableIdentifier("")
                 .setUsername("root")
                 .setPassword("");
 
@@ -66,21 +54,24 @@ public class DorisSinkMultiTableExample {
                 .setLabelPrefix("label")
                 .setStreamLoadProp(properties)
                 .setDeletable(false)
-                .setBufferFlushMaxBytes(8 * 1024)
-                .setBufferFlushMaxRows(10)
+                .setBatchMode(true)
+                .setBufferFlushMaxBytes(10 * 1024 * 1024)
+                .setBufferFlushMaxRows(10000)
                 .setBufferFlushIntervalMs(1000 * 10);
 
-        builder.setDorisReadOptions(readOptionBuilder.build())
+        builder.setDorisReadOptions(DorisReadOptions.builder().build())
                 .setDorisExecutionOptions(executionBuilder.build())
                 .setDorisOptions(dorisBuilder.build())
                 .setSerializer(new RecordWithMetaSerializer());
 
-        // RecordWithMeta record = new RecordWithMeta("test", 
"test_flink_tmp1", "wangwu,1");
-        // RecordWithMeta record1 = new RecordWithMeta("test", 
"test_flink_tmp", "wangwu,1");
-        // DataStreamSource<RecordWithMeta> stringDataStreamSource = 
env.fromCollection(
-        // Arrays.asList(record, record1));
-        // stringDataStreamSource.sinkTo(builder.build());
+        RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", 
"wangwu,1");
+        RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", 
"wangwu,1");
+        DataStreamSource<RecordWithMeta> stringDataStreamSource =
+                env.fromCollection(Arrays.asList(record, record1));
+        stringDataStreamSource.sinkTo(builder.build());
 
+        /*
+        // mock unbounded streaming source
         env.addSource(
                         new SourceFunction<RecordWithMeta>() {
                             private Long id = 1000000L;
@@ -90,6 +81,12 @@ public class DorisSinkMultiTableExample {
                                 while (true) {
                                     id = id + 1;
                                     RecordWithMeta record =
+                                            new RecordWithMeta(
+                                                    "test",
+                                                    "test_flink_tmp",
+                                                    UUID.randomUUID() + ",1");
+                                    out.collect(record);
+                                    record =
                                             new RecordWithMeta(
                                                     "test",
                                                     "test_flink_tmp1",
@@ -98,10 +95,10 @@ public class DorisSinkMultiTableExample {
                                     record =
                                             new RecordWithMeta(
                                                     "test",
-                                                    "test_flink_tmp",
+                                                    "test_flink_tmp2",
                                                     UUID.randomUUID() + ",1");
                                     out.collect(record);
-                                    Thread.sleep(3000);
+                                    Thread.sleep(1000);
                                 }
                             }
 
@@ -109,7 +106,7 @@ public class DorisSinkMultiTableExample {
                             public void cancel() {}
                         })
                 .sinkTo(builder.build());
-
+         **/
         env.execute("doris multi table test");
     }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSourceDataStream.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSourceDataStream.java
index ee3fa135..ba57b950 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSourceDataStream.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSourceDataStream.java
@@ -17,27 +17,84 @@
 
 package org.apache.doris.flink.example;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.cfg.DorisStreamOptions;
 import org.apache.doris.flink.datastream.DorisSourceFunction;
 import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
+import org.apache.doris.flink.source.DorisSource;
 
+import java.util.List;
 import java.util.Properties;
 
 public class DorisSourceDataStream {
 
     public static void main(String[] args) throws Exception {
+        useArrowFlightSQLRead();
+    }
+
+    public static void useArrowFlightSQLRead() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        DorisOptions option =
+                DorisOptions.builder()
+                        .setFenodes("127.0.0.1:8030")
+                        .setTableIdentifier("test.students")
+                        .setUsername("root")
+                        .setPassword("")
+                        .build();
+
+        DorisReadOptions readOptions =
+                DorisReadOptions.builder()
+                        .setUseFlightSql(true)
+                        .setFlightSqlPort(29747)
+                        .setFilterQuery("age > 1")
+                        .build();
+
+        DorisSource<List<?>> dorisSource =
+                DorisSource.<List<?>>builder()
+                        .setDorisOptions(option)
+                        .setDorisReadOptions(readOptions)
+                        .setDeserializer(new SimpleListDeserializationSchema())
+                        .build();
+
+        env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
+        env.execute("Doris Source Test");
+    }
+
+    public static void useThriftRead() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        DorisOptions option =
+                DorisOptions.builder()
+                        .setFenodes("127.0.0.1:8030")
+                        .setTableIdentifier("test.students")
+                        .setUsername("root")
+                        .setPassword("")
+                        .build();
+
+        DorisReadOptions readOptions = DorisReadOptions.builder().build();
+        DorisSource<List<?>> dorisSource =
+                DorisSource.<List<?>>builder()
+                        .setDorisOptions(option)
+                        .setDorisReadOptions(readOptions)
+                        .setDeserializer(new SimpleListDeserializationSchema())
+                        .build();
+
+        env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
+        env.execute("Doris Source Test");
+    }
+
+    public static void useSourceFunctionRead() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         Properties properties = new Properties();
-        properties.put("fenodes", "FE_IP:8030");
+        properties.put("fenodes", "127.0.0.1:8030");
         properties.put("username", "root");
         properties.put("password", "");
-        properties.put("table.identifier", "db.table");
-        properties.put("doris.read.field", "id,code,name");
-        properties.put("doris.filter.query", "name='doris'");
+        properties.put("table.identifier", "test.students");
         DorisStreamOptions options = new DorisStreamOptions(properties);
 
-        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(2);
         env.addSource(new DorisSourceFunction(options, new 
SimpleListDeserializationSchema()))
                 .print();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to