This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch kafka_tvf
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/kafka_tvf by this push:
     new f5888d3f16a [feature](tvf) doris support kafka tvf (#31889)
f5888d3f16a is described below

commit f5888d3f16a138b0856feb2a761875c97ba6fef0
Author: nanfeng <nanfeng_...@163.com>
AuthorDate: Wed Mar 6 22:43:47 2024 +0800

    [feature](tvf) doris support kafka tvf (#31889)

    ```
    mysql> SELECT * FROM kafka("kafka_broker_list" = "127.0.0.1:9092","kafka_topic" = "my-topic", "format" = "csv", "column_separator" = ",", "property.kafka_default_offsets" = "OFFSET_END", csv_schema = "k1:int;k2:int;k3:int");
    +------+------+------+
    | k1   | k2   | k3   |
    +------+------+------+
    | 1111 |  222 |  333 |
    |   11 |  220 |   33 |
    |   22 |   33 |   44 |
    +------+------+------+
    3 rows in set (10.52 sec)
    ```
---
 .../routine_load/routine_load_task_executor.cpp | 6 +-
 .../routine_load/routine_load_task_executor.h | 4 +-
 be/src/service/backend_service.cpp | 40 +++
 be/src/service/backend_service.h | 4 +
 be/src/vec/exec/format/csv/csv_reader.cpp | 6 +-
 be/src/vec/exec/format/json/new_json_reader.cpp | 2 +-
 .../org/apache/doris/analysis/StorageBackend.java | 3 +-
 .../doris/catalog/BuiltinTableValuedFunctions.java | 4 +-
 .../apache/doris/datasource/FileQueryScanNode.java | 28 +-
 .../doris/datasource/tvf/source/TVFScanNode.java | 3 +-
 .../trees/expressions/functions/table/Kafka.java | 56 +++
 .../visitor/TableValuedFunctionVisitor.java | 5 +
 .../tablefunction/KafkaTableValuedFunction.java | 388 +++++++++++++++++++++
 .../org/apache/doris/common/GenericPoolTest.java | 5 +
 .../apache/doris/utframe/MockedBackendFactory.java | 5 +
 gensrc/thrift/BackendService.thrift | 11 +
 gensrc/thrift/Types.thrift | 1 +
 17 files changed, 561 insertions(+), 10 deletions(-)

diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index d22f2bb4a8c..6c14811898c 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -248,6 +248,10 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
         return Status::InternalError("unknown load source type");
     }
 
+    return offer_task(ctx);
+}
+
+Status RoutineLoadTaskExecutor::offer_task(std::shared_ptr<StreamLoadContext> ctx) {
     VLOG_CRITICAL << "receive a new routine load task: " << ctx->brief();
     // register the task
     _task_map[ctx->id] = ctx;
@@ -326,7 +330,7 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
     // must put pipe before executing plan fragment
     HANDLE_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx), "failed to add pipe");
 
-    if (!ctx->is_multi_table) {
+    if (!ctx->is_multi_table && ctx->load_type == TLoadType::ROUTINE_LOAD) {
         // only for normal load, single-stream-multi-table load will be planned during consuming
 #ifndef BE_TEST
         // execute plan fragment, async
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h
index e4ad8be5921..8215a309d77 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -65,6 +65,8 @@ public:
     Status get_kafka_latest_offsets_for_partitions(const PKafkaMetaProxyRequest& request,
                                                    std::vector<PIntegerPair>* partition_offsets);
 
+    Status offer_task(std::shared_ptr<StreamLoadContext> ctx);
+
 private:
     // execute the task
     void exec_task(std::shared_ptr<StreamLoadContext> ctx, DataConsumerPool* pool,
@@ -80,7 +82,7 @@ private:
                       std::shared_ptr<StreamLoadContext>
ctx); private: - ExecEnv* _exec_env = nullptr; + ExecEnv* _exec_env; PriorityThreadPool _thread_pool; DataConsumerPool _data_consumer_pool; diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 68adeb1abe2..96622101ab9 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -59,6 +59,7 @@ #include "olap/tablet_manager.h" #include "olap/tablet_meta.h" #include "olap/txn_manager.h" +#include "runtime/define_primitive_type.h" #include "runtime/exec_env.h" #include "runtime/external_scan_context_mgr.h" #include "runtime/fragment_mgr.h" @@ -566,6 +567,45 @@ void BaseBackendService::submit_routine_load_task(TStatus& t_status, return Status::OK().to_thrift(&t_status); } +void BackendService::send_kafka_tvf_task(TStatus& t_status, const TKafkaTvfTask& task) { + LOG(INFO) << "get kafka tvf task from fe, query_id:" << UniqueId(task.id); + + // parse paramaters + // create the context + std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env); + ctx->load_type = TLoadType::MANUL_LOAD; + ctx->load_src_type = task.type; + ctx->id = UniqueId(task.id); + + if (task.__isset.max_interval_s) { + ctx->max_interval_s = task.max_interval_s; + } + if (task.__isset.max_batch_rows) { + ctx->max_batch_rows = task.max_batch_rows; + } + if (task.__isset.max_batch_size) { + ctx->max_batch_size = task.max_batch_size; + } + + // set source related params + switch (task.type) { + case TLoadSourceType::KAFKA: + ctx->kafka_info.reset(new KafkaLoadInfo(task.info)); + break; + default: + LOG(WARNING) << "unknown load source type: " << task.type; + return Status::InternalError("unknown load source type").to_thrift(&t_status); + } + + Status st = _exec_env->routine_load_task_executor()->offer_task(ctx); + if (!st.ok()) { + LOG(WARNING) << "failed to submit kafka tvf task. task id: " << task.id; + return st.to_thrift(&t_status); + } + + return Status::OK().to_thrift(&t_status); +} + /* * 1. validate user privilege (todo) * 2. 
FragmentMgr#exec_plan_fragment diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index 3aaee529735..7e35e5bbe68 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -18,6 +18,7 @@ #pragma once #include <gen_cpp/BackendService.h> +#include <gen_cpp/BackendService_types.h> #include <memory> #include <string> @@ -46,6 +47,7 @@ class TDiskTrashInfo; class TCancelPlanFragmentParams; class TCheckStorageFormatResult; class TRoutineLoadTask; +class TKafkaTvfTask; class TScanBatchResult; class TScanCloseParams; class TScanCloseResult; @@ -102,6 +104,8 @@ public: void submit_routine_load_task(TStatus& t_status, const std::vector<TRoutineLoadTask>& tasks) override; + void send_kafka_tvf_task(TStatus& t_status, const TKafkaTvfTask& tasks) override; + // used for external service, open means start the scan procedure void open_scanner(TScanOpenResult& result_, const TScanOpenParams& params) override; diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 86986f8eea5..20af3600e62 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -291,7 +291,7 @@ Status CsvReader::init_reader(bool is_load) { _skip_lines = 1; } - if (_params.file_type == TFileType::FILE_STREAM) { + if (_params.file_type == TFileType::FILE_STREAM || _params.file_type == TFileType::FILE_KAFKA) { RETURN_IF_ERROR( FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, false)); } else { @@ -859,7 +859,7 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0; io::FileReaderOptions reader_options = FileFactory::get_reader_options(_state, _file_description); - if (_params.file_type == TFileType::FILE_STREAM) { + if (_params.file_type == TFileType::FILE_STREAM || _params.file_type == TFileType::FILE_KAFKA) { // Due to http_stream needs to pre read a portion of the data to parse column information, so it is set to true here RETURN_IF_ERROR( FileFactory::create_pipe_reader(_params.load_id, &_file_reader, _state, true)); @@ -869,7 +869,7 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) { &_file_reader)); } if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && - _params.file_type != TFileType::FILE_BROKER) { + _params.file_type != TFileType::FILE_KAFKA && _params.file_type != TFileType::FILE_BROKER) { return Status::EndOfFile("get parsed schema failed, empty csv file: " + _range.path); } diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index 514a925cba4..81a48200641 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -376,7 +376,7 @@ Status NewJsonReader::_open_file_reader(bool need_schema) { _current_offset = start_offset; - if (_params.file_type == TFileType::FILE_STREAM) { + if (_params.file_type == TFileType::FILE_STREAM || _params.file_type == TFileType::FILE_KAFKA) { // Due to http_stream needs to pre read a portion of the data to parse column information, so it is set to true here RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, need_schema)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java index 3b4cfefc99e..08cb04b4cf2 100644 
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java @@ -146,7 +146,8 @@ public class StorageBackend implements ParseNode { OFS("Tencent CHDFS"), GFS("Tencent Goose File System"), JFS("Juicefs"), - STREAM("Stream load pipe"); + STREAM("Stream load pipe"), + KAFKA("Kafka load pipe"); private final String description; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java index acdeb683f26..8bdd71a2506 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java @@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs; import org.apache.doris.nereids.trees.expressions.functions.table.HttpStream; import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta; import org.apache.doris.nereids.trees.expressions.functions.table.Jobs; +import org.apache.doris.nereids.trees.expressions.functions.table.Kafka; import org.apache.doris.nereids.trees.expressions.functions.table.Local; import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; @@ -61,7 +62,8 @@ public class BuiltinTableValuedFunctions implements FunctionHelper { tableValued(Jobs.class, "jobs"), tableValued(Tasks.class, "tasks"), tableValued(WorkloadGroups.class, "workload_groups"), - tableValued(ActiveBeTasks.class, "active_be_tasks") + tableValued(ActiveBeTasks.class, "active_be_tasks"), + tableValued(Kafka.class, "kafka") ); public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index 7afb04831ce..293955c6752 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -273,7 +273,8 @@ public abstract class FileQueryScanNode extends FileScanNode { ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime(); } this.inputSplitsNum = inputSplits.size(); - if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) { + if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM) + && !(getLocationType() == TFileType.FILE_KAFKA)) { return; } TFileFormatType fileFormatType = getFileFormatType(); @@ -307,6 +308,31 @@ public abstract class FileQueryScanNode extends FileScanNode { curLocations.addToLocations(location); scanRangeLocations.add(curLocations); return; + } else if (getLocationType() == TFileType.FILE_KAFKA) { + params.setFileType(TFileType.FILE_KAFKA); + FunctionGenTable table = (FunctionGenTable) this.desc.getTable(); + KafkaTableValuedFunction tableValuedFunction = (KafkaTableValuedFunction) table.getTvf(); + params.setCompressType(tableValuedFunction.getTFileCompressType()); + + Map<Long, TKafkaTvfTask> kafkaTvfTaskMap = tableValuedFunction.getKafkaTvfTaskMap(); + for (Entry<Long, TKafkaTvfTask> entry : kafkaTvfTaskMap.entrySet()) { + TScanRangeLocations curLocations = newLocations(); + TFileRangeDesc rangeDesc = new TFileRangeDesc(); + rangeDesc.setLoadId(ConnectContext.get().queryId()); 
+ rangeDesc.setSize(-1); + rangeDesc.setFileSize(-1); + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + curLocations.getScanRange().getExtScanRange().getFileScanRange().setParams(params); + + TScanRangeLocation location = new TScanRangeLocation(); + long backendId = entry.getKey(); + Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(backendId); + location.setBackendId(backendId); + location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); + curLocations.addToLocations(location); + scanRangeLocations.add(curLocations); + } + return; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java index dcd248a9782..3546cb5fc95 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java @@ -127,7 +127,8 @@ public class TVFScanNode extends FileQueryScanNode { @Override public List<Split> getSplits() throws UserException { List<Split> splits = Lists.newArrayList(); - if (tableValuedFunction.getTFileType() == TFileType.FILE_STREAM) { + if (tableValuedFunction.getTFileType() == TFileType.FILE_STREAM + || tableValuedFunction.getTFileType() == TFileType.FILE_KAFKA) { return splits; } List<TBrokerFileStatus> fileStatuses = tableValuedFunction.getFileStatuses(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Kafka.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Kafka.java new file mode 100644 index 00000000000..38afb8c3da5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Kafka.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.expressions.functions.table; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Properties; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.coercion.AnyDataType; +import org.apache.doris.tablefunction.KafkaTableValuedFunction; +import org.apache.doris.tablefunction.TableValuedFunctionIf; + +import java.util.Map; + +/** kafka tvf **/ +public class Kafka extends TableValuedFunction { + public Kafka(Properties properties) { + super("kafka", properties); + } + + @Override + public FunctionSignature customSignature() { + return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes()); + } + + @Override + protected TableValuedFunctionIf toCatalogFunction() { + try { + Map<String, String> arguments = getTVFProperties().getMap(); + return new KafkaTableValuedFunction(arguments); + } catch (Throwable t) { + throw new AnalysisException("Can not build KafkaTableValuedFunction by " + + this + ": " + t.getMessage(), t); + } + } + + @Override + public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) { + return visitor.visitKafka(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java index 36561e5b12c..cfc67ea4796 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java @@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs; import org.apache.doris.nereids.trees.expressions.functions.table.HttpStream; import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta; import org.apache.doris.nereids.trees.expressions.functions.table.Jobs; +import org.apache.doris.nereids.trees.expressions.functions.table.Kafka; import org.apache.doris.nereids.trees.expressions.functions.table.Local; import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos; import org.apache.doris.nereids.trees.expressions.functions.table.Numbers; @@ -100,6 +101,10 @@ public interface TableValuedFunctionVisitor<R, C> { return visitTableValuedFunction(s3, context); } + default R visitKafka(Kafka kafka, C context) { + return visitTableValuedFunction(kafka, context); + } + default R visitWorkloadGroups(WorkloadGroups workloadGroups, C context) { return visitTableValuedFunction(workloadGroups, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/KafkaTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/KafkaTableValuedFunction.java new file mode 100644 index 00000000000..541afc4c0c8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/KafkaTableValuedFunction.java @@ -0,0 +1,388 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.CreateRoutineLoadStmt; +import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ClientPool; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.SmallFileMgr; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.kafka.KafkaUtil; +import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.load.routineload.kafka.KafkaConfiguration; +import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; +import org.apache.doris.system.BeSelectionPolicy; +import org.apache.doris.thrift.BackendService; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TKafkaLoadInfo; +import org.apache.doris.thrift.TKafkaTvfTask; +import org.apache.doris.thrift.TLoadSourceType; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatus; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TUniqueId; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.Getter; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; +import java.util.stream.Collectors; + +public class KafkaTableValuedFunction extends ExternalFileTableValuedFunction { + public static final String NAME = "kafka"; + private String timezone = TimeUtils.DEFAULT_TIME_ZONE; + + private String brokerList; + + private String topic; + + // for kafka properties + private KafkaDataSourceProperties kafkaDataSourceProperties; + private Map<String, String> customKafkaProperties = Maps.newHashMap(); + private final List<Integer> kafkaPartitions = Lists.newArrayList(); + private Boolean hasCustomPartitions = true; + + // for jobProperties + private final Map<String, Long> jobProperties = Maps.newHashMap(); + + private Long dbId = -1L; + + @Getter + private final Map<Long, TKafkaTvfTask> kafkaTvfTaskMap = Maps.newHashMap(); + + public static final String PROP_GROUP_ID = "group.id"; + public static final String KAFKA_FILE_CATALOG = "kafka"; + + public KafkaTableValuedFunction(Map<String, String> properties) throws UserException { + // 1. 
parse and analyze common properties + Map<String, String> otherProperties = super.parseCommonProperties(properties); + + // 2. parse and analyze kafka properties + parseAndAnalyzeKafkaProperties(otherProperties); + + // 3. parse and analyze job properties + parseAndAnalyzeJobProperties(otherProperties); + + // 4. divide partitions + int concurrentTaskNum = calculateConcurrentTaskNum(); + getKafkaTvfTaskInfoList(concurrentTaskNum); + + // 5. transfer partition list to be + transferPartitionListToBe(); + } + + @Override + public TFileType getTFileType() { + return TFileType.FILE_KAFKA; + } + + @Override + public String getFilePath() { + return null; + } + + @Override + public BrokerDesc getBrokerDesc() { + return new BrokerDesc("KafkaTvfBroker", StorageBackend.StorageType.KAFKA, locationProperties); + } + + @Override + public String getTableName() { + return "KafkaTableValuedFunction"; + } + + protected void setOptional() throws UserException { + // set custom kafka partitions + if (CollectionUtils.isNotEmpty(kafkaDataSourceProperties.getKafkaPartitionOffsets())) { + setKafkaPartitions(); + } + // set kafka customProperties + if (MapUtils.isNotEmpty(kafkaDataSourceProperties.getCustomKafkaProperties())) { + setCustomKafkaProperties(kafkaDataSourceProperties.getCustomKafkaProperties()); + } + // set group id if not specified + this.customKafkaProperties.putIfAbsent(PROP_GROUP_ID, "_" + UUID.randomUUID()); + } + + private void setKafkaPartitions() throws LoadException { + // get kafka partition offsets + List<Pair<Integer, Long>> kafkaPartitionOffsets = kafkaDataSourceProperties.getKafkaPartitionOffsets(); + boolean isForTimes = kafkaDataSourceProperties.isOffsetsForTimes(); + if (isForTimes) { + // if the offset is set by date time, we need to get the real offset by time + // need to communicate with be + // TODO not need ssl file? 
+ kafkaPartitionOffsets = KafkaUtil.getOffsetsForTimes(kafkaDataSourceProperties.getBrokerList(), + kafkaDataSourceProperties.getTopic(), + customKafkaProperties, kafkaDataSourceProperties.getKafkaPartitionOffsets()); + } + + // get partition number list, eg:[0,1,2] + for (Pair<Integer, Long> partitionOffset : kafkaPartitionOffsets) { + this.kafkaPartitions.add(partitionOffset.first); + } + } + + private void setCustomKafkaProperties(Map<String, String> kafkaProperties) { + this.customKafkaProperties = kafkaProperties; + } + + private void checkCustomProperties() throws DdlException { + SmallFileMgr smallFileMgr = Env.getCurrentEnv().getSmallFileMgr(); + for (Map.Entry<String, String> entry : customKafkaProperties.entrySet()) { + if (entry.getValue().startsWith("FILE:")) { + if (dbId == -1L) { + throw new DdlException("No db specified for storing ssl files"); + } + + String file = entry.getValue().substring(entry.getValue().indexOf(":") + 1); + // check file + if (!smallFileMgr.containsFile(dbId, KAFKA_FILE_CATALOG, file)) { + throw new DdlException("File " + file + " does not exist in db " + + dbId + " with catalog: " + KAFKA_FILE_CATALOG); + } + } + } + } + + private void checkPartition() throws UserException { + // user not define kafka partitions and be return no partitions + if (kafkaPartitions.isEmpty()) { + throw new AnalysisException("there is no available kafka partition"); + } + if (!hasCustomPartitions) { + return; + } + + // get all kafka partitions from be + List<Integer> allKafkaPartitions = getAllKafkaPartitions(); + + for (Integer customPartition : kafkaPartitions) { + if (!allKafkaPartitions.contains(customPartition)) { + throw new LoadException("there is a custom kafka partition " + customPartition + + " which is invalid for topic " + kafkaDataSourceProperties.getTopic()); + } + } + } + + private List<Integer> getAllKafkaPartitions() throws UserException { + convertCustomProperties(); + return KafkaUtil.getAllKafkaPartitions(brokerList, topic, customKafkaProperties); + } + + private void convertCustomProperties() throws DdlException { + SmallFileMgr smallFileMgr = Env.getCurrentEnv().getSmallFileMgr(); + for (Map.Entry<String, String> entry : customKafkaProperties.entrySet()) { + if (entry.getValue().startsWith("FILE:")) { + // convert FILE:file_name -> FILE:file_id:md5 + String file = entry.getValue().substring(entry.getValue().indexOf(":") + 1); + SmallFileMgr.SmallFile smallFile = smallFileMgr.getSmallFile(dbId, KAFKA_FILE_CATALOG, file, true); + customKafkaProperties.put(entry.getKey(), "FILE:" + smallFile.id + ":" + smallFile.md5); + } else { + customKafkaProperties.put(entry.getKey(), entry.getValue()); + } + } + } + + public void parseAndAnalyzeKafkaProperties(Map<String, String> properties) throws UserException { + // kafka partition offset may be time-data format + // get time zone, to convert time into timestamps during the analysis phase + if (ConnectContext.get() != null) { + timezone = ConnectContext.get().getSessionVariable().getTimeZone(); + } + timezone = TimeUtils.checkTimeZoneValidAndStandardize(properties.getOrDefault( + LoadStmt.TIMEZONE, timezone)); + + topic = properties.getOrDefault(KafkaConfiguration.KAFKA_TOPIC.getName(), ""); + brokerList = properties.getOrDefault(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), ""); + Preconditions.checkState(!Strings.isNullOrEmpty(topic), "topic must be set before analyzing"); + Preconditions.checkState(!Strings.isNullOrEmpty(brokerList), "broker list must be set before analyzing"); + + String partitionStr = 
properties.getOrDefault(KafkaConfiguration.KAFKA_PARTITIONS.getName(), ""); + if (partitionStr.isEmpty()) { + hasCustomPartitions = false; + // get all kafka partitions from be + List<Integer> allKafkaPartitions = getAllKafkaPartitions(); + partitionStr = allKafkaPartitions.stream() + .map(String::valueOf) + .collect(Collectors.joining(",")); + properties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), partitionStr); + } + // parse and analyze kafka properties, broker list and topic are required, others are optional + this.kafkaDataSourceProperties = new KafkaDataSourceProperties(properties); + this.kafkaDataSourceProperties.setTimezone(this.timezone); + this.kafkaDataSourceProperties.analyze(); + // get ssl file db + String tableName = properties.getOrDefault(KAFKA_FILE_CATALOG, ""); + if (!tableName.isEmpty()) { + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(tableName); + dbId = db.getId(); + } + + // set custom properties and partition, include some converted operations + setOptional(); + checkCustomProperties(); + checkPartition(); + } + + public void parseAndAnalyzeJobProperties(Map<String, String> properties) throws AnalysisException { + // Copy the properties, because we will remove the key from properties. + Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + copiedProps.putAll(properties); + + // get job properties and check range + long desiredConcurrentNum = Util.getLongPropertyOrDefault( + copiedProps.get(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY), + Config.max_routine_load_task_concurrent_num, + CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PRED, + CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY + " should > 0"); + long maxBatchIntervalS = Util.getLongPropertyOrDefault(copiedProps.get( + CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY), + RoutineLoadJob.DEFAULT_MAX_INTERVAL_SECOND, CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_PRED, + CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY + " should between 1 and 60"); + long maxBatchRows = Util.getLongPropertyOrDefault(copiedProps.get( + CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY), + RoutineLoadJob.DEFAULT_MAX_BATCH_ROWS, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PRED, + CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY + " should > 200000"); + long maxBatchSizeBytes = Util.getLongPropertyOrDefault(copiedProps.get( + CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY), + RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PRED, + CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY + " should between 100MB and 1GB"); + + // put into jobProperties + jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, desiredConcurrentNum); + jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS); + jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, maxBatchRows); + jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes); + } + + public void getKafkaTvfTaskInfoList(int currentConcurrentTaskNum) throws AnalysisException { + // select be by policy + BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needQueryAvailable().needLoadAvailable().build(); + List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, currentConcurrentTaskNum); + if (backendIds.isEmpty() || backendIds.size() != currentConcurrentTaskNum) { + throw new AnalysisException("No available backends or incorrect number of backends" + + ", policy: " + policy); + 
} + + for (int i = 0; i < currentConcurrentTaskNum; i++) { + TKafkaTvfTask tKafkaLoadInfo = createTKafkaTvfTask(ConnectContext.get().queryId(), + currentConcurrentTaskNum, i); + kafkaTvfTaskMap.put(backendIds.get(i), tKafkaLoadInfo); + } + + } + + public int calculateConcurrentTaskNum() throws AnalysisException { + int aliveBeNums = Env.getCurrentSystemInfo().getAllBackendIds(true).size(); + int desireTaskConcurrentNum = jobProperties.get( + CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY).intValue(); + int partitionNum = kafkaPartitions.size(); + + int[] numbers = {aliveBeNums, desireTaskConcurrentNum, partitionNum, + Config.max_routine_load_task_concurrent_num}; + int concurrentTaskNum = Arrays.stream(numbers).min().getAsInt(); + if (concurrentTaskNum == 0) { + throw new AnalysisException("concurrent task number is 0"); + } + + return concurrentTaskNum; + } + + private TKafkaTvfTask createTKafkaTvfTask(TUniqueId uniqueId, int currentConcurrentTaskNum, int i) { + Map<Integer, Long> taskKafkaProgress = Maps.newHashMap(); + List<Pair<Integer, Long>> kafkaPartitionOffsets = kafkaDataSourceProperties.getKafkaPartitionOffsets(); + for (int j = i; j < kafkaPartitionOffsets.size(); j = j + currentConcurrentTaskNum) { + Pair<Integer, Long> pair = kafkaPartitionOffsets.get(j); + taskKafkaProgress.put(pair.key(), pair.value()); + } + + TKafkaLoadInfo tKafkaLoadInfo = new TKafkaLoadInfo(); + tKafkaLoadInfo.setBrokers(kafkaDataSourceProperties.getBrokerList()); + tKafkaLoadInfo.setTopic(kafkaDataSourceProperties.getTopic()); + tKafkaLoadInfo.setProperties(kafkaDataSourceProperties.getCustomKafkaProperties()); + tKafkaLoadInfo.setPartitionBeginOffset(taskKafkaProgress); + + TKafkaTvfTask tKafkaTvfTask = new TKafkaTvfTask(TLoadSourceType.KAFKA, uniqueId, tKafkaLoadInfo); + tKafkaTvfTask.setMaxIntervalS(jobProperties.get(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY)); + tKafkaTvfTask.setMaxBatchRows(jobProperties.get(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY)); + tKafkaTvfTask.setMaxBatchSize(jobProperties.get(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY)); + + return tKafkaTvfTask; + } + + public void transferPartitionListToBe() throws AnalysisException { + for (Entry<Long, TKafkaTvfTask> entry : kafkaTvfTaskMap.entrySet()) { + Long beId = entry.getKey(); + TKafkaTvfTask task = entry.getValue(); + + Backend backend = Env.getCurrentSystemInfo().getBackend(beId); + if (backend == null) { + throw new AnalysisException("failed to send tasks to backend " + beId + " because not exist"); + } + + TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBePort()); + + boolean ok = false; + BackendService.Client client = null; + try { + client = ClientPool.backendPool.borrowObject(address); + TStatus tStatus = client.sendKafkaTvfTask(task); + ok = true; + + if (tStatus.getStatusCode() != TStatusCode.OK) { + throw new AnalysisException("failed to send task. error code: " + tStatus.getStatusCode() + + ", msg: " + (tStatus.getErrorMsgsSize() > 0 ? 
tStatus.getErrorMsgs().get(0) : "NaN")); + } + LOG.debug("send kafka tvf task {} to BE: {}", DebugUtil.printId(task.id), beId); + } catch (Exception e) { + throw new AnalysisException("failed to send task: " + e.getMessage(), e); + } finally { + if (ok) { + ClientPool.backendPool.returnObject(address, client); + } else { + ClientPool.backendPool.invalidateObject(address, client); + } + } + + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java index eb9ac858b3e..684758db357 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java @@ -35,6 +35,7 @@ import org.apache.doris.thrift.TGetTopNHotPartitionsRequest; import org.apache.doris.thrift.TGetTopNHotPartitionsResponse; import org.apache.doris.thrift.TIngestBinlogRequest; import org.apache.doris.thrift.TIngestBinlogResult; +import org.apache.doris.thrift.TKafkaTvfTask; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPreCacheAsyncRequest; import org.apache.doris.thrift.TPreCacheAsyncResponse; @@ -206,6 +207,10 @@ public class GenericPoolTest { return null; } + public TStatus sendKafkaTvfTask(TKafkaTvfTask tasks) throws TException { + return null; + } + @Override public TScanOpenResult openScanner(TScanOpenParams params) throws TException { return null; diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index c705893c672..431396e008e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -53,6 +53,7 @@ import org.apache.doris.thrift.TGetTopNHotPartitionsResponse; import org.apache.doris.thrift.THeartbeatResult; import org.apache.doris.thrift.TIngestBinlogRequest; import org.apache.doris.thrift.TIngestBinlogResult; +import org.apache.doris.thrift.TKafkaTvfTask; import org.apache.doris.thrift.TMasterInfo; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPreCacheAsyncRequest; @@ -391,6 +392,10 @@ public class MockedBackendFactory { return new TStatus(TStatusCode.OK); } + public TStatus sendKafkaTvfTask(TKafkaTvfTask tasks) throws TException { + return new TStatus(TStatusCode.OK); + } + @Override public TScanOpenResult openScanner(TScanOpenParams params) throws TException { return null; diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 376e2a34df9..7ed4516c4b6 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -70,6 +70,15 @@ struct TRoutineLoadTask { 17: optional bool memtable_on_sink_node; } +struct TKafkaTvfTask { + 1: required Types.TLoadSourceType type + 2: required Types.TUniqueId id + 3: required TKafkaLoadInfo info + 4: optional i64 max_interval_s + 5: optional i64 max_batch_rows + 6: optional i64 max_batch_size +} + struct TKafkaMetaProxyRequest { 1: optional TKafkaLoadInfo kafka_info } @@ -353,6 +362,8 @@ service BackendService { Status.TStatus submit_routine_load_task(1:list<TRoutineLoadTask> tasks); + Status.TStatus send_kafka_tvf_task(1:TKafkaTvfTask task); + // doris will build a scan context for this session, context_id returned if success DorisExternalService.TScanOpenResult open_scanner(1: DorisExternalService.TScanOpenParams params); diff --git 
a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 04a1fd35163..8c01dd086dd 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -666,6 +666,7 @@ enum TFileType {
     FILE_S3,
     FILE_HDFS,
     FILE_NET, // read file by network, such as http
+    FILE_KAFKA,
 }
 
 struct TTabletCommitInfo {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
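As a usage sketch only: the KafkaTableValuedFunction added above reuses the routine-load machinery (KafkaDataSourceProperties for partitions and offsets, CreateRoutineLoadStmt-style job properties for task sizing), so a call that pins partitions and offsets and tunes the per-task batch limits would look roughly like the query below. The property keys kafka_partitions, kafka_offsets, desired_concurrent_number, max_batch_interval, max_batch_rows and max_batch_size are assumed to follow the existing routine-load names; only the keys shown in the commit message above are confirmed by this patch.

```
-- Illustrative sketch; the property keys below are assumed to match the
-- routine-load conventions reused by KafkaTableValuedFunction.
SELECT * FROM kafka(
    "kafka_broker_list" = "127.0.0.1:9092",
    "kafka_topic" = "my-topic",
    "format" = "csv",
    "column_separator" = ",",
    "csv_schema" = "k1:int;k2:int;k3:int",
    -- pin explicit partitions/offsets instead of property.kafka_default_offsets
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0",
    -- per-task limits, validated like routine-load job properties
    -- (interval 1-60s, rows > 200000, size between 100MB and 1GB)
    "desired_concurrent_number" = "2",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200"
);
```

If kafka_partitions is omitted, parseAndAnalyzeKafkaProperties falls back to asking a BE for all partitions of the topic, and the task concurrency is capped by the number of alive BEs, desired_concurrent_number, the partition count, and max_routine_load_task_concurrent_num, as in calculateConcurrentTaskNum above.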