This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git

commit 021e281e0e283e7c0c862ceaf878cc6b7e83de5b
Author: wudi <676366...@qq.com>
AuthorDate: Sun Jan 23 20:24:41 2022 +0800

    [Feature][flink-connector] support flink delete option (#7457)
    
    * Flink Connector supports delete option on Unique models
    Co-authored-by: wudi <w...@shuhaisc.com>
---
 .../doris/flink/cfg/DorisExecutionOptions.java     |  38 +++--
 .../apache/doris/flink/cfg/DorisStreamOptions.java |  22 +--
 .../org/apache/doris/flink/rest/models/Schema.java |   9 ++
 .../flink/table/DorisDynamicOutputFormat.java      | 101 +++++++++---
 .../flink/table/DorisDynamicTableFactory.java      | 176 ++++++++++-----------
 .../doris/flink/table/DorisDynamicTableSink.java   |  24 +--
 6 files changed, 218 insertions(+), 152 deletions(-)

diff --git 
a/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java 
b/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 47bb517..ad1ab07 100644
--- a/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -26,8 +26,8 @@ import java.util.Properties;
  * JDBC sink batch options.
  */
 public class DorisExecutionOptions implements Serializable {
-    private static final long serialVersionUID = 1L;
 
+    private static final long serialVersionUID = 1L;
     public static final Integer DEFAULT_BATCH_SIZE = 10000;
     public static final Integer DEFAULT_MAX_RETRY_TIMES = 1;
     private static final Long DEFAULT_INTERVAL_MILLIS = 10000L;
@@ -41,12 +41,27 @@ public class DorisExecutionOptions implements Serializable {
      */
     private final Properties streamLoadProp;
 
-    public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long 
batchIntervalMs, Properties streamLoadProp) {
+    private final Boolean enableDelete;
+
+
+    public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long 
batchIntervalMs, Properties streamLoadProp, Boolean enableDelete) {
         Preconditions.checkArgument(maxRetries >= 0);
         this.batchSize = batchSize;
         this.maxRetries = maxRetries;
         this.batchIntervalMs = batchIntervalMs;
         this.streamLoadProp = streamLoadProp;
+        this.enableDelete = enableDelete;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static DorisExecutionOptions defaults() {
+        Properties pro = new Properties();
+        pro.setProperty("format", "json");
+        pro.setProperty("strip_outer_array", "true");
+        return new Builder().setStreamLoadProp(pro).build();
     }
 
     public Integer getBatchSize() {
@@ -65,15 +80,8 @@ public class DorisExecutionOptions implements Serializable {
         return streamLoadProp;
     }
 
-    public static Builder builder() {
-        return new Builder();
-    }
-
-    public static DorisExecutionOptions defaults() {
-        Properties pro = new Properties();
-        pro.setProperty("format", "json");
-        pro.setProperty("strip_outer_array", "true");
-        return new Builder().setStreamLoadProp(pro).build();
+    public Boolean getEnableDelete() {
+        return enableDelete;
     }
 
     /**
@@ -84,6 +92,7 @@ public class DorisExecutionOptions implements Serializable {
         private Integer maxRetries = DEFAULT_MAX_RETRY_TIMES;
         private Long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
         private Properties streamLoadProp = new Properties();
+        private Boolean enableDelete = false;
 
         public Builder setBatchSize(Integer batchSize) {
             this.batchSize = batchSize;
@@ -105,8 +114,13 @@ public class DorisExecutionOptions implements Serializable 
{
             return this;
         }
 
+        public Builder setEnableDelete(Boolean enableDelete) {
+            this.enableDelete = enableDelete;
+            return this;
+        }
+
         public DorisExecutionOptions build() {
-            return new DorisExecutionOptions(batchSize, maxRetries, 
batchIntervalMs, streamLoadProp);
+            return new DorisExecutionOptions(batchSize, maxRetries, 
batchIntervalMs, streamLoadProp, enableDelete);
         }
     }
 
diff --git a/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java 
b/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
index 016b29a..c5c2c16 100644
--- a/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
+++ b/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java
@@ -32,8 +32,8 @@ public class DorisStreamOptions implements Serializable {
     private DorisReadOptions readOptions;
 
     public DorisStreamOptions(Properties prop) {
-         this.prop = prop;
-         init();
+        this.prop = prop;
+        init();
     }
 
     /**
@@ -47,17 +47,17 @@ public class DorisStreamOptions implements Serializable {
                 
.setTableIdentifier(prop.getProperty(ConfigurationOptions.TABLE_IDENTIFIER));
 
         DorisReadOptions.Builder readOptionsBuilder = 
DorisReadOptions.builder()
-                
.setDeserializeArrowAsync(Boolean.valueOf(prop.getProperty(ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC,ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString())))
-                
.setDeserializeQueueSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE,ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString())))
-                
.setExecMemLimit(Long.valueOf(prop.getProperty(ConfigurationOptions.DORIS_EXEC_MEM_LIMIT,ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT.toString())))
+                
.setDeserializeArrowAsync(Boolean.valueOf(prop.getProperty(ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC,
 ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString())))
+                
.setDeserializeQueueSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE,
 ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString())))
+                
.setExecMemLimit(Long.valueOf(prop.getProperty(ConfigurationOptions.DORIS_EXEC_MEM_LIMIT,
 ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT.toString())))
                 
.setFilterQuery(prop.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY))
                 
.setReadFields(prop.getProperty(ConfigurationOptions.DORIS_READ_FIELD))
-                
.setRequestQueryTimeoutS(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S,ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString())))
-                
.setRequestBatchSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_BATCH_SIZE,ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT.toString())))
-                
.setRequestConnectTimeoutMs(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS,ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT.toString())))
-                
.setRequestReadTimeoutMs(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS,ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT.toString())))
-                
.setRequestRetries(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES,ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT.toString())))
-                
.setRequestTabletSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_TABLET_SIZE,ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT.toString())));
+                
.setRequestQueryTimeoutS(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S,
 ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString())))
+                
.setRequestBatchSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_BATCH_SIZE,
 ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT.toString())))
+                
.setRequestConnectTimeoutMs(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS,
 ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT.toString())))
+                
.setRequestReadTimeoutMs(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS,
 ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT.toString())))
+                
.setRequestRetries(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES,
 ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT.toString())))
+                
.setRequestTabletSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_TABLET_SIZE,
 ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT.toString())));
 
         this.options = optionsBuilder.build();
         this.readOptions = readOptionsBuilder.build();
diff --git a/src/main/java/org/apache/doris/flink/rest/models/Schema.java 
b/src/main/java/org/apache/doris/flink/rest/models/Schema.java
index 314aa65..e274352 100644
--- a/src/main/java/org/apache/doris/flink/rest/models/Schema.java
+++ b/src/main/java/org/apache/doris/flink/rest/models/Schema.java
@@ -23,6 +23,7 @@ import java.util.Objects;
 
 public class Schema {
     private int status = 0;
+    private String keysType;
     private List<Field> properties;
 
     public Schema() {
@@ -41,6 +42,14 @@ public class Schema {
         this.status = status;
     }
 
+    public String getKeysType() {
+        return keysType;
+    }
+
+    public void setKeysType(String keysType) {
+        this.keysType = keysType;
+    }
+
     public List<Field> getProperties() {
         return properties;
     }
diff --git 
a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java 
b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 2a1cec4..44e0a6a 100644
--- a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -23,12 +23,14 @@ import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.Schema;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +48,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.data.RowData.createFieldGetter;
 
@@ -58,6 +61,7 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisDynamicOutputFormat.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
+    private static final String COLUMNS_KEY = "columns";
     private static final String FIELD_DELIMITER_KEY = "column_separator";
     private static final String FIELD_DELIMITER_DEFAULT = "\t";
     private static final String LINE_DELIMITER_KEY = "line_delimiter";
@@ -67,20 +71,21 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
     private static final String NULL_VALUE = "\\N";
     private static final String ESCAPE_DELIMITERS_KEY = "escape_delimiters";
     private static final String ESCAPE_DELIMITERS_DEFAULT = "false";
-
-    private String fieldDelimiter;
-    private String lineDelimiter;
+    private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
+    private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
     private final String[] fieldNames;
     private final boolean jsonFormat;
+    private final RowData.FieldGetter[] fieldGetters;
+    private final List batch = new ArrayList<>();
+    private String fieldDelimiter;
+    private String lineDelimiter;
     private DorisOptions options;
     private DorisReadOptions readOptions;
     private DorisExecutionOptions executionOptions;
     private DorisStreamLoad dorisStreamLoad;
-    private final RowData.FieldGetter[] fieldGetters;
+    private String keysType;
 
-    private final List batch = new ArrayList<>();
     private transient volatile boolean closed = false;
-
     private transient ScheduledExecutorService scheduler;
     private transient ScheduledFuture<?> scheduledFuture;
     private transient volatile Exception flushException;
@@ -93,9 +98,42 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
         this.options = option;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
+        this.fieldNames = fieldNames;
+        this.jsonFormat = 
FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
+        this.keysType = parseKeysType();
 
-        Properties streamLoadProp = executionOptions.getStreamLoadProp();
+        handleStreamloadProp();
+        this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
+        for (int i = 0; i < logicalTypes.length; i++) {
+            fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
+        }
+    }
+
+    /**
+     * parse table keysType
+     *
+     * @return keysType
+     */
+    private String parseKeysType() {
+        try {
+            Schema schema = RestService.getSchema(options, readOptions, LOG);
+            return schema.getKeysType();
+        } catch (DorisException e) {
+            throw new RuntimeException("Failed fetch doris table schema: " + 
options.getTableIdentifier());
+        }
+    }
+
+    /**
+     * A builder used to set parameters to the output format's configuration 
in a fluent way.
+     *
+     * @return builder
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
 
+    private void handleStreamloadProp() {
+        Properties streamLoadProp = executionOptions.getStreamLoadProp();
         boolean ifEscape = 
Boolean.parseBoolean(streamLoadProp.getProperty(ESCAPE_DELIMITERS_KEY, 
ESCAPE_DELIMITERS_DEFAULT));
         if (ifEscape) {
             this.fieldDelimiter = 
escapeString(streamLoadProp.getProperty(FIELD_DELIMITER_KEY,
@@ -113,11 +151,13 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
                     LINE_DELIMITER_DEFAULT);
         }
 
-        this.fieldNames = fieldNames;
-        this.jsonFormat = 
FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
-        this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
-        for (int i = 0; i < logicalTypes.length; i++) {
-            fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
+        //add column key when fieldNames is not empty
+        if (!streamLoadProp.containsKey(COLUMNS_KEY) && fieldNames != null && 
fieldNames.length > 0) {
+            String columns = String.join(",", 
Arrays.stream(fieldNames).map(item -> String.format("`%s`", 
item.trim().replace("`", ""))).collect(Collectors.toList()));
+            if (enableBatchDelete()) {
+                columns = String.format("%s,%s", columns, DORIS_DELETE_SIGN);
+            }
+            streamLoadProp.put(COLUMNS_KEY, columns);
         }
     }
 
@@ -133,6 +173,10 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
         return buf.toString();
     }
 
+    private boolean enableBatchDelete() {
+        return executionOptions.getEnableDelete() || 
UNIQUE_KEYS_TYPE.equals(keysType);
+    }
+
     @Override
     public void configure(Configuration configuration) {
     }
@@ -195,9 +239,16 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
                     value.add(data);
                 }
             }
+            // add doris delete sign
+            if (enableBatchDelete()) {
+                if (jsonFormat) {
+                    valueMap.put(DORIS_DELETE_SIGN, 
parseDeleteSign(rowData.getRowKind()));
+                } else {
+                    value.add(parseDeleteSign(rowData.getRowKind()));
+                }
+            }
             Object data = jsonFormat ? valueMap : value.toString();
             batch.add(data);
-
         } else if (row instanceof String) {
             batch.add(row);
         } else {
@@ -205,6 +256,17 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
         }
     }
 
+    private String parseDeleteSign(RowKind rowKind) {
+        if (RowKind.INSERT.equals(rowKind) || 
RowKind.UPDATE_AFTER.equals(rowKind)) {
+            return "0";
+        } else if (RowKind.DELETE.equals(rowKind) || 
RowKind.UPDATE_BEFORE.equals(rowKind)) {
+            return "1";
+        } else {
+            throw new RuntimeException("Unrecognized row kind:" + 
rowKind.toString());
+        }
+    }
+
+
     @Override
     public synchronized void close() throws IOException {
         if (!closed) {
@@ -264,7 +326,6 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
         }
     }
 
-
     private String getBackend() throws IOException {
         try {
             //get be url from fe
@@ -275,16 +336,6 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
         }
     }
 
-
-    /**
-     * A builder used to set parameters to the output format's configuration 
in a fluent way.
-     *
-     * @return builder
-     */
-    public static Builder builder() {
-        return new Builder();
-    }
-
     /**
      * Builder for {@link DorisDynamicOutputFormat}.
      */
@@ -348,5 +399,7 @@ public class DorisDynamicOutputFormat<T> extends 
RichOutputFormat<T> {
                     optionsBuilder.build(), readOptions, executionOptions, 
logicalTypes, fieldNames
             );
         }
+
+
     }
 }
diff --git 
a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java 
b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 4654c0d..dbba859 100644
--- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -60,99 +60,87 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
     public static final ConfigOption<String> TABLE_IDENTIFIER = 
ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the
 jdbc table name.");
     public static final ConfigOption<String> USERNAME = 
ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the
 jdbc user name.");
     public static final ConfigOption<String> PASSWORD = 
ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the
 jdbc password.");
-
+    // Prefix for Doris StreamLoad specific properties.
+    public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
     // doris options
     private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
-        .key("doris.read.field")
-        .stringType()
-        .noDefaultValue()
-        .withDescription("List of column names in the Doris table, separated 
by commas");
-
+            .key("doris.read.field")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("List of column names in the Doris table, 
separated by commas");
     private static final ConfigOption<String> DORIS_FILTER_QUERY = 
ConfigOptions
-        .key("doris.filter.query")
-        .stringType()
-        .noDefaultValue()
-        .withDescription("Filter expression of the query, which is 
transparently transmitted to Doris. Doris uses this expression to complete 
source-side data filtering");
-
+            .key("doris.filter.query")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Filter expression of the query, which is 
transparently transmitted to Doris. Doris uses this expression to complete 
source-side data filtering");
     private static final ConfigOption<Integer> DORIS_TABLET_SIZE = 
ConfigOptions
-        .key("doris.request.tablet.size")
-        .intType()
-        .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
-        .withDescription("");
-
+            .key("doris.request.tablet.size")
+            .intType()
+            .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
+            .withDescription("");
     private static final ConfigOption<Integer> 
DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions
-        .key("doris.request.connect.timeout.ms")
-        .intType()
-        .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
-        .withDescription("");
-
+            .key("doris.request.connect.timeout.ms")
+            .intType()
+            .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
+            .withDescription("");
     private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = 
ConfigOptions
-        .key("doris.request.read.timeout.ms")
-        .intType()
-        .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
-        .withDescription("");
-
+            .key("doris.request.read.timeout.ms")
+            .intType()
+            .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
+            .withDescription("");
     private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = 
ConfigOptions
-        .key("doris.request.query.timeout.s")
-        .intType()
-        .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
-        .withDescription("");
-
+            .key("doris.request.query.timeout.s")
+            .intType()
+            .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
+            .withDescription("");
     private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = 
ConfigOptions
-        .key("doris.request.retries")
-        .intType()
-        .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
-        .withDescription("");
-
+            .key("doris.request.retries")
+            .intType()
+            .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
+            .withDescription("");
     private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = 
ConfigOptions
-        .key("doris.deserialize.arrow.async")
-        .booleanType()
-        .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
-        .withDescription("");
-
+            .key("doris.deserialize.arrow.async")
+            .booleanType()
+            .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
+            .withDescription("");
     private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = 
ConfigOptions
-        .key("doris.request.retriesdoris.deserialize.queue.size")
-        .intType()
-        .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
-        .withDescription("");
-
-
+            .key("doris.request.retriesdoris.deserialize.queue.size")
+            .intType()
+            .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
+            .withDescription("");
     private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions
-        .key("doris.batch.size")
-        .intType()
-        .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
-        .withDescription("");
-
+            .key("doris.batch.size")
+            .intType()
+            .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
+            .withDescription("");
     private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = 
ConfigOptions
-        .key("doris.exec.mem.limit")
-        .longType()
-        .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
-        .withDescription("");
-
+            .key("doris.exec.mem.limit")
+            .longType()
+            .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
+            .withDescription("");
     // flink write config options
     private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = 
ConfigOptions
-        .key("sink.batch.size")
-        .intType()
-        .defaultValue(100)
-        .withDescription("the flush max size (includes all append, upsert and 
delete records), over this number" +
-            " of records, will flush data. The default value is 100.");
-
+            .key("sink.batch.size")
+            .intType()
+            .defaultValue(100)
+            .withDescription("the flush max size (includes all append, upsert 
and delete records), over this number" +
+                    " of records, will flush data. The default value is 100.");
     private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
-        .key("sink.max-retries")
-        .intType()
-        .defaultValue(3)
-        .withDescription("the max retry times if writing records to database 
failed.");
-
+            .key("sink.max-retries")
+            .intType()
+            .defaultValue(3)
+            .withDescription("the max retry times if writing records to 
database failed.");
     private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = 
ConfigOptions
-        .key("sink.batch.interval")
-        .durationType()
-        .defaultValue(Duration.ofSeconds(1))
-        .withDescription("the flush interval mills, over this time, 
asynchronous threads will flush data. The " +
-            "default value is 1s.");
-
-
-    // Prefix for Doris StreamLoad specific properties.
-    public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
+            .key("sink.batch.interval")
+            .durationType()
+            .defaultValue(Duration.ofSeconds(1))
+            .withDescription("the flush interval mills, over this time, 
asynchronous threads will flush data. The " +
+                    "default value is 1s.");
+    private static final ConfigOption<Boolean> SINK_ENABLE_DELETE = 
ConfigOptions
+            .key("sink.enable-delete")
+            .booleanType()
+            .defaultValue(true)
+            .withDescription("whether to enable the delete function");
 
     @Override
     public String factoryIdentifier() {
@@ -190,6 +178,7 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
         options.add(SINK_MAX_RETRIES);
         options.add(SINK_BUFFER_FLUSH_INTERVAL);
+        options.add(SINK_ENABLE_DELETE);
         return options;
     }
 
@@ -199,7 +188,7 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         // or use the provided helper utility
         final FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
         // validate all options
-        helper.validate();
+        helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
         // get the validated options
         final ReadableConfig options = helper.getOptions();
         // derive the produced data type (excluding computed columns) from the 
catalog table
@@ -207,16 +196,16 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         TableSchema physicalSchema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
         // create and return dynamic table source
         return new DorisDynamicTableSource(
-            getDorisOptions(helper.getOptions()),
-            getDorisReadOptions(helper.getOptions()),
-            physicalSchema);
+                getDorisOptions(helper.getOptions()),
+                getDorisReadOptions(helper.getOptions()),
+                physicalSchema);
     }
 
     private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
         final String fenodes = readableConfig.get(FENODES);
         final DorisOptions.Builder builder = DorisOptions.builder()
-            .setFenodes(fenodes)
-            .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
+                .setFenodes(fenodes)
+                .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
 
         readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
         readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
@@ -226,16 +215,16 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
     private DorisReadOptions getDorisReadOptions(ReadableConfig 
readableConfig) {
         final DorisReadOptions.Builder builder = DorisReadOptions.builder();
         
builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
-            
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
-            .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
-            .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
-            .setReadFields(readableConfig.get(DORIS_READ_FIELD))
-            
.setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
-            .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
-            
.setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
-            
.setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
-            .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
-            .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE));
+                
.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
+                .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
+                .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
+                .setReadFields(readableConfig.get(DORIS_READ_FIELD))
+                
.setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
+                .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
+                
.setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
+                
.setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
+                .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
+                .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE));
         return builder.build();
     }
 
