This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch kafka_tvf
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/kafka_tvf by this push:
     new f5888d3f16a [feature](tvf) doris support kafka tvf  (#31889)
f5888d3f16a is described below

commit f5888d3f16a138b0856feb2a761875c97ba6fef0
Author: nanfeng <nanfeng_...@163.com>
AuthorDate: Wed Mar 6 22:43:47 2024 +0800

    [feature](tvf) doris support kafka tvf  (#31889)
    
    ```
    mysql> SELECT * FROM kafka(
               "kafka_broker_list" = "127.0.0.1:9092",
               "kafka_topic" = "my-topic",
               "format" = "csv",
               "column_separator" = ",",
               "property.kafka_default_offsets" = "OFFSET_END",
               "csv_schema" = "k1:int;k2:int;k3:int");
    
    +------+------+------+
    | k1   | k2   | k3   |
    +------+------+------+
    | 1111 | 222  | 333  |
    | 11   | 220  | 33   |
    | 22   | 33   | 44   |
    +------+------+------+
    3 rows in set (10.52 sec)
    ```
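    
    A json-format call should look similar (a hypothetical sketch, not taken verbatim from
    this patch; the "kafka_partitions" key is assumed from the routine-load style properties
    parsed by KafkaDataSourceProperties):
    
    ```
    mysql> SELECT * FROM kafka(
               "kafka_broker_list" = "127.0.0.1:9092",
               "kafka_topic" = "my-topic",
               "format" = "json",
               "kafka_partitions" = "0,1,2",
               "property.kafka_default_offsets" = "OFFSET_BEGINNING");
    ```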
---
 .../routine_load/routine_load_task_executor.cpp    |   6 +-
 .../routine_load/routine_load_task_executor.h      |   4 +-
 be/src/service/backend_service.cpp                 |  40 +++
 be/src/service/backend_service.h                   |   4 +
 be/src/vec/exec/format/csv/csv_reader.cpp          |   6 +-
 be/src/vec/exec/format/json/new_json_reader.cpp    |   2 +-
 .../org/apache/doris/analysis/StorageBackend.java  |   3 +-
 .../doris/catalog/BuiltinTableValuedFunctions.java |   4 +-
 .../apache/doris/datasource/FileQueryScanNode.java |  28 +-
 .../doris/datasource/tvf/source/TVFScanNode.java   |   3 +-
 .../trees/expressions/functions/table/Kafka.java   |  56 +++
 .../visitor/TableValuedFunctionVisitor.java        |   5 +
 .../tablefunction/KafkaTableValuedFunction.java    | 388 +++++++++++++++++++++
 .../org/apache/doris/common/GenericPoolTest.java   |   5 +
 .../apache/doris/utframe/MockedBackendFactory.java |   5 +
 gensrc/thrift/BackendService.thrift                |  11 +
 gensrc/thrift/Types.thrift                         |   1 +
 17 files changed, 561 insertions(+), 10 deletions(-)

diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index d22f2bb4a8c..6c14811898c 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -248,6 +248,10 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
         return Status::InternalError("unknown load source type");
     }
 
+    return offer_task(ctx);
+}
+
+Status RoutineLoadTaskExecutor::offer_task(std::shared_ptr<StreamLoadContext> ctx) {
     VLOG_CRITICAL << "receive a new routine load task: " << ctx->brief();
     // register the task
     _task_map[ctx->id] = ctx;
@@ -326,7 +330,7 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
     // must put pipe before executing plan fragment
     HANDLE_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx), "failed to add pipe");
 
-    if (!ctx->is_multi_table) {
+    if (!ctx->is_multi_table && ctx->load_type == TLoadType::ROUTINE_LOAD) {
         // only for normal load, single-stream-multi-table load will be planned during consuming
 #ifndef BE_TEST
         // execute plan fragment, async
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h
index e4ad8be5921..8215a309d77 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -65,6 +65,8 @@ public:
     Status get_kafka_latest_offsets_for_partitions(const PKafkaMetaProxyRequest& request,
                                                    std::vector<PIntegerPair>* partition_offsets);
 
+    Status offer_task(std::shared_ptr<StreamLoadContext> ctx);
+
 private:
     // execute the task
     void exec_task(std::shared_ptr<StreamLoadContext> ctx, DataConsumerPool* pool,
@@ -80,7 +82,7 @@ private:
                         std::shared_ptr<StreamLoadContext> ctx);
 
 private:
-    ExecEnv* _exec_env = nullptr;
+    ExecEnv* _exec_env;
     PriorityThreadPool _thread_pool;
     DataConsumerPool _data_consumer_pool;
 
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index 68adeb1abe2..96622101ab9 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -59,6 +59,7 @@
 #include "olap/tablet_manager.h"
 #include "olap/tablet_meta.h"
 #include "olap/txn_manager.h"
+#include "runtime/define_primitive_type.h"
 #include "runtime/exec_env.h"
 #include "runtime/external_scan_context_mgr.h"
 #include "runtime/fragment_mgr.h"
@@ -566,6 +567,45 @@ void BaseBackendService::submit_routine_load_task(TStatus& t_status,
     return Status::OK().to_thrift(&t_status);
 }
 
+void BackendService::send_kafka_tvf_task(TStatus& t_status, const TKafkaTvfTask& task) {
+    LOG(INFO) << "get kafka tvf task from fe, query_id:" << UniqueId(task.id);
+
+    // parse parameters
+    // create the context
+    std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
+    ctx->load_type = TLoadType::MANUL_LOAD;
+    ctx->load_src_type = task.type;
+    ctx->id = UniqueId(task.id);
+
+    if (task.__isset.max_interval_s) {
+        ctx->max_interval_s = task.max_interval_s;
+    }
+    if (task.__isset.max_batch_rows) {
+        ctx->max_batch_rows = task.max_batch_rows;
+    }
+    if (task.__isset.max_batch_size) {
+        ctx->max_batch_size = task.max_batch_size;
+    }
+
+    // set source related params
+    switch (task.type) {
+    case TLoadSourceType::KAFKA:
+        ctx->kafka_info.reset(new KafkaLoadInfo(task.info));
+        break;
+    default:
+        LOG(WARNING) << "unknown load source type: " << task.type;
+        return Status::InternalError("unknown load source 
type").to_thrift(&t_status);
+    }
+
+    Status st = _exec_env->routine_load_task_executor()->offer_task(ctx);
+    if (!st.ok()) {
+        LOG(WARNING) << "failed to submit kafka tvf task. task id: " << 
task.id;
+        return st.to_thrift(&t_status);
+    }
+
+    return Status::OK().to_thrift(&t_status);
+}
+
 /*
  * 1. validate user privilege (todo)
  * 2. FragmentMgr#exec_plan_fragment
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 3aaee529735..7e35e5bbe68 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <gen_cpp/BackendService.h>
+#include <gen_cpp/BackendService_types.h>
 
 #include <memory>
 #include <string>
@@ -46,6 +47,7 @@ class TDiskTrashInfo;
 class TCancelPlanFragmentParams;
 class TCheckStorageFormatResult;
 class TRoutineLoadTask;
+class TKafkaTvfTask;
 class TScanBatchResult;
 class TScanCloseParams;
 class TScanCloseResult;
@@ -102,6 +104,8 @@ public:
     void submit_routine_load_task(TStatus& t_status,
                                   const std::vector<TRoutineLoadTask>& tasks) override;
 
+    void send_kafka_tvf_task(TStatus& t_status, const TKafkaTvfTask& tasks) override;
+
     // used for external service, open means start the scan procedure
     void open_scanner(TScanOpenResult& result_, const TScanOpenParams& params) override;
 
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 86986f8eea5..20af3600e62 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -291,7 +291,7 @@ Status CsvReader::init_reader(bool is_load) {
         _skip_lines = 1;
     }
 
-    if (_params.file_type == TFileType::FILE_STREAM) {
+    if (_params.file_type == TFileType::FILE_STREAM || _params.file_type == TFileType::FILE_KAFKA) {
         RETURN_IF_ERROR(
                 FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, false));
     } else {
@@ -859,7 +859,7 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
     _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
     io::FileReaderOptions reader_options =
             FileFactory::get_reader_options(_state, _file_description);
-    if (_params.file_type == TFileType::FILE_STREAM) {
+    if (_params.file_type == TFileType::FILE_STREAM || _params.file_type == TFileType::FILE_KAFKA) {
         // Due to http_stream needs to pre read a portion of the data to parse column information, so it is set to true here
         RETURN_IF_ERROR(
                 FileFactory::create_pipe_reader(_params.load_id, &_file_reader, _state, true));
@@ -869,7 +869,7 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
                                                         &_file_reader));
     }
     if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
-        _params.file_type != TFileType::FILE_BROKER) {
+        _params.file_type != TFileType::FILE_KAFKA && _params.file_type != TFileType::FILE_BROKER) {
         return Status::EndOfFile("get parsed schema failed, empty csv file: " + _range.path);
     }
 
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index 514a925cba4..81a48200641 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -376,7 +376,7 @@ Status NewJsonReader::_open_file_reader(bool need_schema) {
 
     _current_offset = start_offset;
 
-    if (_params.file_type == TFileType::FILE_STREAM) {
+    if (_params.file_type == TFileType::FILE_STREAM || _params.file_type == TFileType::FILE_KAFKA) {
         // Due to http_stream needs to pre read a portion of the data to parse column information, so it is set to true here
         RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state,
                                                         need_schema));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
index 3b4cfefc99e..08cb04b4cf2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
@@ -146,7 +146,8 @@ public class StorageBackend implements ParseNode {
         OFS("Tencent CHDFS"),
         GFS("Tencent Goose File System"),
         JFS("Juicefs"),
-        STREAM("Stream load pipe");
+        STREAM("Stream load pipe"),
+        KAFKA("Kafka load pipe");
 
         private final String description;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
index acdeb683f26..8bdd71a2506 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
@@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
 import org.apache.doris.nereids.trees.expressions.functions.table.HttpStream;
 import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
 import org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
+import org.apache.doris.nereids.trees.expressions.functions.table.Kafka;
 import org.apache.doris.nereids.trees.expressions.functions.table.Local;
 import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
 import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
@@ -61,7 +62,8 @@ public class BuiltinTableValuedFunctions implements FunctionHelper {
             tableValued(Jobs.class, "jobs"),
             tableValued(Tasks.class, "tasks"),
             tableValued(WorkloadGroups.class, "workload_groups"),
-            tableValued(ActiveBeTasks.class, "active_be_tasks")
+            tableValued(ActiveBeTasks.class, "active_be_tasks"),
+            tableValued(Kafka.class, "kafka")
     );
 
     public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 7afb04831ce..293955c6752 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -273,7 +273,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
             ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
         }
         this.inputSplitsNum = inputSplits.size();
-        if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) {
+        if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)
+                && !(getLocationType() == TFileType.FILE_KAFKA)) {
             return;
         }
         TFileFormatType fileFormatType = getFileFormatType();
@@ -307,6 +308,31 @@ public abstract class FileQueryScanNode extends FileScanNode {
                 curLocations.addToLocations(location);
                 scanRangeLocations.add(curLocations);
                 return;
+            } else if (getLocationType() == TFileType.FILE_KAFKA) {
+                params.setFileType(TFileType.FILE_KAFKA);
+                FunctionGenTable table = (FunctionGenTable) this.desc.getTable();
+                KafkaTableValuedFunction tableValuedFunction = (KafkaTableValuedFunction) table.getTvf();
+                params.setCompressType(tableValuedFunction.getTFileCompressType());
+
+                Map<Long, TKafkaTvfTask> kafkaTvfTaskMap = tableValuedFunction.getKafkaTvfTaskMap();
+                for (Entry<Long, TKafkaTvfTask> entry : kafkaTvfTaskMap.entrySet()) {
+                    TScanRangeLocations curLocations = newLocations();
+                    TFileRangeDesc rangeDesc = new TFileRangeDesc();
+                    rangeDesc.setLoadId(ConnectContext.get().queryId());
+                    rangeDesc.setSize(-1);
+                    rangeDesc.setFileSize(-1);
+                    curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+                    curLocations.getScanRange().getExtScanRange().getFileScanRange().setParams(params);
+
+                    TScanRangeLocation location = new TScanRangeLocation();
+                    long backendId = entry.getKey();
+                    Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(backendId);
+                    location.setBackendId(backendId);
+                    location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
+                    curLocations.addToLocations(location);
+                    scanRangeLocations.add(curLocations);
+                }
+                return;
             }
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index dcd248a9782..3546cb5fc95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -127,7 +127,8 @@ public class TVFScanNode extends FileQueryScanNode {
     @Override
     public List<Split> getSplits() throws UserException {
         List<Split> splits = Lists.newArrayList();
-        if (tableValuedFunction.getTFileType() == TFileType.FILE_STREAM) {
+        if (tableValuedFunction.getTFileType() == TFileType.FILE_STREAM
+                || tableValuedFunction.getTFileType() == TFileType.FILE_KAFKA) {
             return splits;
         }
         List<TBrokerFileStatus> fileStatuses = tableValuedFunction.getFileStatuses();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Kafka.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Kafka.java
new file mode 100644
index 00000000000..38afb8c3da5
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/Kafka.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.table;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.coercion.AnyDataType;
+import org.apache.doris.tablefunction.KafkaTableValuedFunction;
+import org.apache.doris.tablefunction.TableValuedFunctionIf;
+
+import java.util.Map;
+
+/** kafka tvf **/
+public class Kafka extends TableValuedFunction {
+    public Kafka(Properties properties) {
+        super("kafka", properties);
+    }
+
+    @Override
+    public FunctionSignature customSignature() {
+        return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
+    }
+
+    @Override
+    protected TableValuedFunctionIf toCatalogFunction() {
+        try {
+            Map<String, String> arguments = getTVFProperties().getMap();
+            return new KafkaTableValuedFunction(arguments);
+        } catch (Throwable t) {
+            throw new AnalysisException("Can not build 
KafkaTableValuedFunction by "
+                + this + ": " + t.getMessage(), t);
+        }
+    }
+
+    @Override
+    public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+        return visitor.visitKafka(this, context);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
index 36561e5b12c..cfc67ea4796 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
@@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.expressions.functions.table.Hdfs;
 import org.apache.doris.nereids.trees.expressions.functions.table.HttpStream;
 import org.apache.doris.nereids.trees.expressions.functions.table.IcebergMeta;
 import org.apache.doris.nereids.trees.expressions.functions.table.Jobs;
+import org.apache.doris.nereids.trees.expressions.functions.table.Kafka;
 import org.apache.doris.nereids.trees.expressions.functions.table.Local;
 import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
 import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
@@ -100,6 +101,10 @@ public interface TableValuedFunctionVisitor<R, C> {
         return visitTableValuedFunction(s3, context);
     }
 
+    default R visitKafka(Kafka kafka, C context) {
+        return visitTableValuedFunction(kafka, context);
+    }
+
     default R visitWorkloadGroups(WorkloadGroups workloadGroups, C context) {
         return visitTableValuedFunction(workloadGroups, context);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/KafkaTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/KafkaTableValuedFunction.java
new file mode 100644
index 00000000000..541afc4c0c8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/KafkaTableValuedFunction.java
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.CreateRoutineLoadStmt;
+import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.SmallFileMgr;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.kafka.KafkaUtil;
+import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.load.routineload.kafka.KafkaConfiguration;
+import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TKafkaLoadInfo;
+import org.apache.doris.thrift.TKafkaTvfTask;
+import org.apache.doris.thrift.TLoadSourceType;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.Getter;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class KafkaTableValuedFunction extends ExternalFileTableValuedFunction {
+    public static final String NAME = "kafka";
+    private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
+
+    private String brokerList;
+
+    private String topic;
+
+    // for kafka properties
+    private KafkaDataSourceProperties kafkaDataSourceProperties;
+    private Map<String, String> customKafkaProperties = Maps.newHashMap();
+    private final List<Integer> kafkaPartitions = Lists.newArrayList();
+    private Boolean hasCustomPartitions = true;
+
+    // for jobProperties
+    private final Map<String, Long> jobProperties = Maps.newHashMap();
+
+    private Long dbId = -1L;
+
+    @Getter
+    private final Map<Long, TKafkaTvfTask> kafkaTvfTaskMap = Maps.newHashMap();
+
+    public static final String PROP_GROUP_ID = "group.id";
+    public static final String KAFKA_FILE_CATALOG = "kafka";
+
+    public KafkaTableValuedFunction(Map<String, String> properties) throws UserException {
+        // 1. parse and analyze common properties
+        Map<String, String> otherProperties = super.parseCommonProperties(properties);
+
+        // 2. parse and analyze kafka properties
+        parseAndAnalyzeKafkaProperties(otherProperties);
+
+        // 3. parse and analyze job properties
+        parseAndAnalyzeJobProperties(otherProperties);
+
+        // 4. divide partitions
+        int concurrentTaskNum = calculateConcurrentTaskNum();
+        getKafkaTvfTaskInfoList(concurrentTaskNum);
+
+        // 5. transfer partition list to be
+        transferPartitionListToBe();
+    }
+
+    @Override
+    public TFileType getTFileType() {
+        return TFileType.FILE_KAFKA;
+    }
+
+    @Override
+    public String getFilePath() {
+        return null;
+    }
+
+    @Override
+    public BrokerDesc getBrokerDesc() {
+        return new BrokerDesc("KafkaTvfBroker", 
StorageBackend.StorageType.KAFKA, locationProperties);
+    }
+
+    @Override
+    public String getTableName() {
+        return "KafkaTableValuedFunction";
+    }
+
+    protected void setOptional() throws UserException {
+        // set custom kafka partitions
+        if (CollectionUtils.isNotEmpty(kafkaDataSourceProperties.getKafkaPartitionOffsets())) {
+            setKafkaPartitions();
+        }
+        // set kafka customProperties
+        if (MapUtils.isNotEmpty(kafkaDataSourceProperties.getCustomKafkaProperties())) {
+            setCustomKafkaProperties(kafkaDataSourceProperties.getCustomKafkaProperties());
+        }
+        // set group id if not specified
+        this.customKafkaProperties.putIfAbsent(PROP_GROUP_ID, "_" + UUID.randomUUID());
+    }
+
+    private void setKafkaPartitions() throws LoadException {
+        // get kafka partition offsets
+        List<Pair<Integer, Long>> kafkaPartitionOffsets = kafkaDataSourceProperties.getKafkaPartitionOffsets();
+        boolean isForTimes = kafkaDataSourceProperties.isOffsetsForTimes();
+        if (isForTimes) {
+            // if the offset is set by date time, we need to get the real offset by time
+            // need to communicate with be
+            // TODO not need ssl file?
+            kafkaPartitionOffsets = KafkaUtil.getOffsetsForTimes(kafkaDataSourceProperties.getBrokerList(),
+                    kafkaDataSourceProperties.getTopic(),
+                    customKafkaProperties, kafkaDataSourceProperties.getKafkaPartitionOffsets());
+        }
+
+        // get partition number list, eg:[0,1,2]
+        for (Pair<Integer, Long> partitionOffset : kafkaPartitionOffsets) {
+            this.kafkaPartitions.add(partitionOffset.first);
+        }
+    }
+
+    private void setCustomKafkaProperties(Map<String, String> kafkaProperties) {
+        this.customKafkaProperties = kafkaProperties;
+    }
+
+    private void checkCustomProperties() throws DdlException {
+        SmallFileMgr smallFileMgr = Env.getCurrentEnv().getSmallFileMgr();
+        for (Map.Entry<String, String> entry : customKafkaProperties.entrySet()) {
+            if (entry.getValue().startsWith("FILE:")) {
+                if (dbId == -1L) {
+                    throw new DdlException("No db specified for storing ssl 
files");
+                }
+
+                String file = entry.getValue().substring(entry.getValue().indexOf(":") + 1);
+                // check file
+                if (!smallFileMgr.containsFile(dbId, KAFKA_FILE_CATALOG, file)) {
+                    throw new DdlException("File " + file + " does not exist in db "
+                            + dbId + " with catalog: " + KAFKA_FILE_CATALOG);
+                }
+            }
+        }
+    }
+
+    private void checkPartition() throws UserException {
+        // user not define kafka partitions and be return no partitions
+        if (kafkaPartitions.isEmpty()) {
+            throw new AnalysisException("there is no available kafka 
partition");
+        }
+        if (!hasCustomPartitions) {
+            return;
+        }
+
+        // get all kafka partitions from be
+        List<Integer> allKafkaPartitions = getAllKafkaPartitions();
+
+        for (Integer customPartition : kafkaPartitions) {
+            if (!allKafkaPartitions.contains(customPartition)) {
+                throw new LoadException("there is a custom kafka partition " + 
customPartition
+                        + " which is invalid for topic " + 
kafkaDataSourceProperties.getTopic());
+            }
+        }
+    }
+
+    private List<Integer> getAllKafkaPartitions() throws UserException {
+        convertCustomProperties();
+        return KafkaUtil.getAllKafkaPartitions(brokerList, topic, customKafkaProperties);
+    }
+
+    private void convertCustomProperties() throws DdlException {
+        SmallFileMgr smallFileMgr = Env.getCurrentEnv().getSmallFileMgr();
+        for (Map.Entry<String, String> entry : customKafkaProperties.entrySet()) {
+            if (entry.getValue().startsWith("FILE:")) {
+                // convert FILE:file_name -> FILE:file_id:md5
+                String file = entry.getValue().substring(entry.getValue().indexOf(":") + 1);
+                SmallFileMgr.SmallFile smallFile = smallFileMgr.getSmallFile(dbId, KAFKA_FILE_CATALOG, file, true);
+                customKafkaProperties.put(entry.getKey(), "FILE:" + smallFile.id + ":" + smallFile.md5);
+            } else {
+                customKafkaProperties.put(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    public void parseAndAnalyzeKafkaProperties(Map<String, String> properties) throws UserException {
+        // kafka partition offset may be time-data format
+        // get time zone, to convert time into timestamps during the analysis phase
+        if (ConnectContext.get() != null) {
+            timezone = ConnectContext.get().getSessionVariable().getTimeZone();
+        }
+        timezone = TimeUtils.checkTimeZoneValidAndStandardize(properties.getOrDefault(
+                LoadStmt.TIMEZONE, timezone));
+
+        topic = properties.getOrDefault(KafkaConfiguration.KAFKA_TOPIC.getName(), "");
+        brokerList = properties.getOrDefault(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), "");
+        Preconditions.checkState(!Strings.isNullOrEmpty(topic), "topic must be set before analyzing");
+        Preconditions.checkState(!Strings.isNullOrEmpty(brokerList), "broker list must be set before analyzing");
+
+        String partitionStr = properties.getOrDefault(KafkaConfiguration.KAFKA_PARTITIONS.getName(), "");
+        if (partitionStr.isEmpty()) {
+            hasCustomPartitions = false;
+            // get all kafka partitions from be
+            List<Integer> allKafkaPartitions = getAllKafkaPartitions();
+            partitionStr = allKafkaPartitions.stream()
+                    .map(String::valueOf)
+                    .collect(Collectors.joining(","));
+            properties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), partitionStr);
+        }
+        // parse and analyze kafka properties, broker list and topic are required, others are optional
+        this.kafkaDataSourceProperties = new KafkaDataSourceProperties(properties);
+        this.kafkaDataSourceProperties.setTimezone(this.timezone);
+        this.kafkaDataSourceProperties.analyze();
+        // get ssl file db
+        String tableName = properties.getOrDefault(KAFKA_FILE_CATALOG, "");
+        if (!tableName.isEmpty()) {
+            Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(tableName);
+            dbId = db.getId();
+        }
+
+        // set custom properties and partition, include some converted operations
+        setOptional();
+        checkCustomProperties();
+        checkPartition();
+    }
+
+    public void parseAndAnalyzeJobProperties(Map<String, String> properties) throws AnalysisException {
+        // Copy the properties, because we will remove the key from properties.
+        Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        copiedProps.putAll(properties);
+
+        // get job properties and check range
+        long desiredConcurrentNum = Util.getLongPropertyOrDefault(
+                copiedProps.get(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY),
+                Config.max_routine_load_task_concurrent_num,
+                CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PRED,
+                CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY + " should > 0");
+        long maxBatchIntervalS = Util.getLongPropertyOrDefault(copiedProps.get(
+                        CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY),
+                RoutineLoadJob.DEFAULT_MAX_INTERVAL_SECOND, CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_PRED,
+                CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY + " should between 1 and 60");
+        long maxBatchRows = Util.getLongPropertyOrDefault(copiedProps.get(
+                        CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY),
+                RoutineLoadJob.DEFAULT_MAX_BATCH_ROWS, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PRED,
+                CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY + " should > 200000");
+        long maxBatchSizeBytes = Util.getLongPropertyOrDefault(copiedProps.get(
+                        CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY),
+                RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PRED,
+                CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY + " should between 100MB and 1GB");
+
+        // put into jobProperties
+        jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, desiredConcurrentNum);
+        jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS);
+        jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, maxBatchRows);
+        jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes);
+    }
+
+    public void getKafkaTvfTaskInfoList(int currentConcurrentTaskNum) throws AnalysisException {
+        // select be by policy
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needQueryAvailable().needLoadAvailable().build();
+        List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, currentConcurrentTaskNum);
+        if (backendIds.isEmpty() || backendIds.size() != currentConcurrentTaskNum) {
+            throw new AnalysisException("No available backends or incorrect number of backends"
+                    + ", policy: " + policy);
+        }
+
+        for (int i = 0; i < currentConcurrentTaskNum; i++) {
+            TKafkaTvfTask tKafkaLoadInfo = createTKafkaTvfTask(ConnectContext.get().queryId(),
+                    currentConcurrentTaskNum, i);
+            kafkaTvfTaskMap.put(backendIds.get(i), tKafkaLoadInfo);
+        }
+
+    }
+
+    public int calculateConcurrentTaskNum() throws AnalysisException {
+        int aliveBeNums = Env.getCurrentSystemInfo().getAllBackendIds(true).size();
+        int desireTaskConcurrentNum = jobProperties.get(
+                CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY).intValue();
+        int partitionNum = kafkaPartitions.size();
+
+        int[] numbers = {aliveBeNums, desireTaskConcurrentNum, partitionNum,
+                Config.max_routine_load_task_concurrent_num};
+        int concurrentTaskNum = Arrays.stream(numbers).min().getAsInt();
+        if (concurrentTaskNum == 0) {
+            throw new AnalysisException("concurrent task number is 0");
+        }
+
+        return concurrentTaskNum;
+    }
+
+    private TKafkaTvfTask createTKafkaTvfTask(TUniqueId uniqueId, int currentConcurrentTaskNum, int i) {
+        Map<Integer, Long> taskKafkaProgress = Maps.newHashMap();
+        List<Pair<Integer, Long>> kafkaPartitionOffsets = kafkaDataSourceProperties.getKafkaPartitionOffsets();
+        for (int j = i; j < kafkaPartitionOffsets.size(); j = j + currentConcurrentTaskNum) {
+            Pair<Integer, Long> pair = kafkaPartitionOffsets.get(j);
+            taskKafkaProgress.put(pair.key(), pair.value());
+        }
+
+        TKafkaLoadInfo tKafkaLoadInfo  = new TKafkaLoadInfo();
+        tKafkaLoadInfo.setBrokers(kafkaDataSourceProperties.getBrokerList());
+        tKafkaLoadInfo.setTopic(kafkaDataSourceProperties.getTopic());
+        tKafkaLoadInfo.setProperties(kafkaDataSourceProperties.getCustomKafkaProperties());
+        tKafkaLoadInfo.setPartitionBeginOffset(taskKafkaProgress);
+
+        TKafkaTvfTask tKafkaTvfTask = new TKafkaTvfTask(TLoadSourceType.KAFKA, uniqueId, tKafkaLoadInfo);
+        tKafkaTvfTask.setMaxIntervalS(jobProperties.get(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY));
+        tKafkaTvfTask.setMaxBatchRows(jobProperties.get(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY));
+        tKafkaTvfTask.setMaxBatchSize(jobProperties.get(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY));
+
+        return tKafkaTvfTask;
+    }
+
+    public void transferPartitionListToBe() throws AnalysisException {
+        for (Entry<Long, TKafkaTvfTask> entry : kafkaTvfTaskMap.entrySet()) {
+            Long beId = entry.getKey();
+            TKafkaTvfTask task = entry.getValue();
+
+            Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
+            if (backend == null) {
+                throw new AnalysisException("failed to send tasks to backend " 
+ beId + " because not exist");
+            }
+
+            TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBePort());
+
+            boolean ok = false;
+            BackendService.Client client = null;
+            try {
+                client = ClientPool.backendPool.borrowObject(address);
+                TStatus tStatus = client.sendKafkaTvfTask(task);
+                ok = true;
+
+                if (tStatus.getStatusCode() != TStatusCode.OK) {
+                    throw new AnalysisException("failed to send task. error 
code: " + tStatus.getStatusCode()
+                            + ", msg: " + (tStatus.getErrorMsgsSize() > 0 ? 
tStatus.getErrorMsgs().get(0) : "NaN"));
+                }
+                LOG.debug("send kafka tvf task {} to BE: {}", 
DebugUtil.printId(task.id), beId);
+            } catch (Exception e) {
+                throw new AnalysisException("failed to send task: " + 
e.getMessage(), e);
+            } finally {
+                if (ok) {
+                    ClientPool.backendPool.returnObject(address, client);
+                } else {
+                    ClientPool.backendPool.invalidateObject(address, client);
+                }
+            }
+
+        }
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index eb9ac858b3e..684758db357 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -35,6 +35,7 @@ import org.apache.doris.thrift.TGetTopNHotPartitionsRequest;
 import org.apache.doris.thrift.TGetTopNHotPartitionsResponse;
 import org.apache.doris.thrift.TIngestBinlogRequest;
 import org.apache.doris.thrift.TIngestBinlogResult;
+import org.apache.doris.thrift.TKafkaTvfTask;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPreCacheAsyncRequest;
 import org.apache.doris.thrift.TPreCacheAsyncResponse;
@@ -206,6 +207,10 @@ public class GenericPoolTest {
             return null;
         }
 
+        public TStatus sendKafkaTvfTask(TKafkaTvfTask tasks) throws TException {
+            return null;
+        }
+
         @Override
         public TScanOpenResult openScanner(TScanOpenParams params) throws TException {
             return null;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index c705893c672..431396e008e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -53,6 +53,7 @@ import org.apache.doris.thrift.TGetTopNHotPartitionsResponse;
 import org.apache.doris.thrift.THeartbeatResult;
 import org.apache.doris.thrift.TIngestBinlogRequest;
 import org.apache.doris.thrift.TIngestBinlogResult;
+import org.apache.doris.thrift.TKafkaTvfTask;
 import org.apache.doris.thrift.TMasterInfo;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPreCacheAsyncRequest;
@@ -391,6 +392,10 @@ public class MockedBackendFactory {
             return new TStatus(TStatusCode.OK);
         }
 
+        public TStatus sendKafkaTvfTask(TKafkaTvfTask tasks) throws TException {
+            return new TStatus(TStatusCode.OK);
+        }
+
         @Override
         public TScanOpenResult openScanner(TScanOpenParams params) throws TException {
             return null;
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 376e2a34df9..7ed4516c4b6 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -70,6 +70,15 @@ struct TRoutineLoadTask {
     17: optional bool memtable_on_sink_node;
 }
 
+struct TKafkaTvfTask {
+    1: required Types.TLoadSourceType type
+    2: required Types.TUniqueId id 
+    3: required TKafkaLoadInfo info
+    4: optional i64 max_interval_s
+    5: optional i64 max_batch_rows
+    6: optional i64 max_batch_size
+}
+
 struct TKafkaMetaProxyRequest {
     1: optional TKafkaLoadInfo kafka_info
 }
@@ -353,6 +362,8 @@ service BackendService {
 
     Status.TStatus submit_routine_load_task(1:list<TRoutineLoadTask> tasks);
 
+    Status.TStatus send_kafka_tvf_task(1:TKafkaTvfTask task);
+
     // doris will build  a scan context for this session, context_id returned if success
     DorisExternalService.TScanOpenResult open_scanner(1: DorisExternalService.TScanOpenParams params);
 
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 04a1fd35163..8c01dd086dd 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -666,6 +666,7 @@ enum TFileType {
     FILE_S3,
     FILE_HDFS,
     FILE_NET,       // read file by network, such as http
+    FILE_KAFKA,
 }
 
 struct TTabletCommitInfo {

