chuang-wang-pre commented on code in PR #85:
URL:
https://github.com/apache/doris-kafka-connector/pull/85#discussion_r2265899547
##########
src/main/java/org/apache/doris/kafka/connector/cfg/DorisSinkConnectorConfig.java:
##########
@@ -212,100 +215,145 @@ public static ConfigDef newConfigDef() {
Importance.HIGH,
"Doris database name",
DORIS_INFO,
- 6,
+ 5,
ConfigDef.Width.NONE,
DORIS_DATABASE)
.define(
TOPICS_TABLES_MAP,
Type.STRING,
"",
topicToTableValidator,
- Importance.LOW,
+ Importance.HIGH,
"Map of topics to tables (optional). Format :
comma-separated tuples, e.g."
+ "
<topic-1>:<table-1>,<topic-2>:<table-2>,... ",
CONNECTOR_CONFIG,
- 0,
+ 1,
ConfigDef.Width.NONE,
TOPICS_TABLES_MAP)
.define(
BUFFER_COUNT_RECORDS,
Type.LONG,
BUFFER_COUNT_RECORDS_DEFAULT,
ConfigDef.Range.atLeast(1),
- Importance.LOW,
- "Number of records buffered in memory per partition
before triggering",
+ Importance.HIGH,
+ "Number of records buffered in memory before
triggering",
CONNECTOR_CONFIG,
- 1,
+ 2,
ConfigDef.Width.NONE,
BUFFER_COUNT_RECORDS)
.define(
BUFFER_SIZE_BYTES,
Type.LONG,
BUFFER_SIZE_BYTES_DEFAULT,
ConfigDef.Range.atLeast(1),
- Importance.LOW,
- "Cumulative size of records buffered in memory per
partition before triggering",
+ Importance.HIGH,
+ "Cumulative size of records buffered in memory before
triggering",
CONNECTOR_CONFIG,
- 2,
+ 3,
ConfigDef.Width.NONE,
BUFFER_SIZE_BYTES)
.define(
BUFFER_FLUSH_TIME_SEC,
Type.LONG,
BUFFER_FLUSH_TIME_SEC_DEFAULT,
ConfigDef.Range.atLeast(Duration.ofSeconds(1).getSeconds()),
- Importance.LOW,
+ Importance.HIGH,
"The time in seconds to flush cached data",
CONNECTOR_CONFIG,
- 3,
+ 4,
ConfigDef.Width.NONE,
BUFFER_FLUSH_TIME_SEC)
+ .define(
+ ENABLE_COMBINE_FLUSH,
+ Type.BOOLEAN,
+ ENABLE_COMBINE_FLUSH_DEFAULT,
+ Importance.HIGH,
+ "Whether to merge data from all partitions together
and write them. The default value is false. When enabled, only at_least_once
semantics are guaranteed.",
+ CONNECTOR_CONFIG,
+ 5,
+ ConfigDef.Width.NONE,
+ ENABLE_COMBINE_FLUSH)
+ .define(
+ DELIVERY_GUARANTEE,
+ Type.STRING,
+ DELIVERY_GUARANTEE_DEFAULT,
+ Importance.MEDIUM,
+ "How to ensure data consistency when consuming Kafka
data is imported into Doris. Supports at_least_once exactly_once, default is
at_least_once. Doris needs to be upgraded to 2.1.0 or above to ensure data
exactly_once",
+ CONNECTOR_CONFIG,
+ 6,
+ ConfigDef.Width.NONE,
+ DELIVERY_GUARANTEE,
+ EnumRecommender.in(DeliveryGuarantee.values()))
.define(
RECORD_TABLE_NAME_FIELD,
Type.STRING,
null,
Importance.LOW,
"The field name of record, and use this field value as
the table name to be written",
CONNECTOR_CONFIG,
- 4,
+ 7,
ConfigDef.Width.NONE,
RECORD_TABLE_NAME_FIELD)
+ // debezium config
.define(
- JMX_OPT,
- ConfigDef.Type.BOOLEAN,
- JMX_OPT_DEFAULT,
- ConfigDef.Importance.HIGH,
- "Whether to enable JMX MBeans for custom metrics")
+ CONVERTER_MODE,
+ Type.STRING,
+ CONVERT_MODE_DEFAULT,
+ Importance.LOW,
+ "Type conversion mode of upstream data when using
Connector to consume Kafka data.\n"
+ + "normal means consuming data in Kafka
normally without any type conversion.\n"
+ + "debezium_ingestion means that when Kafka
upstream data is collected through CDC (Changelog Data Capture) tools such as
Debezium, the upstream data needs to undergo special type conversion to support
it.",
+ DEBEZIUM_CONFIG,
+ 1,
+ ConfigDef.Width.NONE,
+ CONVERTER_MODE)
.define(
ENABLE_DELETE,
ConfigDef.Type.BOOLEAN,
ENABLE_DELETE_DEFAULT,
- ConfigDef.Importance.HIGH,
- "Used to synchronize delete events")
- .define(
- LOAD_MODEL,
- Type.STRING,
- LOAD_MODEL_DEFAULT,
- Importance.HIGH,
- "load model is stream_load.")
+ Importance.LOW,
+ "Under Debezium synchronization, whether to
synchronize deletion events. Non-Debezium messages need to be marked with
deletions themselves.",
+ DEBEZIUM_CONFIG,
+ 2,
+ ConfigDef.Width.NONE,
+ ENABLE_DELETE)
+ // Retries
.define(
MAX_RETRIES,
Type.INT,
MAX_RETRIES_DEFAULT,
Importance.MEDIUM,
- "The maximum number of times to retry on errors before
failing the task.")
+ "The maximum number of times to retry on errors before
failing the task.",
+ RETRIES_GROUP,
+ 1,
+ ConfigDef.Width.NONE,
+ MAX_RETRIES)
.define(
RETRY_INTERVAL_MS,
Type.INT,
RETRY_INTERVAL_MS_DEFAULT,
Importance.MEDIUM,
- "The time in milliseconds to wait following an error
before a retry attempt is made.")
+ "The time in milliseconds to wait following an error
before a retry attempt is made.",
+ RETRIES_GROUP,
+ 2,
+ ConfigDef.Width.NONE,
+ RETRY_INTERVAL_MS)
.define(
BEHAVIOR_ON_NULL_VALUES,
Type.STRING,
BEHAVIOR_ON_NULL_VALUES_DEFAULT,
Importance.LOW,
- "Used to handle records with a null value .");
+ "Used to handle records with a null value.",
+ CONNECTOR_CONFIG,
+ 18,
Review Comment:
Maybe this order number also needs to change, so that this option is placed
together with the other configurations in the 'CONNECTOR_CONFIG' group.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]