@@ -245,6 +234,7 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
         
builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
         builder.setStreamLoadProp(streamLoadProp);
+        builder.setEnableDelete(readableConfig.get(SINK_ENABLE_DELETE));
         return builder.build();
     }
 
diff --git 
a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java 
b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index 3c0c6df..cccdb45 100644
--- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -48,23 +48,23 @@ public class DorisDynamicTableSink implements 
DynamicTableSink {
     @Override
     public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
         return ChangelogMode.newBuilder()
-            .addContainedKind(RowKind.INSERT)
-            .addContainedKind(RowKind.DELETE)
-            .addContainedKind(RowKind.UPDATE_AFTER)
-            .build();
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.DELETE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .build();
     }
 
     @Override
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         DorisDynamicOutputFormat.Builder builder = 
DorisDynamicOutputFormat.builder()
-            .setFenodes(options.getFenodes())
-            .setUsername(options.getUsername())
-            .setPassword(options.getPassword())
-            .setTableIdentifier(options.getTableIdentifier())
-            .setReadOptions(readOptions)
-            .setExecutionOptions(executionOptions)
-            .setFieldDataTypes(tableSchema.getFieldDataTypes())
-            .setFieldNames(tableSchema.getFieldNames());
+                .setFenodes(options.getFenodes())
+                .setUsername(options.getUsername())
+                .setPassword(options.getPassword())
+                .setTableIdentifier(options.getTableIdentifier())
+                .setReadOptions(readOptions)
+                .setExecutionOptions(executionOptions)
+                .setFieldDataTypes(tableSchema.getFieldDataTypes())
+                .setFieldNames(tableSchema.getFieldNames());
         return OutputFormatProvider.of(builder.build());
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to