This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 32fd54e  add multisink to DorisBatchSink (#223)
32fd54e is described below

commit 32fd54e3c1595897d02a188b00badebd0b9ef4e2
Author: wudi <676366...@qq.com>
AuthorDate: Mon Nov 6 16:27:08 2023 +0800

    add multisink to DorisBatchSink (#223)
    
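    Support multi-table writing on DorisBatchSink: records of type
    RecordWithMeta carry their own target database and table, so a single
    sink can write to several tables.
    Example: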
    ```java
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            DorisBatchSink.Builder<RecordWithMeta> builder = DorisBatchSink.builder();
            final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
    
            Properties properties = new Properties();
            properties.setProperty("column_separator", ",");
            properties.setProperty("line_delimiter", "\n");
            properties.setProperty("format", "csv");
            DorisOptions.Builder dorisBuilder = DorisOptions.builder();
            dorisBuilder.setFenodes("127.0.0.1:8030")
                    .setTableIdentifier("")
                    .setUsername("root")
                    .setPassword("");
    
            DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
    
            executionBuilder.setLabelPrefix("label")
                    .setStreamLoadProp(properties)
                    .setDeletable(false)
                    .setBufferFlushMaxBytes(8 * 1024)
                    .setBufferFlushMaxRows(10)
                    .setBufferFlushIntervalMs(1000 * 10);
    
            builder.setDorisReadOptions(readOptionBuilder.build())
                    .setDorisExecutionOptions(executionBuilder.build())
                    .setDorisOptions(dorisBuilder.build());
    
            // Multi-table writing: each record names its own database and table
            RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
            RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
            DataStreamSource<RecordWithMeta> source = env.fromCollection(Arrays.asList(record, record1));
            source.sinkTo(builder.build());
    
    ```
---
 .../doris/flink/sink/batch/BatchRecordBuffer.java  |  25 +++++
 .../doris/flink/sink/batch/DorisBatchSink.java     |   1 -
 .../flink/sink/batch/DorisBatchStreamLoad.java     | 109 ++++++++++-----------
 .../doris/flink/sink/batch/DorisBatchWriter.java   |  29 +++++-
 .../doris/flink/sink/batch/RecordWithMeta.java     |  62 ++++++++++++
 .../doris/flink/sink/writer/LabelGenerator.java    |   4 +
 .../doris/flink/DorisSinkMultiTableExample.java    | 101 +++++++++++++++++++
 7 files changed, 271 insertions(+), 60 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
index 5fa601d..1de6253 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
@@ -36,6 +36,8 @@ public class BatchRecordBuffer {
     private int numOfRecords = 0;
     private int bufferSizeBytes = 0;
     private boolean loadBatchFirstRecord = true;
+    private String database;
+    private String table;
 
     public BatchRecordBuffer(){}
 
@@ -45,6 +47,14 @@ public class BatchRecordBuffer {
         this.buffer = ByteBuffer.allocate(bufferSize);
     }
 
+    public BatchRecordBuffer(String database, String table, byte[] 
lineDelimiter, int bufferSize) {
+        super();
+        this.database = database;
+        this.table = table;
+        this.lineDelimiter = lineDelimiter;
+        this.buffer = ByteBuffer.allocate(bufferSize);
+    }
+
     public void insert(byte[] record) {
         ensureCapacity(record.length);
         if(loadBatchFirstRecord){
@@ -141,4 +151,19 @@ public class BatchRecordBuffer {
         this.bufferSizeBytes = bufferSizeBytes;
     }
 
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
index 2c578d4..37d3973 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
@@ -87,7 +87,6 @@ public class DorisBatchSink<IN> implements Sink<IN> {
         public DorisBatchSink<IN> build() {
             Preconditions.checkNotNull(dorisOptions);
             Preconditions.checkNotNull(dorisExecutionOptions);
-            Preconditions.checkNotNull(serializer);
             if(dorisReadOptions == null) {
                 dorisReadOptions = DorisReadOptions.builder().build();
             }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index b6a3f65..6adb436 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -18,6 +18,7 @@
 package org.apache.doris.flink.sink.batch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -29,8 +30,7 @@ import org.apache.doris.flink.sink.EscapeHandler;
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.HttpUtil;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
-
-import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -44,9 +44,10 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -77,16 +78,14 @@ public class DorisBatchStreamLoad implements Serializable {
     private String hostPort;
     private final String username;
     private final String password;
-    private final String db;
-    private final String table;
     private final Properties loadProps;
-    private BatchRecordBuffer buffer;
+    private Map<String, BatchRecordBuffer> bufferMap = new 
ConcurrentHashMap<>();
     private DorisExecutionOptions executionOptions;
     private ExecutorService loadExecutorService;
     private LoadAsyncExecutor loadAsyncExecutor;
-    private BlockingQueue<BatchRecordBuffer> writeQueue;
-    private BlockingQueue<BatchRecordBuffer> readQueue;
+    private BlockingQueue<BatchRecordBuffer> flushQueue;
     private final AtomicBoolean started;
+    private volatile boolean loadThreadAlive = false;
     private AtomicReference<Throwable> exception = new AtomicReference<>(null);
     private CloseableHttpClient httpClient = new HttpUtil().getHttpClient();
     private BackendUtil backendUtil;
@@ -99,24 +98,18 @@ public class DorisBatchStreamLoad implements Serializable {
                 dorisOptions.getBenodes())
                 : new BackendUtil(RestService.getBackendsV2(dorisOptions, 
dorisReadOptions, LOG));
         this.hostPort = backendUtil.getAvailableBackend();
-        String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
-        this.db = tableInfo[0];
-        this.table = tableInfo[1];
         this.username = dorisOptions.getUsername();
         this.password = dorisOptions.getPassword();
-        this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, db, table);
         this.loadProps = executionOptions.getStreamLoadProp();
         this.labelGenerator = labelGenerator;
         this.lineDelimiter = 
EscapeHandler.escapeString(loadProps.getProperty(LINE_DELIMITER_KEY, 
LINE_DELIMITER_DEFAULT)).getBytes();
         this.executionOptions = executionOptions;
-        //init queue
-        this.writeQueue = new 
ArrayBlockingQueue<>(executionOptions.getFlushQueueSize());
-        LOG.info("init RecordBuffer capacity {}, count {}", 
executionOptions.getBufferFlushMaxBytes(), 
executionOptions.getFlushQueueSize());
-        for (int index = 0; index < executionOptions.getFlushQueueSize(); 
index++) {
-            this.writeQueue.add(new BatchRecordBuffer(this.lineDelimiter, 
executionOptions.getBufferFlushMaxBytes()));
+        this.flushQueue = new 
LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
+        if(StringUtils.isNotBlank(dorisOptions.getTableIdentifier())){
+            String[] tableInfo = 
dorisOptions.getTableIdentifier().split("\\.");
+            Preconditions.checkState(tableInfo.length == 2, "tableIdentifier 
input error, the format is database.table");
+            this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, 
tableInfo[0], tableInfo[1]);
         }
-        readQueue = new LinkedBlockingDeque<>();
-
         this.loadAsyncExecutor= new LoadAsyncExecutor();
         this.loadExecutorService = new ThreadPoolExecutor(1, 1, 0L, 
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), new 
DefaultThreadFactory("streamload-executor"), new 
ThreadPoolExecutor.AbortPolicy());
         this.started = new AtomicBoolean(true);
@@ -128,26 +121,26 @@ public class DorisBatchStreamLoad implements Serializable 
{
      * @param record
      * @throws IOException
      */
-    public synchronized void writeRecord(byte[] record) throws 
InterruptedException {
+    public synchronized void writeRecord(String database, String table, byte[] 
record) throws InterruptedException {
         checkFlushException();
-        if(buffer == null){
-            buffer = takeRecordFromWriteQueue();
-        }
+        String bufferKey = getTableIdentifier(database, table);
+        BatchRecordBuffer buffer = bufferMap.computeIfAbsent(bufferKey, k -> 
new BatchRecordBuffer(database, table, this.lineDelimiter, 
executionOptions.getBufferFlushMaxBytes()));
         buffer.insert(record);
         //When it exceeds 80% of the byteSize,to flush, to avoid triggering 
bytebuffer expansion
         if (buffer.getBufferSizeBytes() >= 
executionOptions.getBufferFlushMaxBytes() * 0.8
                 || (executionOptions.getBufferFlushMaxRows() != 0 && 
buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
-            flush(false);
+            flush(bufferKey,false);
         }
     }
 
-    public synchronized void flush(boolean waitUtilDone) throws 
InterruptedException {
+    public synchronized void flush(String bufferKey, boolean waitUtilDone) 
throws InterruptedException {
         checkFlushException();
-        if (buffer != null && !buffer.isEmpty()) {
-            buffer.setLabelName(labelGenerator.generateBatchLabel());
-            BatchRecordBuffer tmpBuff = buffer;
-            readQueue.put(tmpBuff);
-            this.buffer = null;
+        if (null == bufferKey) {
+            for (String key : bufferMap.keySet()) {
+                flushBuffer(key);
+            }
+        } else if (bufferMap.containsKey(bufferKey)) {
+            flushBuffer(bufferKey);
         }
 
         if (waitUtilDone) {
@@ -155,20 +148,22 @@ public class DorisBatchStreamLoad implements Serializable 
{
         }
     }
 
-    private void putRecordToWriteQueue(BatchRecordBuffer buffer){
-        try {
-            writeQueue.put(buffer);
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Failed to recycle a buffer to queue");
-        }
+    private synchronized void flushBuffer(String bufferKey) {
+        BatchRecordBuffer buffer = bufferMap.get(bufferKey);
+        
buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable()));
+        putRecordToFlushQueue(buffer);
+        bufferMap.remove(bufferKey);
     }
 
-    private BatchRecordBuffer takeRecordFromWriteQueue(){
+    private void putRecordToFlushQueue(BatchRecordBuffer buffer){
         checkFlushException();
+        if(!loadThreadAlive){
+            throw new RuntimeException("load thread already exit, write was 
interrupted");
+        }
         try {
-            return writeQueue.take();
+            flushQueue.put(buffer);
         } catch (InterruptedException e) {
-            throw new RuntimeException("Failed to take a buffer from queue");
+            throw new RuntimeException("Failed to put record buffer to flush 
queue");
         }
     }
 
@@ -178,31 +173,34 @@ public class DorisBatchStreamLoad implements Serializable 
{
         }
     }
 
-    private void waitAsyncLoadFinish() throws InterruptedException {
+    private void waitAsyncLoadFinish() {
         for(int i = 0; i < executionOptions.getFlushQueueSize() + 1 ; i++){
-            BatchRecordBuffer empty = takeRecordFromWriteQueue();
-            readQueue.put(empty);
+            BatchRecordBuffer empty = new BatchRecordBuffer();
+            putRecordToFlushQueue(empty);
         }
     }
 
+    private String getTableIdentifier(String database, String table) {
+        return database + "." + table;
+    }
+
     public void close(){
         //close async executor
         this.loadExecutorService.shutdown();
         this.started.set(false);
-
         //clear buffer
-        this.writeQueue.clear();
-        this.readQueue.clear();
+        this.flushQueue.clear();
     }
 
     class LoadAsyncExecutor implements Runnable {
         @Override
         public void run() {
             LOG.info("LoadAsyncExecutor start");
+            loadThreadAlive = true;
             while (started.get()) {
                 BatchRecordBuffer buffer = null;
                 try {
-                    buffer = readQueue.poll(2000L, TimeUnit.MILLISECONDS);
+                    buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
                     if(buffer == null){
                         continue;
                     }
@@ -212,23 +210,20 @@ public class DorisBatchStreamLoad implements Serializable 
{
                 } catch (Exception e) {
                     LOG.error("worker running error", e);
                     exception.set(e);
+                    //clear queue to avoid writer thread blocking
+                    flushQueue.clear();
                     break;
-                } finally {
-                    //Recycle buffer to avoid writer thread blocking
-                    if(buffer != null){
-                        buffer.clear();
-                        putRecordToWriteQueue(buffer);
-                    }
                 }
             }
             LOG.info("LoadAsyncExecutor stop");
+            loadThreadAlive = false;
         }
 
         /**
          * execute stream load
          */
         public void load(String label, BatchRecordBuffer buffer) throws 
IOException{
-            refreshLoadUrl();
+            refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
             ByteBuffer data = buffer.getData();
             ByteArrayEntity entity = new ByteArrayEntity(data.array(), 
data.arrayOffset(), data.limit());
             HttpPutBuilder putBuilder = new HttpPutBuilder();
@@ -266,14 +261,16 @@ public class DorisBatchStreamLoad implements Serializable 
{
                 }
                 retry++;
                 // get available backend retry
-                refreshLoadUrl();
+                refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
                 putBuilder.setUrl(loadUrl);
             }
+            buffer.clear();
+            buffer = null;
         }
 
-        private void refreshLoadUrl(){
+        private void refreshLoadUrl(String database, String table){
             hostPort = backendUtil.getAvailableBackend();
-            loadUrl = String.format(LOAD_URL_PATTERN, hostPort, db, table);
+            loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database, 
table);
         }
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index d4621c7..6b2ce02 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -24,11 +24,14 @@ import 
org.apache.doris.flink.sink.writer.DorisRecordSerializer;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -46,12 +49,20 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> 
{
     private final DorisRecordSerializer<IN> serializer;
     private final transient ScheduledExecutorService scheduledExecutorService;
     private transient volatile Exception flushException = null;
+    private String database;
+    private String table;
 
     public DorisBatchWriter(Sink.InitContext initContext,
                             DorisRecordSerializer<IN> serializer,
                             DorisOptions dorisOptions,
                             DorisReadOptions dorisReadOptions,
                             DorisExecutionOptions executionOptions) {
+        
if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){
+            String[] tableInfo = 
dorisOptions.getTableIdentifier().split("\\.");
+            Preconditions.checkState(tableInfo.length == 2, "tableIdentifier 
input error, the format is database.table");
+            this.database = tableInfo[0];
+            this.table = tableInfo[1];
+        }
         LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
         this.labelPrefix = executionOptions.getLabelPrefix() + "_" + 
initContext.getSubtaskId();
         this.labelGenerator = new LabelGenerator(labelPrefix, false);
@@ -72,7 +83,7 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
     private void intervalFlush() {
         try {
             LOG.info("interval flush triggered.");
-            batchStreamLoad.flush(false);
+            batchStreamLoad.flush(null, false);
         } catch (InterruptedException e) {
             flushException = e;
         }
@@ -81,18 +92,30 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> 
{
     @Override
     public void write(IN in, Context context) throws IOException, 
InterruptedException {
         checkFlushException();
+        if(in instanceof RecordWithMeta){
+            RecordWithMeta row = (RecordWithMeta) in;
+            if(StringUtils.isNullOrWhitespaceOnly(row.getTable())
+                    ||StringUtils.isNullOrWhitespaceOnly(row.getDatabase())
+                    ||row.getRecord() == null){
+                LOG.warn("Record or meta format is incorrect, ignore record 
db:{}, table:{}, row:{}", row.getDatabase(), row.getTable(), row.getRecord());
+                return;
+            }
+            batchStreamLoad.writeRecord(row.getDatabase(), row.getTable(), 
row.getRecord().getBytes(StandardCharsets.UTF_8));
+            return;
+        }
+
         byte[] serialize = serializer.serialize(in);
         if(Objects.isNull(serialize)){
             //ddl record
             return;
         }
-        batchStreamLoad.writeRecord(serialize);
+        batchStreamLoad.writeRecord(database, table, serialize);
     }
     @Override
     public void flush(boolean flush) throws IOException, InterruptedException {
         checkFlushException();
         LOG.info("checkpoint flush triggered.");
-        batchStreamLoad.flush(true);
+        batchStreamLoad.flush(null,  true);
     }
 
     @Override
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java
new file mode 100644
index 0000000..7f4d269
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+public class RecordWithMeta {
+    private String database;
+    private String table;
+    private String record;
+
+    public RecordWithMeta() {
+    }
+
+    public RecordWithMeta(String database, String table, String record) {
+        this.database = database;
+        this.table = table;
+        this.record = record;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public String getRecord() {
+        return record;
+    }
+
+    public void setRecord(String record) {
+        this.record = record;
+    }
+
+    public String getTableIdentifier() {
+        return this.database + "." + this.table;
+    }
+
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index 235b553..55f7811 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -38,4 +38,8 @@ public class LabelGenerator {
     public String generateBatchLabel() {
         return labelPrefix + "_" + UUID.randomUUID();
     }
+
+    public String generateBatchLabel(String table) {
+        return String.format("%s_%s_%s", labelPrefix, table, 
UUID.randomUUID());
+    }
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkMultiTableExample.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkMultiTableExample.java
new file mode 100644
index 0000000..eade292
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkMultiTableExample.java
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.batch.DorisBatchSink;
+import org.apache.doris.flink.sink.batch.RecordWithMeta;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.Properties;
+import java.util.UUID;
+
+
+public class DorisSinkMultiTableExample {
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DorisBatchSink.Builder<RecordWithMeta> builder = 
DorisBatchSink.builder();
+        final DorisReadOptions.Builder readOptionBuilder = 
DorisReadOptions.builder();
+
+        readOptionBuilder.setDeserializeArrowAsync(false)
+                .setDeserializeQueueSize(64)
+                .setExecMemLimit(2147483648L)
+                .setRequestQueryTimeoutS(3600)
+                .setRequestBatchSize(1000)
+                .setRequestConnectTimeoutMs(10000)
+                .setRequestReadTimeoutMs(10000)
+                .setRequestRetries(3)
+                .setRequestTabletSize(1024 * 1024);
+
+        Properties properties = new Properties();
+        properties.setProperty("column_separator", ",");
+        properties.setProperty("line_delimiter", "\n");
+        properties.setProperty("format", "csv");
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder.setFenodes("127.0.0.1:8030")
+                .setTableIdentifier("test.test_flink_tmp")
+                .setUsername("root")
+                .setPassword("");
+
+        DorisExecutionOptions.Builder executionBuilder = 
DorisExecutionOptions.builder();
+
+        executionBuilder.setLabelPrefix("label")
+                .setStreamLoadProp(properties)
+                .setDeletable(false)
+                .setBufferFlushMaxBytes(8 * 1024)
+                .setBufferFlushMaxRows(10)
+                .setBufferFlushIntervalMs(1000 * 10);
+
+        builder.setDorisReadOptions(readOptionBuilder.build())
+                .setDorisExecutionOptions(executionBuilder.build())
+                .setDorisOptions(dorisBuilder.build());
+
+//        RecordWithMeta record = new RecordWithMeta("test", 
"test_flink_tmp1", "wangwu,1");
+//        RecordWithMeta record1 = new RecordWithMeta("test", 
"test_flink_tmp", "wangwu,1");
+//        DataStreamSource<RecordWithMeta> stringDataStreamSource = 
env.fromCollection(
+//                Arrays.asList(record, record1));
+//        stringDataStreamSource.sinkTo(builder.build());
+
+          env.addSource(new SourceFunction<RecordWithMeta>() {
+            private Long id = 1000000L;
+            @Override
+            public void run(SourceContext<RecordWithMeta> out) throws 
Exception {
+                while (true) {
+                    id = id + 1;
+                    RecordWithMeta record = new RecordWithMeta("test", 
"test_flink_tmp1", UUID.randomUUID() + ",1");
+                    out.collect(record);
+                    record = new RecordWithMeta("test", "test_flink_tmp", 
UUID.randomUUID() + ",1");
+                    out.collect(record);
+                    Thread.sleep(3000);
+                }
+            }
+
+            @Override
+            public void cancel() {
+
+            }
+        }).sinkTo(builder.build());
+
+        env.execute("doris multi table test");
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to