This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 7285212b [Chore](test) improve test example for doc (#513) 7285212b is described below commit 7285212b836ba166d0a9fc7ebefacc52948a8073 Author: wudi <676366...@qq.com> AuthorDate: Mon Nov 18 14:40:40 2024 +0800 [Chore](test) improve test example for doc (#513) --- .../doris/flink/example/DorisSinkExample.java | 58 ++++++++++++++----- .../flink/example/DorisSinkExampleRowData.java | 45 ++++++--------- .../flink/example/DorisSinkMultiTableExample.java | 55 +++++++++--------- .../doris/flink/example/DorisSourceDataStream.java | 67 ++++++++++++++++++++-- 4 files changed, 152 insertions(+), 73 deletions(-) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExample.java index 35ef73fd..bcb16965 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExample.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExample.java @@ -40,6 +40,49 @@ import java.util.Properties; public class DorisSinkExample { public static void main(String[] args) throws Exception { + JSONFormatWrite(); + } + + public static void JSONFormatWrite() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.enableCheckpointing(30000); + DorisSink.Builder<String> builder = DorisSink.builder(); + + DorisOptions dorisOptions = + DorisOptions.builder() + .setFenodes("127.0.0.1:8030") + .setTableIdentifier("test.student") + .setUsername("root") + .setPassword("") + .build(); + + Properties properties = new Properties(); + properties.setProperty("read_json_by_line", "true"); + properties.setProperty("format", "json"); + + DorisExecutionOptions executionOptions = + DorisExecutionOptions.builder() + .setLabelPrefix("label-doris") + .setDeletable(false) + .setBatchMode(true) + .setStreamLoadProp(properties) + .build(); + + builder.setDorisReadOptions(DorisReadOptions.builder().build()) + .setDorisExecutionOptions(executionOptions) + .setSerializer(new SimpleStringSerializer()) + .setDorisOptions(dorisOptions); + + List<String> data = new ArrayList<>(); + data.add("{\"id\":3,\"name\":\"Michael\",\"age\":28}"); + data.add("{\"id\":4,\"name\":\"David\",\"age\":38}"); + + env.fromCollection(data).sinkTo(builder.build()); + env.execute("doris test"); + } + + public static void CSVFormatWrite() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setRuntimeMode(RuntimeExecutionMode.BATCH); @@ -49,24 +92,13 @@ public class DorisSinkExample { CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000))); DorisSink.Builder<String> builder = DorisSink.builder(); - final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder(); - readOptionBuilder - .setDeserializeArrowAsync(false) - .setDeserializeQueueSize(64) - .setExecMemLimit(2147483648L) - .setRequestQueryTimeoutS(3600) - .setRequestBatchSize(1000) - .setRequestConnectTimeoutMs(10000) - .setRequestReadTimeoutMs(10000) - .setRequestRetries(3) - .setRequestTabletSize(1024 * 1024); Properties properties = new Properties(); properties.setProperty("column_separator", ","); properties.setProperty("line_delimiter", "\n"); properties.setProperty("format", "csv"); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); dorisBuilder - .setFenodes("127.0.0.1:8040") + .setFenodes("127.0.0.1:8030") .setTableIdentifier("db.table") .setUsername("test") .setPassword("test"); @@ -77,7 +109,7 @@ public class DorisSinkExample { .setBufferSize(8 * 1024) .setBufferCount(3); - builder.setDorisReadOptions(readOptionBuilder.build()) + builder.setDorisReadOptions(DorisReadOptions.builder().build()) .setDorisExecutionOptions(executionBuilder.build()) .setSerializer(new SimpleStringSerializer()) .setDorisOptions(dorisBuilder.build()); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExampleRowData.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExampleRowData.java index 8037e2ea..ae6aa593 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExampleRowData.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkExampleRowData.java @@ -17,13 +17,8 @@ package org.apache.doris.flink.example; -import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.GenericRowData; @@ -45,40 +40,37 @@ public class DorisSinkExampleRowData { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.setRuntimeMode(RuntimeExecutionMode.BATCH); env.enableCheckpointing(10000); env.setParallelism(1); - env.getCheckpointConfig() - .enableExternalizedCheckpoints( - CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000))); + DorisSink.Builder<RowData> builder = DorisSink.builder(); Properties properties = new Properties(); properties.setProperty("column_separator", ","); properties.setProperty("line_delimiter", "\n"); - // properties.setProperty("read_json_by_line", "true"); - // properties.setProperty("format", "json"); + properties.setProperty("format", "csv"); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); dorisBuilder .setFenodes("127.0.0.1:8030") - .setTableIdentifier("db.tbl") + .setTableIdentifier("test.students") .setUsername("root") .setPassword(""); DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); - executionBuilder.setLabelPrefix(UUID.randomUUID().toString()).setStreamLoadProp(properties); + executionBuilder + .setLabelPrefix(UUID.randomUUID().toString()) + .setDeletable(false) + .setStreamLoadProp(properties); // flink rowdata‘s schema - String[] fields = {"name", "age"}; - DataType[] types = {DataTypes.VARCHAR(256), DataTypes.INT()}; + String[] fields = {"id", "name", "age"}; + DataType[] types = {DataTypes.INT(), DataTypes.VARCHAR(256), DataTypes.INT()}; builder.setDorisExecutionOptions(executionBuilder.build()) .setSerializer( RowDataSerializer.builder() // serialize according to rowdata - .setType(LoadConstants.CSV) // .setType(LoadConstants.CSV) + .setType(LoadConstants.CSV) .setFieldDelimiter(",") - .setFieldNames(fields) // .setFieldDelimiter(",") + .setFieldNames(fields) .setFieldType(types) .build()) .setDorisOptions(dorisBuilder.build()); @@ -91,16 +83,17 @@ public class DorisSinkExampleRowData { @Override public void flatMap(String s, Collector<RowData> out) throws Exception { - GenericRowData genericRowData = new GenericRowData(2); + GenericRowData genericRowData = new GenericRowData(3); + genericRowData.setField(0, 1); genericRowData.setField( - 0, StringData.fromString("beijing")); - genericRowData.setField(1, 123); + 1, StringData.fromString("Michael")); + genericRowData.setField(2, 18); out.collect(genericRowData); - GenericRowData genericRowData2 = new GenericRowData(2); - genericRowData2.setField( - 0, StringData.fromString("shanghai")); - genericRowData2.setField(1, 1234); + GenericRowData genericRowData2 = new GenericRowData(3); + genericRowData2.setField(0, 2); + genericRowData2.setField(1, StringData.fromString("David")); + genericRowData2.setField(2, 38); out.collect(genericRowData2); } }); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkMultiTableExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkMultiTableExample.java index feff8b32..2aeb3956 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkMultiTableExample.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSinkMultiTableExample.java @@ -17,38 +17,26 @@ package org.apache.doris.flink.example; +import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; -import org.apache.doris.flink.sink.batch.DorisBatchSink; +import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.batch.RecordWithMeta; import org.apache.doris.flink.sink.writer.serializer.RecordWithMetaSerializer; +import java.util.Arrays; import java.util.Properties; -import java.util.UUID; public class DorisSinkMultiTableExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); + env.enableCheckpointing(15000); - DorisBatchSink.Builder<RecordWithMeta> builder = DorisBatchSink.builder(); - final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder(); - - readOptionBuilder - .setDeserializeArrowAsync(false) - .setDeserializeQueueSize(64) - .setExecMemLimit(2147483648L) - .setRequestQueryTimeoutS(3600) - .setRequestBatchSize(1000) - .setRequestConnectTimeoutMs(10000) - .setRequestReadTimeoutMs(10000) - .setRequestRetries(3) - .setRequestTabletSize(1024 * 1024); - + DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder(); Properties properties = new Properties(); properties.setProperty("column_separator", ","); properties.setProperty("line_delimiter", "\n"); @@ -56,7 +44,7 @@ public class DorisSinkMultiTableExample { DorisOptions.Builder dorisBuilder = DorisOptions.builder(); dorisBuilder .setFenodes("127.0.0.1:8030") - .setTableIdentifier("test.test_flink_tmp") + .setTableIdentifier("") .setUsername("root") .setPassword(""); @@ -66,21 +54,24 @@ public class DorisSinkMultiTableExample { .setLabelPrefix("label") .setStreamLoadProp(properties) .setDeletable(false) - .setBufferFlushMaxBytes(8 * 1024) - .setBufferFlushMaxRows(10) + .setBatchMode(true) + .setBufferFlushMaxBytes(10 * 1024 * 1024) + .setBufferFlushMaxRows(10000) .setBufferFlushIntervalMs(1000 * 10); - builder.setDorisReadOptions(readOptionBuilder.build()) + builder.setDorisReadOptions(DorisReadOptions.builder().build()) .setDorisExecutionOptions(executionBuilder.build()) .setDorisOptions(dorisBuilder.build()) .setSerializer(new RecordWithMetaSerializer()); - // RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1"); - // RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1"); - // DataStreamSource<RecordWithMeta> stringDataStreamSource = env.fromCollection( - // Arrays.asList(record, record1)); - // stringDataStreamSource.sinkTo(builder.build()); + RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1"); + RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1"); + DataStreamSource<RecordWithMeta> stringDataStreamSource = + env.fromCollection(Arrays.asList(record, record1)); + stringDataStreamSource.sinkTo(builder.build()); + /* + // mock unbounded streaming source env.addSource( new SourceFunction<RecordWithMeta>() { private Long id = 1000000L; @@ -90,6 +81,12 @@ public class DorisSinkMultiTableExample { while (true) { id = id + 1; RecordWithMeta record = + new RecordWithMeta( + "test", + "test_flink_tmp", + UUID.randomUUID() + ",1"); + out.collect(record); + record = new RecordWithMeta( "test", "test_flink_tmp1", @@ -98,10 +95,10 @@ public class DorisSinkMultiTableExample { record = new RecordWithMeta( "test", - "test_flink_tmp", + "test_flink_tmp2", UUID.randomUUID() + ",1"); out.collect(record); - Thread.sleep(3000); + Thread.sleep(1000); } } @@ -109,7 +106,7 @@ public class DorisSinkMultiTableExample { public void cancel() {} }) .sinkTo(builder.build()); - + **/ env.execute("doris multi table test"); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSourceDataStream.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSourceDataStream.java index ee3fa135..ba57b950 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSourceDataStream.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/example/DorisSourceDataStream.java @@ -17,27 +17,84 @@ package org.apache.doris.flink.example; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.cfg.DorisStreamOptions; import org.apache.doris.flink.datastream.DorisSourceFunction; import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema; +import org.apache.doris.flink.source.DorisSource; +import java.util.List; import java.util.Properties; public class DorisSourceDataStream { public static void main(String[] args) throws Exception { + useArrowFlightSQLRead(); + } + + public static void useArrowFlightSQLRead() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DorisOptions option = + DorisOptions.builder() + .setFenodes("127.0.0.1:8030") + .setTableIdentifier("test.students") + .setUsername("root") + .setPassword("") + .build(); + + DorisReadOptions readOptions = + DorisReadOptions.builder() + .setUseFlightSql(true) + .setFlightSqlPort(29747) + .setFilterQuery("age > 1") + .build(); + + DorisSource<List<?>> dorisSource = + DorisSource.<List<?>>builder() + .setDorisOptions(option) + .setDorisReadOptions(readOptions) + .setDeserializer(new SimpleListDeserializationSchema()) + .build(); + + env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print(); + env.execute("Doris Source Test"); + } + + public static void useThriftRead() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DorisOptions option = + DorisOptions.builder() + .setFenodes("127.0.0.1:8030") + .setTableIdentifier("test.students") + .setUsername("root") + .setPassword("") + .build(); + + DorisReadOptions readOptions = DorisReadOptions.builder().build(); + DorisSource<List<?>> dorisSource = + DorisSource.<List<?>>builder() + .setDorisOptions(option) + .setDorisReadOptions(readOptions) + .setDeserializer(new SimpleListDeserializationSchema()) + .build(); + + env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print(); + env.execute("Doris Source Test"); + } + + public static void useSourceFunctionRead() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); - properties.put("fenodes", "FE_IP:8030"); + properties.put("fenodes", "127.0.0.1:8030"); properties.put("username", "root"); properties.put("password", ""); - properties.put("table.identifier", "db.table"); - properties.put("doris.read.field", "id,code,name"); - properties.put("doris.filter.query", "name='doris'"); + properties.put("table.identifier", "test.students"); DorisStreamOptions options = new DorisStreamOptions(properties); - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); env.addSource(new DorisSourceFunction(options, new SimpleListDeserializationSchema())) .print(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org