This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4387f47fb5 [pipeline](load) support pipeline load (#20217) 4387f47fb5 is described below commit 4387f47fb52a9e4813e0b1551bc173d9d7186ad4 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Thu Jun 1 11:42:43 2023 +0800 [pipeline](load) support pipeline load (#20217) --- be/src/runtime/fragment_mgr.cpp | 36 +++- .../routine_load/routine_load_task_executor.cpp | 9 +- .../runtime/stream_load/stream_load_executor.cpp | 201 +++++++++++++------- .../doris/load/routineload/KafkaTaskInfo.java | 17 +- .../doris/load/routineload/RoutineLoadJob.java | 21 +++ .../apache/doris/planner/StreamLoadPlanner.java | 202 +++++++++++++++++++++ .../apache/doris/qe/InsertStreamTxnExecutor.java | 114 ++++++++---- .../apache/doris/service/FrontendServiceImpl.java | 46 ++++- gensrc/thrift/BackendService.thrift | 1 + gensrc/thrift/FrontendService.thrift | 1 + gensrc/thrift/PaloInternalService.thrift | 1 + 11 files changed, 547 insertions(+), 102 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 5ac0554104..26646bbd0e 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -559,6 +559,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { stream_load_ctx->txn_id = params.txn_conf.txn_id; stream_load_ctx->id = UniqueId(params.params.query_id); stream_load_ctx->put_result.params = params; + stream_load_ctx->put_result.__isset.params = true; stream_load_ctx->use_streaming = true; stream_load_ctx->load_type = TLoadType::MANUL_LOAD; stream_load_ctx->load_src_type = TLoadSourceType::RAW; @@ -586,8 +587,39 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) { } Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params) { - // TODO - return exec_plan_fragment(params, empty_function); + if (params.txn_conf.need_txn) { + std::shared_ptr<StreamLoadContext> stream_load_ctx = + std::make_shared<StreamLoadContext>(_exec_env); + stream_load_ctx->db = params.txn_conf.db; + stream_load_ctx->db_id = params.txn_conf.db_id; + stream_load_ctx->table = params.txn_conf.tbl; + stream_load_ctx->txn_id = params.txn_conf.txn_id; + stream_load_ctx->id = UniqueId(params.query_id); + stream_load_ctx->put_result.pipeline_params = params; + stream_load_ctx->use_streaming = true; + stream_load_ctx->load_type = TLoadType::MANUL_LOAD; + stream_load_ctx->load_src_type = TLoadSourceType::RAW; + stream_load_ctx->label = params.import_label; + stream_load_ctx->format = TFileFormatType::FORMAT_CSV_PLAIN; + stream_load_ctx->timeout_second = 3600; + stream_load_ctx->auth.token = params.txn_conf.token; + stream_load_ctx->need_commit_self = true; + stream_load_ctx->need_rollback = true; + auto pipe = std::make_shared<io::StreamLoadPipe>( + io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, + -1 /* total_length */, true /* use_proto */); + stream_load_ctx->body_sink = pipe; + stream_load_ctx->pipe = pipe; + stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio; + + RETURN_IF_ERROR( + _exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, stream_load_ctx)); + + RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx)); + return Status::OK(); + } else { + return exec_plan_fragment(params, empty_function); + } } Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) { diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 4db6d42e18..7837c240e0 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -213,8 +213,13 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { TStatus tstatus; tstatus.status_code = TStatusCode::OK; put_result.status = tstatus; - put_result.params = task.params; - put_result.__isset.params = true; + if (task.__isset.params) { + put_result.params = task.params; + put_result.__isset.params = true; + } else { + put_result.pipeline_params = task.pipeline_params; + put_result.__isset.pipeline_params = true; + } ctx->put_result = put_result; if (task.__isset.format) { ctx->format = task.format; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index dc03195c3e..05c68941b9 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -69,77 +69,154 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte ctx->start_write_data_nanos = MonotonicNanos(); LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" << ctx->txn_id << ", query_id=" << print_id(ctx->put_result.params.params.query_id); - auto st = _exec_env->fragment_mgr()->exec_plan_fragment( - ctx->put_result.params, [ctx, this](RuntimeState* state, Status* status) { - ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); - ctx->commit_infos = std::move(state->tablet_commit_infos()); - if (status->ok()) { - ctx->number_total_rows = state->num_rows_load_total(); - ctx->number_loaded_rows = state->num_rows_load_success(); - ctx->number_filtered_rows = state->num_rows_load_filtered(); - ctx->number_unselected_rows = state->num_rows_load_unselected(); - - int64_t num_selected_rows = - ctx->number_total_rows - ctx->number_unselected_rows; - if (num_selected_rows > 0 && - (double)ctx->number_filtered_rows / num_selected_rows > - ctx->max_filter_ratio) { - // NOTE: Do not modify the error message here, for historical reasons, - // some users may rely on this error message. - *status = Status::InternalError("too many filtered rows"); + Status st; + if (ctx->put_result.__isset.params) { + st = _exec_env->fragment_mgr()->exec_plan_fragment( + ctx->put_result.params, [ctx, this](RuntimeState* state, Status* status) { + ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); + ctx->commit_infos = std::move(state->tablet_commit_infos()); + if (status->ok()) { + ctx->number_total_rows = state->num_rows_load_total(); + ctx->number_loaded_rows = state->num_rows_load_success(); + ctx->number_filtered_rows = state->num_rows_load_filtered(); + ctx->number_unselected_rows = state->num_rows_load_unselected(); + + int64_t num_selected_rows = + ctx->number_total_rows - ctx->number_unselected_rows; + if (num_selected_rows > 0 && + (double)ctx->number_filtered_rows / num_selected_rows > + ctx->max_filter_ratio) { + // NOTE: Do not modify the error message here, for historical reasons, + // some users may rely on this error message. + *status = Status::InternalError("too many filtered rows"); + } + if (ctx->number_filtered_rows > 0 && + !state->get_error_log_file_path().empty()) { + ctx->error_url = + to_load_error_http_path(state->get_error_log_file_path()); + } + + if (status->ok()) { + DorisMetrics::instance()->stream_receive_bytes_total->increment( + ctx->receive_bytes); + DorisMetrics::instance()->stream_load_rows_total->increment( + ctx->number_loaded_rows); + } + } else { + LOG(WARNING) + << "fragment execute failed" + << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id) + << ", err_msg=" << status->to_string() << ", " << ctx->brief(); + // cancel body_sink, make sender known it + if (ctx->body_sink != nullptr) { + ctx->body_sink->cancel(status->to_string()); + } + + switch (ctx->load_src_type) { + // reset the stream load ctx's kafka commit offset + case TLoadSourceType::KAFKA: + ctx->kafka_info->reset_offset(); + break; + default: + break; + } } - if (ctx->number_filtered_rows > 0 && - !state->get_error_log_file_path().empty()) { - ctx->error_url = to_load_error_http_path(state->get_error_log_file_path()); + ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos; + ctx->promise.set_value(*status); + + if (!status->ok() && ctx->body_sink != nullptr) { + // In some cases, the load execution is exited early. + // For example, when max_filter_ratio is 0 and illegal data is encountered + // during stream loading, the entire load process is terminated early. + // However, the http connection may still be sending data to stream_load_pipe + // and waiting for it to be consumed. + // Therefore, we need to actively cancel to end the pipe. + ctx->body_sink->cancel(status->to_string()); } + if (ctx->need_commit_self && ctx->body_sink != nullptr) { + if (ctx->body_sink->cancelled() || !status->ok()) { + ctx->status = *status; + this->rollback_txn(ctx.get()); + } else { + this->commit_txn(ctx.get()); + } + } + }); + } else { + st = _exec_env->fragment_mgr()->exec_plan_fragment( + ctx->put_result.pipeline_params, [ctx, this](RuntimeState* state, Status* status) { + ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); + ctx->commit_infos = std::move(state->tablet_commit_infos()); if (status->ok()) { - DorisMetrics::instance()->stream_receive_bytes_total->increment( - ctx->receive_bytes); - DorisMetrics::instance()->stream_load_rows_total->increment( - ctx->number_loaded_rows); + ctx->number_total_rows = state->num_rows_load_total(); + ctx->number_loaded_rows = state->num_rows_load_success(); + ctx->number_filtered_rows = state->num_rows_load_filtered(); + ctx->number_unselected_rows = state->num_rows_load_unselected(); + + int64_t num_selected_rows = + ctx->number_total_rows - ctx->number_unselected_rows; + if (num_selected_rows > 0 && + (double)ctx->number_filtered_rows / num_selected_rows > + ctx->max_filter_ratio) { + // NOTE: Do not modify the error message here, for historical reasons, + // some users may rely on this error message. + *status = Status::InternalError("too many filtered rows"); + } + if (ctx->number_filtered_rows > 0 && + !state->get_error_log_file_path().empty()) { + ctx->error_url = + to_load_error_http_path(state->get_error_log_file_path()); + } + + if (status->ok()) { + DorisMetrics::instance()->stream_receive_bytes_total->increment( + ctx->receive_bytes); + DorisMetrics::instance()->stream_load_rows_total->increment( + ctx->number_loaded_rows); + } + } else { + LOG(WARNING) + << "fragment execute failed" + << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id) + << ", err_msg=" << status->to_string() << ", " << ctx->brief(); + // cancel body_sink, make sender known it + if (ctx->body_sink != nullptr) { + ctx->body_sink->cancel(status->to_string()); + } + + switch (ctx->load_src_type) { + // reset the stream load ctx's kafka commit offset + case TLoadSourceType::KAFKA: + ctx->kafka_info->reset_offset(); + break; + default: + break; + } } - } else { - LOG(WARNING) << "fragment execute failed" - << ", query_id=" - << UniqueId(ctx->put_result.params.params.query_id) - << ", err_msg=" << status->to_string() << ", " << ctx->brief(); - // cancel body_sink, make sender known it - if (ctx->body_sink != nullptr) { + ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos; + ctx->promise.set_value(*status); + + if (!status->ok() && ctx->body_sink != nullptr) { + // In some cases, the load execution is exited early. + // For example, when max_filter_ratio is 0 and illegal data is encountered + // during stream loading, the entire load process is terminated early. + // However, the http connection may still be sending data to stream_load_pipe + // and waiting for it to be consumed. + // Therefore, we need to actively cancel to end the pipe. ctx->body_sink->cancel(status->to_string()); } - switch (ctx->load_src_type) { - // reset the stream load ctx's kafka commit offset - case TLoadSourceType::KAFKA: - ctx->kafka_info->reset_offset(); - break; - default: - break; - } - } - ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos; - ctx->promise.set_value(*status); - - if (!status->ok() && ctx->body_sink != nullptr) { - // In some cases, the load execution is exited early. - // For example, when max_filter_ratio is 0 and illegal data is encountered - // during stream loading, the entire load process is terminated early. - // However, the http connection may still be sending data to stream_load_pipe - // and waiting for it to be consumed. - // Therefore, we need to actively cancel to end the pipe. - ctx->body_sink->cancel(status->to_string()); - } - - if (ctx->need_commit_self && ctx->body_sink != nullptr) { - if (ctx->body_sink->cancelled() || !status->ok()) { - ctx->status = *status; - this->rollback_txn(ctx.get()); - } else { - this->commit_txn(ctx.get()); + if (ctx->need_commit_self && ctx->body_sink != nullptr) { + if (ctx->body_sink->cancelled() || !status->ok()) { + ctx->status = *status; + this->rollback_txn(ctx.get()); + } else { + this->commit_txn(ctx.get()); + } } - } - }); + }); + } if (!st.ok()) { // no need to check unref's return value return st; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index fa4be7855a..f11d2ad373 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -20,12 +20,14 @@ package org.apache.doris.load.routineload; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Table; +import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TKafkaLoadInfo; import org.apache.doris.thrift.TLoadSourceType; +import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TPlanFragment; import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.thrift.TUniqueId; @@ -87,7 +89,11 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { tKafkaLoadInfo.setProperties(routineLoadJob.getConvertedCustomProperties()); tRoutineLoadTask.setKafkaLoadInfo(tKafkaLoadInfo); tRoutineLoadTask.setType(TLoadSourceType.KAFKA); - tRoutineLoadTask.setParams(rePlan(routineLoadJob)); + if (Config.enable_pipeline_load) { + tRoutineLoadTask.setPipelineParams(rePlanForPipeline(routineLoadJob)); + } else { + tRoutineLoadTask.setParams(rePlan(routineLoadJob)); + } tRoutineLoadTask.setMaxIntervalS(routineLoadJob.getMaxBatchIntervalS()); tRoutineLoadTask.setMaxBatchRows(routineLoadJob.getMaxBatchRows()); tRoutineLoadTask.setMaxBatchSize(routineLoadJob.getMaxBatchSizeBytes()); @@ -120,6 +126,15 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { return tExecPlanFragmentParams; } + private TPipelineFragmentParams rePlanForPipeline(RoutineLoadJob routineLoadJob) throws UserException { + TUniqueId loadId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits()); + // plan for each task, in case table has change(rollup or schema change) + TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.planForPipeline(loadId, txnId); + TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment(); + tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId); + return tExecPlanFragmentParams; + } + // implement method for compatibility public String getHeaderType() { return ""; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 5d4d2ecf8c..56186cbda2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -61,6 +61,7 @@ import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.AbstractTxnStateChangeCallback; import org.apache.doris.transaction.TransactionException; @@ -829,6 +830,26 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } } + public TPipelineFragmentParams planForPipeline(TUniqueId loadId, long txnId) throws UserException { + Preconditions.checkNotNull(planner); + Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); + Table table = db.getTableOrMetaException(tableId, Table.TableType.OLAP); + table.readLock(); + try { + TPipelineFragmentParams planParams = planner.planForPipeline(loadId); + // add table indexes to transaction state + TransactionState txnState = Env.getCurrentGlobalTransactionMgr().getTransactionState(db.getId(), txnId); + if (txnState == null) { + throw new MetaNotFoundException("txn does not exist: " + txnId); + } + txnState.addTableIndexes(planner.getDestTable()); + + return planParams; + } finally { + table.readUnlock(); + } + } + // if task not exists, before aborted will reset the txn attachment to null, task will not be updated // if task pass the checker, task will be updated by attachment // *** Please do not call before individually. It must be combined use with after *** diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index e5789e784f..ce65cf3564 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -54,6 +54,8 @@ import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPipelineFragmentParams; +import org.apache.doris.thrift.TPipelineInstanceParams; import org.apache.doris.thrift.TPlanFragmentExecParams; import org.apache.doris.thrift.TQueryGlobals; import org.apache.doris.thrift.TQueryOptions; @@ -309,6 +311,206 @@ public class StreamLoadPlanner { return params; } + public TPipelineFragmentParams planForPipeline(TUniqueId loadId) throws UserException { + if (destTable.getKeysType() != KeysType.UNIQUE_KEYS + && taskInfo.getMergeType() != LoadTask.MergeType.APPEND) { + throw new AnalysisException("load by MERGE or DELETE is only supported in unique tables."); + } + if (taskInfo.getMergeType() != LoadTask.MergeType.APPEND + && !destTable.hasDeleteSign()) { + throw new AnalysisException("load by MERGE or DELETE need to upgrade table to support batch delete."); + } + + if (destTable.hasSequenceCol() && !taskInfo.hasSequenceCol() && destTable.getSequenceMapCol() == null) { + throw new UserException("Table " + destTable.getName() + + " has sequence column, need to specify the sequence column"); + } + if (!destTable.hasSequenceCol() && taskInfo.hasSequenceCol()) { + throw new UserException("There is no sequence column in the table " + destTable.getName()); + } + resetAnalyzer(); + // construct tuple descriptor, used for dataSink + tupleDesc = descTable.createTupleDescriptor("DstTableTuple"); + TupleDescriptor scanTupleDesc = tupleDesc; + // note: we use two tuples separately for Scan and Sink here to avoid wrong nullable info. + // construct tuple descriptor, used for scanNode + scanTupleDesc = descTable.createTupleDescriptor("ScanTuple"); + boolean negative = taskInfo.getNegative(); + // get partial update related info + boolean isPartialUpdate = taskInfo.isPartialUpdate(); + if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) { + throw new UserException("Only unique key merge on write support partial update"); + } + HashSet<String> partialUpdateInputColumns = new HashSet<>(); + if (isPartialUpdate) { + for (Column col : destTable.getFullSchema()) { + boolean existInExpr = false; + for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) { + if (importColumnDesc.getColumnName() != null + && importColumnDesc.getColumnName().equals(col.getName())) { + if (!col.isVisible()) { + throw new UserException("Partial update should not include invisible column: " + + col.getName()); + } + partialUpdateInputColumns.add(col.getName()); + existInExpr = true; + break; + } + } + if (col.isKey() && !existInExpr) { + throw new UserException("Partial update should include all key columns, missing: " + col.getName()); + } + } + } + // here we should be full schema to fill the descriptor table + for (Column col : destTable.getFullSchema()) { + if (isPartialUpdate && !partialUpdateInputColumns.contains(col.getName())) { + continue; + } + SlotDescriptor slotDesc = descTable.addSlotDescriptor(tupleDesc); + slotDesc.setIsMaterialized(true); + slotDesc.setColumn(col); + slotDesc.setIsNullable(col.isAllowNull()); + SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc); + scanSlotDesc.setIsMaterialized(true); + scanSlotDesc.setColumn(col); + scanSlotDesc.setIsNullable(col.isAllowNull()); + for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) { + try { + if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null + && importColumnDesc.getColumnName().equals(col.getName())) { + scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable()); + break; + } + } catch (Exception e) { + // An exception may be thrown here because the `importColumnDesc.getExpr()` is not analyzed now. + // We just skip this case here. + } + } + if (negative && !col.isKey() && col.getAggregationType() != AggregateType.SUM) { + throw new DdlException("Column is not SUM AggregateType. column:" + col.getName()); + } + } + + // Plan scan tuple of dynamic table + if (destTable.isDynamicSchema()) { + descTable.addReferencedTable(destTable); + scanTupleDesc.setTable(destTable); + // add a implict container column "DORIS_DYNAMIC_COL" for dynamic columns + SlotDescriptor slotDesc = descTable.addSlotDescriptor(scanTupleDesc); + Column col = new Column(Column.DYNAMIC_COLUMN_NAME, Type.VARIANT, false, null, false, "", + "stream load auto dynamic column"); + slotDesc.setIsMaterialized(true); + slotDesc.setColumn(col); + // Non-nullable slots will have 0 for the byte offset and -1 for the bit mask + slotDesc.setNullIndicatorBit(-1); + slotDesc.setNullIndicatorByte(0); + slotDesc.setIsNullable(false); + LOG.debug("plan tupleDesc {}", scanTupleDesc.toString()); + } + + // create scan node + FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), scanTupleDesc); + // 1. create file group + DataDescription dataDescription = new DataDescription(destTable.getName(), taskInfo); + dataDescription.analyzeWithoutCheckPriv(db.getFullName()); + BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription); + fileGroup.parse(db, dataDescription); + // 2. create dummy file status + TBrokerFileStatus fileStatus = new TBrokerFileStatus(); + if (taskInfo.getFileType() == TFileType.FILE_LOCAL) { + fileStatus.setPath(taskInfo.getPath()); + fileStatus.setIsDir(false); + fileStatus.setSize(taskInfo.getFileSize()); // must set to -1, means stream. + } else { + fileStatus.setPath(""); + fileStatus.setIsDir(false); + fileStatus.setSize(-1); // must set to -1, means stream. + } + // The load id will pass to csv reader to find the stream load context from new load stream manager + fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(), + fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType(), taskInfo.getHiddenColumns(), + taskInfo.isPartialUpdate()); + scanNode = fileScanNode; + + scanNode.init(analyzer); + scanNode.finalize(analyzer); + descTable.computeStatAndMemLayout(); + + int timeout = taskInfo.getTimeout(); + if (taskInfo instanceof RoutineLoadJob) { + // For routine load, make the timeout fo plan fragment larger than MaxIntervalS config. + // So that the execution won't be killed before consuming finished. + timeout *= 2; + } + + // create dest sink + List<Long> partitionIds = getAllPartitionIds(); + OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, + Config.enable_single_replica_load); + olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, + taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet()); + olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns); + olapTableSink.complete(); + + // for stream load, we only need one fragment, ScanNode -> DataSink. + // OlapTableSink can dispatch data to corresponding node. + PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), scanNode, DataPartition.UNPARTITIONED); + fragment.setSink(olapTableSink); + + fragment.finalize(null); + + TPipelineFragmentParams pipParams = new TPipelineFragmentParams(); + pipParams.setProtocolVersion(PaloInternalServiceVersion.V1); + pipParams.setFragment(fragment.toThrift()); + + pipParams.setDescTbl(analyzer.getDescTbl().toThrift()); + pipParams.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port)); + pipParams.setQueryId(loadId); + pipParams.per_exch_num_senders = Maps.newHashMap(); + pipParams.destinations = Lists.newArrayList(); + pipParams.setNumSenders(1); + + TPipelineInstanceParams localParams = new TPipelineInstanceParams(); + localParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo + 1)); + + Map<Integer, List<TScanRangeParams>> perNodeScanRange = Maps.newHashMap(); + List<TScanRangeParams> scanRangeParams = Lists.newArrayList(); + for (TScanRangeLocations locations : scanNode.getScanRangeLocations(0)) { + scanRangeParams.add(new TScanRangeParams(locations.getScanRange())); + } + // For stream load, only one sender + localParams.setSenderId(0); + perNodeScanRange.put(scanNode.getId().asInt(), scanRangeParams); + localParams.setPerNodeScanRanges(perNodeScanRange); + pipParams.setLocalParams(Lists.newArrayList()); + pipParams.getLocalParams().add(localParams); + TQueryOptions queryOptions = new TQueryOptions(); + queryOptions.setQueryType(TQueryType.LOAD); + queryOptions.setQueryTimeout(timeout); + queryOptions.setExecutionTimeout(timeout); + queryOptions.setMemLimit(taskInfo.getMemLimit()); + // for stream load, we use exec_mem_limit to limit the memory usage of load channel. + queryOptions.setLoadMemLimit(taskInfo.getMemLimit()); + //load + queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load); + queryOptions.setBeExecVersion(Config.be_exec_version); + queryOptions.setIsReportSuccess(taskInfo.getEnableProfile()); + + pipParams.setQueryOptions(queryOptions); + TQueryGlobals queryGlobals = new TQueryGlobals(); + queryGlobals.setNowString(TimeUtils.DATETIME_FORMAT.format(LocalDateTime.now())); + queryGlobals.setTimestampMs(System.currentTimeMillis()); + queryGlobals.setTimeZone(taskInfo.getTimezone()); + queryGlobals.setLoadZeroTolerance(taskInfo.getMaxFilterRatio() <= 0.0); + queryGlobals.setNanoSeconds(LocalDateTime.now().getNano()); + + pipParams.setQueryGlobals(queryGlobals); + + // LOG.debug("stream load txn id: {}, plan: {}", streamLoadTask.getTxnId(), params); + return pipParams; + } + // get all specified partition ids. // if no partition specified, return null private List<Long> getAllPartitionIds() throws DdlException, AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java index 6f156030e7..77d4c52ac9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java @@ -19,6 +19,7 @@ package org.apache.doris.qe; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.planner.StreamLoadPlanner; import org.apache.doris.proto.InternalService; @@ -33,6 +34,8 @@ import org.apache.doris.thrift.TExecPlanFragmentParamsList; import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPipelineFragmentParams; +import org.apache.doris.thrift.TPipelineFragmentParamsList; import org.apache.doris.thrift.TScanRangeParams; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStreamLoadPutRequest; @@ -66,44 +69,87 @@ public class InsertStreamTxnExecutor { StreamLoadPlanner planner = new StreamLoadPlanner( txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask); // Will using load id as query id in fragment - TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId()); - BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build(); - List<Long> beIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); - if (beIds.isEmpty()) { - throw new UserException("No available backend to match the policy: " + policy); - } + if (Config.enable_pipeline_load) { + TPipelineFragmentParams tRequest = planner.planForPipeline(streamLoadTask.getId()); + BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build(); + List<Long> beIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); + if (beIds.isEmpty()) { + throw new UserException("No available backend to match the policy: " + policy); + } - tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel()); - for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) { - for (TScanRangeParams scanRangeParams : entry.getValue()) { - scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType( - TFileFormatType.FORMAT_PROTO); - scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType( - TFileCompressType.PLAIN); + tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel()); + for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.local_params.get(0) + .per_node_scan_ranges.entrySet()) { + for (TScanRangeParams scanRangeParams : entry.getValue()) { + scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType( + TFileFormatType.FORMAT_PROTO); + scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType( + TFileCompressType.PLAIN); + } + } + txnConf.setFragmentInstanceId(tRequest.local_params.get(0).fragment_instance_id); + this.loadId = request.getLoadId(); + this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder() + .setHi(loadId.getHi()) + .setLo(loadId.getLo()).build()); + + Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0)); + txnConf.setUserIp(backend.getHost()); + txnEntry.setBackend(backend); + TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); + try { + TPipelineFragmentParamsList paramsList = new TPipelineFragmentParamsList(); + paramsList.addToParamsList(tRequest); + Future<InternalService.PExecPlanFragmentResult> future = + BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false); + InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS); + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList()); + } + } catch (RpcException e) { + throw new TException(e); + } + } else { + TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId()); + BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build(); + List<Long> beIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); + if (beIds.isEmpty()) { + throw new UserException("No available backend to match the policy: " + policy); } - } - txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id); - this.loadId = request.getLoadId(); - this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder() - .setHi(loadId.getHi()) - .setLo(loadId.getLo()).build()); - Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0)); - txnConf.setUserIp(backend.getHost()); - txnEntry.setBackend(backend); - TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); - try { - TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList(); - paramsList.addToParamsList(tRequest); - Future<InternalService.PExecPlanFragmentResult> future = - BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false); - InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS); - TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); - if (code != TStatusCode.OK) { - throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList()); + tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel()); + for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) { + for (TScanRangeParams scanRangeParams : entry.getValue()) { + scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType( + TFileFormatType.FORMAT_PROTO); + scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType( + TFileCompressType.PLAIN); + } + } + txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id); + this.loadId = request.getLoadId(); + this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder() + .setHi(loadId.getHi()) + .setLo(loadId.getLo()).build()); + + Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0)); + txnConf.setUserIp(backend.getHost()); + txnEntry.setBackend(backend); + TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); + try { + TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList(); + paramsList.addToParamsList(tRequest); + Future<InternalService.PExecPlanFragmentResult> future = + BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false); + InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS); + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList()); + } + } catch (RpcException e) { + throw new TException(e); } - } catch (RpcException e) { - throw new TException(e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 8f21f64b2f..b650100823 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -127,6 +127,7 @@ import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TMasterResult; import org.apache.doris.thrift.TMySqlLoadAcquireTokenResult; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPipelineFragmentParams; import org.apache.doris.thrift.TPrivilegeCtrl; import org.apache.doris.thrift.TPrivilegeHier; import org.apache.doris.thrift.TPrivilegeStatus; @@ -1568,7 +1569,11 @@ public class FrontendServiceImpl implements FrontendService.Iface { TStatus status = new TStatus(TStatusCode.OK); result.setStatus(status); try { - result.setParams(streamLoadPutImpl(request)); + if (Config.enable_pipeline_load) { + result.setPipelineParams(pipelineStreamLoadPutImpl(request)); + } else { + result.setParams(streamLoadPutImpl(request)); + } } catch (UserException e) { LOG.warn("failed to get stream load plan: {}", e.getMessage()); status.setStatusCode(TStatusCode.ANALYSIS_ERROR); @@ -1621,6 +1626,45 @@ public class FrontendServiceImpl implements FrontendService.Iface { } } + private TPipelineFragmentParams pipelineStreamLoadPutImpl(TStreamLoadPutRequest request) throws UserException { + String cluster = request.getCluster(); + if (Strings.isNullOrEmpty(cluster)) { + cluster = SystemInfoService.DEFAULT_CLUSTER; + } + + Env env = Env.getCurrentEnv(); + String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb()); + Database db = env.getInternalCatalog().getDbNullable(fullDbName); + if (db == null) { + String dbName = fullDbName; + if (Strings.isNullOrEmpty(request.getCluster())) { + dbName = request.getDb(); + } + throw new UserException("unknown database, database=" + dbName); + } + long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() : 5000; + Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP); + if (!table.tryReadLock(timeoutMs, TimeUnit.MILLISECONDS)) { + throw new UserException( + "get table read lock timeout, database=" + fullDbName + ",table=" + table.getName()); + } + try { + StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request); + StreamLoadPlanner planner = new StreamLoadPlanner(db, (OlapTable) table, streamLoadTask); + TPipelineFragmentParams plan = planner.planForPipeline(streamLoadTask.getId()); + // add table indexes to transaction state + TransactionState txnState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(db.getId(), request.getTxnId()); + if (txnState == null) { + throw new UserException("txn does not exist: " + request.getTxnId()); + } + txnState.addTableIndexes((OlapTable) table); + return plan; + } finally { + table.readUnlock(); + } + } + @Override public TStatus snapshotLoaderReport(TSnapshotLoaderReportRequest request) throws TException { if (Env.getCurrentEnv().getBackupHandler().report(request.getTaskType(), request.getJobId(), diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 60beb0b27e..62f7909304 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -65,6 +65,7 @@ struct TRoutineLoadTask { 12: optional TKafkaLoadInfo kafka_load_info 13: optional PaloInternalService.TExecPlanFragmentParams params 14: optional PlanNodes.TFileFormatType format + 15: optional PaloInternalService.TPipelineFragmentParams pipeline_params } struct TKafkaMetaProxyRequest { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 1f64965a1c..5aecdc4ba2 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -609,6 +609,7 @@ struct TStreamLoadPutResult { 1: required Status.TStatus status // valid when status is OK 2: optional PaloInternalService.TExecPlanFragmentParams params + 3: optional PaloInternalService.TPipelineFragmentParams pipeline_params } struct TKafkaRLTaskProgress { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 0c1a4feee8..734d70bb1a 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -625,6 +625,7 @@ struct TPipelineFragmentParams { 23: optional Planner.TPlanFragment fragment 24: list<TPipelineInstanceParams> local_params 26: optional list<TPipelineWorkloadGroup> workload_groups + 27: optional TTxnParams txn_conf } struct TPipelineFragmentParamsList { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org