This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 748604f [RoutineLoad] Support alter broker list and topic for kafka routine load (#6335) 748604f is described below commit 748604ff4f3d8693d1f91cdb67f6637491fde5b8 Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Tue Aug 3 11:58:38 2021 +0800 [RoutineLoad] Support alter broker list and topic for kafka routine load (#6335) ``` alter routine load for cmy2 from kafka("kafka_broker_list" = "ip2:9094", "kafka_topic" = "my_topic"); ``` This is useful when the kafka broker list or topic has been changed. Also modify `show create routine load`, support showing "kafka_partitions" and "kafka_offsets". --- .../Data Manipulation/SHOW CREATE ROUTINE LOAD.md | 7 +++-- .../Data Manipulation/alter-routine-load.md | 4 ++- .../Data Manipulation/SHOW CREATE ROUTINE LOAD.md | 6 ++-- .../Data Manipulation/alter-routine-load.md | 4 ++- .../analysis/RoutineLoadDataSourceProperties.java | 2 ++ .../java/org/apache/doris/analysis/Separator.java | 4 +++ .../doris/analysis/ShowCreateRoutineLoadStmt.java | 6 ++-- .../doris/load/routineload/KafkaProgress.java | 27 ++++++++++++++++-- .../load/routineload/KafkaRoutineLoadJob.java | 18 ++++++++---- .../doris/load/routineload/RoutineLoadJob.java | 32 ++++++++++++++++------ .../load/routineload/RoutineLoadProgress.java | 2 +- .../doris/analysis/AlterRoutineLoadStmtTest.java | 4 +-- .../RoutineLoadDataSourcePropertiesTest.java | 4 +-- .../doris/load/routineload/RoutineLoadJobTest.java | 20 ++++++++------ 14 files changed, 101 insertions(+), 39 deletions(-) diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md index ce783c5..499b82c 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md @@ -26,7 +26,10 @@ under the License. # SHOW CREATE ROUTINE LOAD ## description - The statement is used to show the routine load job creation statement of user-defined + The statement is used to show the routine load job creation statement of user-defined. + + The kafka partition and offset in the result show the currently consumed partition and the corresponding offset to be consumed. + grammar: SHOW [ALL] CREATE ROUTINE LOAD for load_name; @@ -39,4 +42,4 @@ under the License. SHOW CREATE ROUTINE LOAD for test_load ## keyword - SHOW,CREATE,ROUTINE,LOAD \ No newline at end of file + SHOW,CREATE,ROUTINE,LOAD diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md b/docs/en/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md index 41b5e0e..bc22697 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md @@ -72,7 +72,9 @@ Syntax: 1. `kafka_partitions` 2. `kafka_offsets` - 3. Custom property, such as `property.group.id` + 3. `kafka_broker_list` + 4. `kafka_topic` + 5. Custom property, such as `property.group.id` Notice: diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md index 922db38..4809962 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md @@ -26,7 +26,9 @@ under the License. # SHOW CREATE ROUTINE LOAD ## description - 该语句用于展示例行导入作业的创建语句 + 该语句用于展示例行导入作业的创建语句。 + 结果中的 kafka partition 和 offset 展示的当前消费的 partition,以及对应的待消费的 offset。 + 语法: SHOW [ALL] CREATE ROUTINE LOAD for load_name; @@ -39,4 +41,4 @@ under the License. SHOW CREATE ROUTINE LOAD for test_load ## keyword - SHOW,CREATE,ROUTINE,LOAD \ No newline at end of file + SHOW,CREATE,ROUTINE,LOAD diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md index 7541136..52544a5 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md @@ -76,7 +76,9 @@ under the License. 1. `kafka_partitions` 2. `kafka_offsets` - 3. 自定义 property,如 `property.group.id` + 3. `kafka_broker_list` + 4. `kafka_topic` + 5. 自定义 property,如 `property.group.id` 注: diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java index 1a10d66..41d3a7b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java @@ -51,6 +51,8 @@ public class RoutineLoadDataSourceProperties { .build(); private static final ImmutableSet<String> CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET = new ImmutableSet.Builder<String>() + .add(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY) + .add(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY) .add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY) .add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY) .add(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java index 11404b5..217c06f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java @@ -34,6 +34,10 @@ public class Separator implements ParseNode { this.separator = null; } + public String getOriSeparator() { + return oriSeparator; + } + public String getSeparator() { return separator; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java index d0ae92b..7450c08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java @@ -27,9 +27,9 @@ public class ShowCreateRoutineLoadStmt extends ShowStmt { private static final ShowResultSetMetaData META_DATA = ShowResultSetMetaData.builder() - .addColumn(new Column("Routine Load Id", ScalarType.createVarchar(20))) - .addColumn(new Column("Routine Load Name", ScalarType.createVarchar(20))) - .addColumn(new Column("Create Routine Load", ScalarType.createVarchar(30))) + .addColumn(new Column("JobId", ScalarType.createVarchar(128))) + .addColumn(new Column("JobName", ScalarType.createVarchar(128))) + .addColumn(new Column("CreateStmt", ScalarType.createVarchar(65535))) .build(); private final LabelName labelName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java index 6d704d9..6c88b5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java @@ -22,13 +22,14 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.thrift.TKafkaRLTaskProgress; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -128,6 +129,26 @@ public class KafkaProgress extends RoutineLoadProgress { } } + public List<Pair<Integer, String>> getPartitionOffsetPairs(boolean alreadyConsumed) { + List<Pair<Integer, String>> pairs = Lists.newArrayList(); + for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) { + if (entry.getValue() == 0) { + pairs.add(Pair.create(entry.getKey(), OFFSET_ZERO)); + } else if (entry.getValue() == -1) { + pairs.add(Pair.create(entry.getKey(), OFFSET_END)); + } else if (entry.getValue() == -2) { + pairs.add(Pair.create(entry.getKey(), OFFSET_BEGINNING)); + } else { + long offset = entry.getValue(); + if (alreadyConsumed) { + offset -= 1; + } + pairs.add(Pair.create(entry.getKey(), "" + offset)); + } + } + return pairs; + } + @Override public String toString() { Map<Integer, String> showPartitionIdToOffset = Maps.newHashMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 0496173..ead99ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -55,15 +55,16 @@ import com.google.gson.GsonBuilder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.parquet.Strings; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.HashMap; import java.util.TimeZone; import java.util.UUID; @@ -554,7 +555,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { throw new DdlException("Only supports modification of PAUSED jobs"); } - modifyPropertiesInternal(jobProperties, dataSourceProperties); AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id, @@ -593,15 +593,23 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets); } + if (!customKafkaProperties.isEmpty()) { + this.customProperties.putAll(customKafkaProperties); + convertCustomProperties(true); + } + if (!jobProperties.isEmpty()) { Map<String, String> copiedJobProperties = Maps.newHashMap(jobProperties); modifyCommonJobProperties(copiedJobProperties); this.jobProperties.putAll(copiedJobProperties); } - if (!customKafkaProperties.isEmpty()) { - this.customProperties.putAll(customKafkaProperties); - convertCustomProperties(true); + // modify broker list and topic + if (!Strings.isNullOrEmpty(dataSourceProperties.getKafkaBrokerList())) { + this.brokerList = dataSourceProperties.getKafkaBrokerList(); + } + if (!Strings.isNullOrEmpty(dataSourceProperties.getKafkaTopic())) { + this.topic = dataSourceProperties.getKafkaTopic(); } LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index a9b7083..a758d65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -39,6 +39,7 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -1325,10 +1326,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl // 4.load_properties // 4.1.column_separator if (columnSeparator != null) { - sb.append("COLUMNS TERMINATED BY \"").append(columnSeparator.getSeparator()).append("\",\n"); + sb.append("COLUMNS TERMINATED BY \"").append(columnSeparator.getOriSeparator()).append("\",\n"); } // 4.2.columns_mapping - if (columnDescs != null) { + if (columnDescs != null && !columnDescs.descs.isEmpty()) { sb.append("COLUMNS(").append(Joiner.on(",").join(columnDescs.descs)).append("),\n"); } // 4.3.where_predicates @@ -1352,22 +1353,25 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl sb.append("PRECEDING FILTER ").append(precedingFilter.toSql()).append(",\n"); } // remove the last , - if (",".equals(sb.charAt(sb.length() - 2))) { + if (sb.charAt(sb.length() - 2) == ',') { sb.replace(sb.length() - 2, sb.length() - 1, ""); } - // 5.job_properties + // 5.job_properties. See PROPERTIES_SET of CreateRoutineLoadStmt sb.append("PROPERTIES\n(\n"); appendProperties(sb, CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, desireTaskConcurrentNum, false); + appendProperties(sb, CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, maxErrorNum, false); appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS, false); appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, maxBatchRows, false); appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes, false); - appendProperties(sb, CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, maxErrorNum, false); - appendProperties(sb, LoadStmt.STRICT_MODE, isStrictMode(), false); - appendProperties(sb, LoadStmt.TIMEZONE, getTimezone(), false); appendProperties(sb, PROPS_FORMAT, getFormat(), false); appendProperties(sb, PROPS_JSONPATHS, getJsonPaths(), false); appendProperties(sb, PROPS_STRIP_OUTER_ARRAY, isStripOuterArray(), false); + appendProperties(sb, PROPS_NUM_AS_STRING, isNumAsString(), false); + appendProperties(sb, PROPS_FUZZY_PARSE, isFuzzyParse(), false); appendProperties(sb, PROPS_JSONROOT, getJsonRoot(), true); + appendProperties(sb, LoadStmt.STRICT_MODE, isStrictMode(), false); + appendProperties(sb, LoadStmt.TIMEZONE, getTimezone(), false); + appendProperties(sb, LoadStmt.EXEC_MEM_LIMIT, getMemLimit(), true); sb.append(")\n"); // 6. data_source sb.append("FROM ").append(dataSourceType).append("\n"); @@ -1375,13 +1379,25 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl sb.append("(\n"); getDataSourceProperties().forEach((k, v) -> appendProperties(sb, k, v, false)); getCustomProperties().forEach((k, v) -> appendProperties(sb, k, v, false)); - // remove the last , + if (progress instanceof KafkaProgress) { + // append partitions and offsets. + // the offsets is the next offset to be consumed. + List<Pair<Integer, String>> pairs = ((KafkaProgress) progress).getPartitionOffsetPairs(false); + appendProperties(sb, CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, + Joiner.on(", ").join(pairs.stream().map(p -> p.first).toArray()), false); + appendProperties(sb, CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, + Joiner.on(", ").join(pairs.stream().map(p -> p.second).toArray()), false); + } + // remove the last "," sb.replace(sb.length() - 2, sb.length() - 1, ""); sb.append(");"); return sb.toString(); } private static void appendProperties(StringBuilder sb, String key, Object value, boolean end) { + if (value == null || Strings.isNullOrEmpty(value.toString())) { + return; + } sb.append("\"").append(key).append("\"").append(" = ").append("\"").append(value).append("\""); if (!end) { sb.append(",\n"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java index bf746a6..224dd54 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java @@ -40,7 +40,7 @@ public abstract class RoutineLoadProgress implements Writable { abstract void update(RLTaskTxnCommitAttachment attachment); abstract String toJsonString(); - + public static RoutineLoadProgress read(DataInput in) throws IOException { RoutineLoadProgress progress = null; LoadDataSourceType type = LoadDataSourceType.valueOf(Text.readString(in)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java index f9ddb688..0af88cd 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java @@ -125,6 +125,7 @@ public class AlterRoutineLoadStmtTest { } } + // alter topic is now supported { Map<String, String> jobProperties = Maps.newHashMap(); jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100"); @@ -138,9 +139,8 @@ public class AlterRoutineLoadStmtTest { try { stmt.analyze(analyzer); - Assert.fail(); } catch (AnalysisException e) { - Assert.assertTrue(e.getMessage().contains("kafka_topic is invalid kafka property")); + Assert.fail(); } catch (UserException e) { e.printStackTrace(); Assert.fail(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java index bcadc90..0acdf04 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java @@ -291,7 +291,7 @@ public class RoutineLoadDataSourcePropertiesTest { @Test public void testAlterAbnormal() { - // can not set KAFKA_BROKER_LIST_PROPERTY + // now support set KAFKA_BROKER_LIST_PROPERTY Map<String, String> properties = Maps.newHashMap(); properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); properties.put("property." + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "-1"); @@ -301,7 +301,7 @@ public class RoutineLoadDataSourcePropertiesTest { dsProperties.analyze(); Assert.fail(); } catch (UserException e) { - Assert.assertTrue(e.getMessage().contains("kafka_broker_list is invalid kafka property")); + Assert.assertTrue(e.getMessage().contains("kafka_default_offsets can only be set to OFFSET_BEGINNING, OFFSET_END or date time")); } // can not set datetime formatted offset and integer offset together diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java index 23aeb17..7f0676f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java @@ -31,14 +31,14 @@ import org.apache.doris.thrift.TKafkaRLTaskProgress; import org.apache.doris.transaction.TransactionException; import org.apache.doris.transaction.TransactionState; -import org.apache.kafka.common.PartitionInfo; -import org.junit.Assert; -import org.junit.Test; - import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.kafka.common.PartitionInfo; +import org.junit.Assert; +import org.junit.Test; + import java.util.List; import java.util.Map; @@ -328,22 +328,24 @@ public class RoutineLoadJobTest { "PROPERTIES\n" + "(\n" + "\"desired_concurrent_number\" = \"0\",\n" + + "\"max_error_number\" = \"10\",\n" + "\"max_batch_interval\" = \"10\",\n" + "\"max_batch_rows\" = \"10\",\n" + "\"max_batch_size\" = \"104857600\",\n" + - "\"max_error_number\" = \"10\",\n" + - "\"strict_mode\" = \"false\",\n" + - "\"timezone\" = \"Asia/Shanghai\",\n" + "\"format\" = \"csv\",\n" + - "\"jsonpaths\" = \"\",\n" + "\"strip_outer_array\" = \"false\",\n" + - "\"json_root\" = \"\"\n" + + "\"num_as_string\" = \"false\",\n" + + "\"fuzzy_parse\" = \"false\",\n" + + "\"strict_mode\" = \"false\",\n" + + "\"timezone\" = \"Asia/Shanghai\",\n" + + "\"exec_mem_limit\" = \"2147483648\"\n" + ")\n" + "FROM KAFKA\n" + "(\n" + "\"kafka_broker_list\" = \"localhost:9092\",\n" + "\"kafka_topic\" = \"test_topic\"\n" + ");"; + System.out.println(showCreateInfo); Assert.assertEquals(expect, showCreateInfo); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org