This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
commit 021e281e0e283e7c0c862ceaf878cc6b7e83de5b Author: wudi <676366...@qq.com> AuthorDate: Sun Jan 23 20:24:41 2022 +0800 [Feature][flink-connector] support flink delete option (#7457) * Flink Connector supports delete option on Unique models Co-authored-by: wudi <w...@shuhaisc.com> --- .../doris/flink/cfg/DorisExecutionOptions.java | 38 +++-- .../apache/doris/flink/cfg/DorisStreamOptions.java | 22 +-- .../org/apache/doris/flink/rest/models/Schema.java | 9 ++ .../flink/table/DorisDynamicOutputFormat.java | 101 +++++++++--- .../flink/table/DorisDynamicTableFactory.java | 176 ++++++++++----------- .../doris/flink/table/DorisDynamicTableSink.java | 24 +-- 6 files changed, 218 insertions(+), 152 deletions(-) diff --git a/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 47bb517..ad1ab07 100644 --- a/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -26,8 +26,8 @@ import java.util.Properties; * JDBC sink batch options. */ public class DorisExecutionOptions implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; public static final Integer DEFAULT_BATCH_SIZE = 10000; public static final Integer DEFAULT_MAX_RETRY_TIMES = 1; private static final Long DEFAULT_INTERVAL_MILLIS = 10000L; @@ -41,12 +41,27 @@ public class DorisExecutionOptions implements Serializable { */ private final Properties streamLoadProp; - public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long batchIntervalMs, Properties streamLoadProp) { + private final Boolean enableDelete; + + + public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long batchIntervalMs, Properties streamLoadProp, Boolean enableDelete) { Preconditions.checkArgument(maxRetries >= 0); this.batchSize = batchSize; this.maxRetries = maxRetries; this.batchIntervalMs = batchIntervalMs; this.streamLoadProp = streamLoadProp; + this.enableDelete = enableDelete; + } + + public static Builder builder() { + return new Builder(); + } + + public static DorisExecutionOptions defaults() { + Properties pro = new Properties(); + pro.setProperty("format", "json"); + pro.setProperty("strip_outer_array", "true"); + return new Builder().setStreamLoadProp(pro).build(); } public Integer getBatchSize() { @@ -65,15 +80,8 @@ public class DorisExecutionOptions implements Serializable { return streamLoadProp; } - public static Builder builder() { - return new Builder(); - } - - public static DorisExecutionOptions defaults() { - Properties pro = new Properties(); - pro.setProperty("format", "json"); - pro.setProperty("strip_outer_array", "true"); - return new Builder().setStreamLoadProp(pro).build(); + public Boolean getEnableDelete() { + return enableDelete; } /** @@ -84,6 +92,7 @@ public class DorisExecutionOptions implements Serializable { private Integer maxRetries = DEFAULT_MAX_RETRY_TIMES; private Long batchIntervalMs = DEFAULT_INTERVAL_MILLIS; private Properties streamLoadProp = new Properties(); + private Boolean enableDelete = false; public Builder setBatchSize(Integer batchSize) { this.batchSize = batchSize; @@ -105,8 +114,13 @@ public class DorisExecutionOptions implements Serializable { return this; } + public Builder setEnableDelete(Boolean enableDelete) { + this.enableDelete = enableDelete; + return this; + } + public DorisExecutionOptions build() { - return new DorisExecutionOptions(batchSize, maxRetries, batchIntervalMs, streamLoadProp); + return new DorisExecutionOptions(batchSize, maxRetries, batchIntervalMs, streamLoadProp, enableDelete); } } diff --git a/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java b/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java index 016b29a..c5c2c16 100644 --- a/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java +++ b/src/main/java/org/apache/doris/flink/cfg/DorisStreamOptions.java @@ -32,8 +32,8 @@ public class DorisStreamOptions implements Serializable { private DorisReadOptions readOptions; public DorisStreamOptions(Properties prop) { - this.prop = prop; - init(); + this.prop = prop; + init(); } /** @@ -47,17 +47,17 @@ public class DorisStreamOptions implements Serializable { .setTableIdentifier(prop.getProperty(ConfigurationOptions.TABLE_IDENTIFIER)); DorisReadOptions.Builder readOptionsBuilder = DorisReadOptions.builder() - .setDeserializeArrowAsync(Boolean.valueOf(prop.getProperty(ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC,ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString()))) - .setDeserializeQueueSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE,ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString()))) - .setExecMemLimit(Long.valueOf(prop.getProperty(ConfigurationOptions.DORIS_EXEC_MEM_LIMIT,ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT.toString()))) + .setDeserializeArrowAsync(Boolean.valueOf(prop.getProperty(ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC, ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString()))) + .setDeserializeQueueSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE, ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString()))) + .setExecMemLimit(Long.valueOf(prop.getProperty(ConfigurationOptions.DORIS_EXEC_MEM_LIMIT, ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT.toString()))) .setFilterQuery(prop.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY)) .setReadFields(prop.getProperty(ConfigurationOptions.DORIS_READ_FIELD)) - .setRequestQueryTimeoutS(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S,ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString()))) - .setRequestBatchSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_BATCH_SIZE,ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT.toString()))) - .setRequestConnectTimeoutMs(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS,ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT.toString()))) - .setRequestReadTimeoutMs(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS,ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT.toString()))) - .setRequestRetries(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES,ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT.toString()))) - .setRequestTabletSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_TABLET_SIZE,ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT.toString()))); + .setRequestQueryTimeoutS(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S, ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString()))) + .setRequestBatchSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_BATCH_SIZE, ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT.toString()))) + .setRequestConnectTimeoutMs(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS, ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT.toString()))) + .setRequestReadTimeoutMs(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS, ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT.toString()))) + .setRequestRetries(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT.toString()))) + .setRequestTabletSize(Integer.valueOf(prop.getProperty(ConfigurationOptions.DORIS_TABLET_SIZE, ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT.toString()))); this.options = optionsBuilder.build(); this.readOptions = readOptionsBuilder.build(); diff --git a/src/main/java/org/apache/doris/flink/rest/models/Schema.java b/src/main/java/org/apache/doris/flink/rest/models/Schema.java index 314aa65..e274352 100644 --- a/src/main/java/org/apache/doris/flink/rest/models/Schema.java +++ b/src/main/java/org/apache/doris/flink/rest/models/Schema.java @@ -23,6 +23,7 @@ import java.util.Objects; public class Schema { private int status = 0; + private String keysType; private List<Field> properties; public Schema() { @@ -41,6 +42,14 @@ public class Schema { this.status = status; } + public String getKeysType() { + return keysType; + } + + public void setKeysType(String keysType) { + this.keysType = keysType; + } + public List<Field> getProperties() { return properties; } diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java index 2a1cec4..44e0a6a 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java +++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java @@ -23,12 +23,14 @@ import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.exception.StreamLoadException; import org.apache.doris.flink.rest.RestService; +import org.apache.doris.flink.rest.models.Schema; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.RowKind; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +48,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static org.apache.flink.table.data.RowData.createFieldGetter; @@ -58,6 +61,7 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicOutputFormat.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String COLUMNS_KEY = "columns"; private static final String FIELD_DELIMITER_KEY = "column_separator"; private static final String FIELD_DELIMITER_DEFAULT = "\t"; private static final String LINE_DELIMITER_KEY = "line_delimiter"; @@ -67,20 +71,21 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { private static final String NULL_VALUE = "\\N"; private static final String ESCAPE_DELIMITERS_KEY = "escape_delimiters"; private static final String ESCAPE_DELIMITERS_DEFAULT = "false"; - - private String fieldDelimiter; - private String lineDelimiter; + private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__"; + private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS"; private final String[] fieldNames; private final boolean jsonFormat; + private final RowData.FieldGetter[] fieldGetters; + private final List batch = new ArrayList<>(); + private String fieldDelimiter; + private String lineDelimiter; private DorisOptions options; private DorisReadOptions readOptions; private DorisExecutionOptions executionOptions; private DorisStreamLoad dorisStreamLoad; - private final RowData.FieldGetter[] fieldGetters; + private String keysType; - private final List batch = new ArrayList<>(); private transient volatile boolean closed = false; - private transient ScheduledExecutorService scheduler; private transient ScheduledFuture<?> scheduledFuture; private transient volatile Exception flushException; @@ -93,9 +98,42 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { this.options = option; this.readOptions = readOptions; this.executionOptions = executionOptions; + this.fieldNames = fieldNames; + this.jsonFormat = FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY)); + this.keysType = parseKeysType(); - Properties streamLoadProp = executionOptions.getStreamLoadProp(); + handleStreamloadProp(); + this.fieldGetters = new RowData.FieldGetter[logicalTypes.length]; + for (int i = 0; i < logicalTypes.length; i++) { + fieldGetters[i] = createFieldGetter(logicalTypes[i], i); + } + } + + /** + * parse table keysType + * + * @return keysType + */ + private String parseKeysType() { + try { + Schema schema = RestService.getSchema(options, readOptions, LOG); + return schema.getKeysType(); + } catch (DorisException e) { + throw new RuntimeException("Failed fetch doris table schema: " + options.getTableIdentifier()); + } + } + + /** + * A builder used to set parameters to the output format's configuration in a fluent way. + * + * @return builder + */ + public static Builder builder() { + return new Builder(); + } + private void handleStreamloadProp() { + Properties streamLoadProp = executionOptions.getStreamLoadProp(); boolean ifEscape = Boolean.parseBoolean(streamLoadProp.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT)); if (ifEscape) { this.fieldDelimiter = escapeString(streamLoadProp.getProperty(FIELD_DELIMITER_KEY, @@ -113,11 +151,13 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { LINE_DELIMITER_DEFAULT); } - this.fieldNames = fieldNames; - this.jsonFormat = FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY)); - this.fieldGetters = new RowData.FieldGetter[logicalTypes.length]; - for (int i = 0; i < logicalTypes.length; i++) { - fieldGetters[i] = createFieldGetter(logicalTypes[i], i); + //add column key when fieldNames is not empty + if (!streamLoadProp.containsKey(COLUMNS_KEY) && fieldNames != null && fieldNames.length > 0) { + String columns = String.join(",", Arrays.stream(fieldNames).map(item -> String.format("`%s`", item.trim().replace("`", ""))).collect(Collectors.toList())); + if (enableBatchDelete()) { + columns = String.format("%s,%s", columns, DORIS_DELETE_SIGN); + } + streamLoadProp.put(COLUMNS_KEY, columns); } } @@ -133,6 +173,10 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { return buf.toString(); } + private boolean enableBatchDelete() { + return executionOptions.getEnableDelete() || UNIQUE_KEYS_TYPE.equals(keysType); + } + @Override public void configure(Configuration configuration) { } @@ -195,9 +239,16 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { value.add(data); } } + // add doris delete sign + if (enableBatchDelete()) { + if (jsonFormat) { + valueMap.put(DORIS_DELETE_SIGN, parseDeleteSign(rowData.getRowKind())); + } else { + value.add(parseDeleteSign(rowData.getRowKind())); + } + } Object data = jsonFormat ? valueMap : value.toString(); batch.add(data); - } else if (row instanceof String) { batch.add(row); } else { @@ -205,6 +256,17 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { } } + private String parseDeleteSign(RowKind rowKind) { + if (RowKind.INSERT.equals(rowKind) || RowKind.UPDATE_AFTER.equals(rowKind)) { + return "0"; + } else if (RowKind.DELETE.equals(rowKind) || RowKind.UPDATE_BEFORE.equals(rowKind)) { + return "1"; + } else { + throw new RuntimeException("Unrecognized row kind:" + rowKind.toString()); + } + } + + @Override public synchronized void close() throws IOException { if (!closed) { @@ -264,7 +326,6 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { } } - private String getBackend() throws IOException { try { //get be url from fe @@ -275,16 +336,6 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { } } - - /** - * A builder used to set parameters to the output format's configuration in a fluent way. - * - * @return builder - */ - public static Builder builder() { - return new Builder(); - } - /** * Builder for {@link DorisDynamicOutputFormat}. */ @@ -348,5 +399,7 @@ public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> { optionsBuilder.build(), readOptions, executionOptions, logicalTypes, fieldNames ); } + + } } diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index 4654c0d..dbba859 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -60,99 +60,87 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the jdbc table name."); public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the jdbc user name."); public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the jdbc password."); - + // Prefix for Doris StreamLoad specific properties. + public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties."; // doris options private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions - .key("doris.read.field") - .stringType() - .noDefaultValue() - .withDescription("List of column names in the Doris table, separated by commas"); - + .key("doris.read.field") + .stringType() + .noDefaultValue() + .withDescription("List of column names in the Doris table, separated by commas"); private static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions - .key("doris.filter.query") - .stringType() - .noDefaultValue() - .withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering"); - + .key("doris.filter.query") + .stringType() + .noDefaultValue() + .withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering"); private static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions - .key("doris.request.tablet.size") - .intType() - .defaultValue(DORIS_TABLET_SIZE_DEFAULT) - .withDescription(""); - + .key("doris.request.tablet.size") + .intType() + .defaultValue(DORIS_TABLET_SIZE_DEFAULT) + .withDescription(""); private static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions - .key("doris.request.connect.timeout.ms") - .intType() - .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) - .withDescription(""); - + .key("doris.request.connect.timeout.ms") + .intType() + .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) + .withDescription(""); private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions - .key("doris.request.read.timeout.ms") - .intType() - .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) - .withDescription(""); - + .key("doris.request.read.timeout.ms") + .intType() + .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) + .withDescription(""); private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions - .key("doris.request.query.timeout.s") - .intType() - .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT) - .withDescription(""); - + .key("doris.request.query.timeout.s") + .intType() + .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT) + .withDescription(""); private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions - .key("doris.request.retries") - .intType() - .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT) - .withDescription(""); - + .key("doris.request.retries") + .intType() + .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT) + .withDescription(""); private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions - .key("doris.deserialize.arrow.async") - .booleanType() - .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT) - .withDescription(""); - + .key("doris.deserialize.arrow.async") + .booleanType() + .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT) + .withDescription(""); private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions - .key("doris.request.retriesdoris.deserialize.queue.size") - .intType() - .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT) - .withDescription(""); - - + .key("doris.request.retriesdoris.deserialize.queue.size") + .intType() + .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT) + .withDescription(""); private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions - .key("doris.batch.size") - .intType() - .defaultValue(DORIS_BATCH_SIZE_DEFAULT) - .withDescription(""); - + .key("doris.batch.size") + .intType() + .defaultValue(DORIS_BATCH_SIZE_DEFAULT) + .withDescription(""); private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions - .key("doris.exec.mem.limit") - .longType() - .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT) - .withDescription(""); - + .key("doris.exec.mem.limit") + .longType() + .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT) + .withDescription(""); // flink write config options private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions - .key("sink.batch.size") - .intType() - .defaultValue(100) - .withDescription("the flush max size (includes all append, upsert and delete records), over this number" + - " of records, will flush data. The default value is 100."); - + .key("sink.batch.size") + .intType() + .defaultValue(100) + .withDescription("the flush max size (includes all append, upsert and delete records), over this number" + + " of records, will flush data. The default value is 100."); private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions - .key("sink.max-retries") - .intType() - .defaultValue(3) - .withDescription("the max retry times if writing records to database failed."); - + .key("sink.max-retries") + .intType() + .defaultValue(3) + .withDescription("the max retry times if writing records to database failed."); private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions - .key("sink.batch.interval") - .durationType() - .defaultValue(Duration.ofSeconds(1)) - .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " + - "default value is 1s."); - - - // Prefix for Doris StreamLoad specific properties. - public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties."; + .key("sink.batch.interval") + .durationType() + .defaultValue(Duration.ofSeconds(1)) + .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " + + "default value is 1s."); + private static final ConfigOption<Boolean> SINK_ENABLE_DELETE = ConfigOptions + .key("sink.enable-delete") + .booleanType() + .defaultValue(true) + .withDescription("whether to enable the delete function"); @Override public String factoryIdentifier() { @@ -190,6 +178,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory options.add(SINK_BUFFER_FLUSH_MAX_ROWS); options.add(SINK_MAX_RETRIES); options.add(SINK_BUFFER_FLUSH_INTERVAL); + options.add(SINK_ENABLE_DELETE); return options; } @@ -199,7 +188,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory // or use the provided helper utility final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); // validate all options - helper.validate(); + helper.validateExcept(STREAM_LOAD_PROP_PREFIX); // get the validated options final ReadableConfig options = helper.getOptions(); // derive the produced data type (excluding computed columns) from the catalog table @@ -207,16 +196,16 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); // create and return dynamic table source return new DorisDynamicTableSource( - getDorisOptions(helper.getOptions()), - getDorisReadOptions(helper.getOptions()), - physicalSchema); + getDorisOptions(helper.getOptions()), + getDorisReadOptions(helper.getOptions()), + physicalSchema); } private DorisOptions getDorisOptions(ReadableConfig readableConfig) { final String fenodes = readableConfig.get(FENODES); final DorisOptions.Builder builder = DorisOptions.builder() - .setFenodes(fenodes) - .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER)); + .setFenodes(fenodes) + .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER)); readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername); readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword); @@ -226,16 +215,16 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) { final DorisReadOptions.Builder builder = DorisReadOptions.builder(); builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC)) - .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE)) - .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT)) - .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY)) - .setReadFields(readableConfig.get(DORIS_READ_FIELD)) - .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S)) - .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE)) - .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS)) - .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS)) - .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES)) - .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE)); + .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE)) + .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT)) + .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY)) + .setReadFields(readableConfig.get(DORIS_READ_FIELD)) + .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S)) + .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE)) + .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS)) + .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS)) + .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES)) + .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE)); return builder.build(); } @@ -245,6 +234,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES)); builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis()); builder.setStreamLoadProp(streamLoadProp); + builder.setEnableDelete(readableConfig.get(SINK_ENABLE_DELETE)); return builder.build(); } diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java index 3c0c6df..cccdb45 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java +++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java @@ -48,23 +48,23 @@ public class DorisDynamicTableSink implements DynamicTableSink { @Override public ChangelogMode getChangelogMode(ChangelogMode changelogMode) { return ChangelogMode.newBuilder() - .addContainedKind(RowKind.INSERT) - .addContainedKind(RowKind.DELETE) - .addContainedKind(RowKind.UPDATE_AFTER) - .build(); + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.DELETE) + .addContainedKind(RowKind.UPDATE_AFTER) + .build(); } @Override public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { DorisDynamicOutputFormat.Builder builder = DorisDynamicOutputFormat.builder() - .setFenodes(options.getFenodes()) - .setUsername(options.getUsername()) - .setPassword(options.getPassword()) - .setTableIdentifier(options.getTableIdentifier()) - .setReadOptions(readOptions) - .setExecutionOptions(executionOptions) - .setFieldDataTypes(tableSchema.getFieldDataTypes()) - .setFieldNames(tableSchema.getFieldNames()); + .setFenodes(options.getFenodes()) + .setUsername(options.getUsername()) + .setPassword(options.getPassword()) + .setTableIdentifier(options.getTableIdentifier()) + .setReadOptions(readOptions) + .setExecutionOptions(executionOptions) + .setFieldDataTypes(tableSchema.getFieldDataTypes()) + .setFieldNames(tableSchema.getFieldNames()); return OutputFormatProvider.of(builder.build()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org