This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push:
new 0fbe6836bbe [revert](streaming job) revert "implement offset
persistence and replay in cloud mode" (#56149)
0fbe6836bbe is described below
commit 0fbe6836bbe33c81e00995d89d934a5089f483bd
Author: hui lai <[email protected]>
AuthorDate: Wed Sep 17 15:53:01 2025 +0800
[revert](streaming job) revert "implement offset persistence and replay in
cloud mode" (#56149)
### What problem does this PR solve?
Revert "implement offset persistence and replay in cloud mode"
---
cloud/src/common/bvars.cpp | 5 -
cloud/src/common/bvars.h | 3 -
cloud/src/meta-service/meta_service.h | 13 --
cloud/src/meta-service/meta_service_txn.cpp | 137 ---------------------
cloud/src/meta-store/keys.cpp | 13 +-
cloud/src/meta-store/keys.h | 5 -
.../apache/doris/cloud/rpc/MetaServiceClient.java | 6 -
.../apache/doris/cloud/rpc/MetaServiceProxy.java | 5 -
.../transaction/CloudGlobalTransactionMgr.java | 14 ---
.../apache/doris/cloud/transaction/TxnUtil.java | 38 ------
.../insert/streaming/StreamingInsertJob.java | 44 -------
.../StreamingTaskTxnCommitAttachment.java | 10 --
.../java/org/apache/doris/job/offset/Offset.java | 4 -
.../org/apache/doris/job/offset/s3/S3Offset.java | 10 --
gensrc/proto/cloud.proto | 31 +----
15 files changed, 3 insertions(+), 335 deletions(-)
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 8ea24c4a1ba..e495b5ea95b 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -85,7 +85,6 @@ BvarLatencyRecorderWithTag
g_bvar_ms_remove_delete_bitmap("ms", "remove_delete_b
BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap_update_lock("ms",
"remove_delete_bitmap_update_lock");
BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms",
"get_rl_task_commit_attach");
-BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach("ms",
"get_streaming_task_commit_attach");
BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms",
"reset_rl_progress");
BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");
BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms",
"start_tablet_job");
@@ -365,8 +364,6 @@ mBvarInt64Adder
g_bvar_rpc_kv_precommit_txn_get_counter("rpc_kv_precommit_txn_ge
mBvarInt64Adder
g_bvar_rpc_kv_precommit_txn_put_counter("rpc_kv_precommit_txn_put_counter",{"instance_id"});
// get_rl_task_commit_attach
mBvarInt64Adder
g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter("rpc_kv_get_rl_task_commit_attach_get_counter",{"instance_id"});
-// get_streaming_task_commit_attach
-mBvarInt64Adder
g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter("rpc_kv_get_streaming_task_commit_attach_get_counter",{"instance_id"});
// reset_rl_progress
mBvarInt64Adder
g_bvar_rpc_kv_reset_rl_progress_get_counter("rpc_kv_reset_rl_progress_get_counter",{"instance_id"});
mBvarInt64Adder
g_bvar_rpc_kv_reset_rl_progress_put_counter("rpc_kv_reset_rl_progress_put_counter",{"instance_id"});
@@ -530,8 +527,6 @@ mBvarInt64Adder
g_bvar_rpc_kv_precommit_txn_get_bytes("rpc_kv_precommit_txn_get_
mBvarInt64Adder
g_bvar_rpc_kv_precommit_txn_put_bytes("rpc_kv_precommit_txn_put_bytes",{"instance_id"});
// get_rl_task_commit_attach
mBvarInt64Adder
g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes("rpc_kv_get_rl_task_commit_attach_get_bytes",{"instance_id"});
-// get_streaming_task_commit_attach
-mBvarInt64Adder
g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes("rpc_kv_get_streaming_task_commit_attach_get_bytes",{"instance_id"});
// reset_rl_progress
mBvarInt64Adder
g_bvar_rpc_kv_reset_rl_progress_get_bytes("rpc_kv_reset_rl_progress_get_bytes",{"instance_id"});
mBvarInt64Adder
g_bvar_rpc_kv_reset_rl_progress_put_bytes("rpc_kv_reset_rl_progress_put_bytes",{"instance_id"});
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index e72d0fcd376..8fb5973249f 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -251,7 +251,6 @@ extern BvarLatencyRecorderWithTag
g_bvar_ms_get_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
-extern BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach;
extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
@@ -471,7 +470,6 @@ extern mBvarInt64Adder g_bvar_rpc_kv_begin_txn_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter;
-extern mBvarInt64Adder
g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_counter;
@@ -584,7 +582,6 @@ extern mBvarInt64Adder g_bvar_rpc_kv_begin_txn_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes;
-extern mBvarInt64Adder
g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_bytes;
diff --git a/cloud/src/meta-service/meta_service.h
b/cloud/src/meta-service/meta_service.h
index c7a37277fdc..38cc77aebb0 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -319,11 +319,6 @@ public:
GetRLTaskCommitAttachResponse* response,
::google::protobuf::Closure* done) override;
- void get_streaming_task_commit_attach(::google::protobuf::RpcController*
controller,
- const
GetStreamingTaskCommitAttachRequest* request,
-
GetStreamingTaskCommitAttachResponse* response,
- ::google::protobuf::Closure* done)
override;
-
void reset_rl_progress(::google::protobuf::RpcController* controller,
const ResetRLProgressRequest* request,
ResetRLProgressResponse* response,
::google::protobuf::Closure* done) override;
@@ -825,14 +820,6 @@ public:
done);
}
- void get_streaming_task_commit_attach(::google::protobuf::RpcController*
controller,
- const
GetStreamingTaskCommitAttachRequest* request,
-
GetStreamingTaskCommitAttachResponse* response,
- ::google::protobuf::Closure* done)
override {
- call_impl(&cloud::MetaService::get_streaming_task_commit_attach,
controller, request,
- response, done);
- }
-
void reset_rl_progress(::google::protobuf::RpcController* controller,
const ResetRLProgressRequest* request,
ResetRLProgressResponse* response,
::google::protobuf::Closure* done) override {
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index bd4cf6fb036..ce128f6d30e 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -610,75 +610,6 @@ void put_routine_load_progress(MetaServiceCode& code,
std::string& msg,
<< " routine load new progress: " <<
new_progress_info.ShortDebugString();
}
-void put_streaming_job_meta(MetaServiceCode& code, std::string& msg, const
std::string& instance_id,
- const CommitTxnRequest* request, Transaction* txn,
int64_t db_id) {
- std::stringstream ss;
- int64_t txn_id = request->txn_id();
- if (!request->has_commit_attachment()) {
- ss << "failed to get commit attachment from req, db_id=" << db_id << "
txn_id=" << txn_id;
- msg = ss.str();
- return;
- }
- TxnCommitAttachmentPB txn_commit_attachment = request->commit_attachment();
- StreamingTaskCommitAttachmentPB commit_attachment =
- txn_commit_attachment.streaming_task_txn_commit_attachment();
- int64_t job_id = commit_attachment.job_id();
-
- std::string streaming_meta_key;
- std::string streaming_meta_val;
- bool prev_meta_existed = true;
- StreamingJobMetaKeyInfo streaming_meta_key_info {instance_id, db_id,
job_id};
- streaming_job_meta_key_info(streaming_meta_key_info, &streaming_meta_key);
- TxnErrorCode err = txn->get(streaming_meta_key, &streaming_meta_val);
- if (err != TxnErrorCode::TXN_OK) {
- if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
- prev_meta_existed = false;
- } else {
- code = cast_as<ErrCategory::READ>(err);
- ss << "failed to get streaming job meta, db_id=" << db_id << "
txn_id=" << txn_id
- << " err=" << err;
- msg = ss.str();
- return;
- }
- }
-
- StreamingTaskCommitAttachmentPB new_meta_info;
- if (prev_meta_existed) {
- if (!new_meta_info.ParseFromString(streaming_meta_val)) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- ss << "failed to parse streaming job meta, db_id=" << db_id << "
txn_id=" << txn_id;
- msg = ss.str();
- return;
- }
- new_meta_info.set_scanned_rows(new_meta_info.scanned_rows() +
- commit_attachment.scanned_rows());
- new_meta_info.set_load_bytes(new_meta_info.load_bytes() +
commit_attachment.load_bytes());
- new_meta_info.set_file_number(new_meta_info.file_number() +
- commit_attachment.file_number());
- new_meta_info.set_file_size(new_meta_info.file_size() +
commit_attachment.file_size());
- } else {
- new_meta_info.set_job_id(commit_attachment.job_id());
- new_meta_info.set_scanned_rows(commit_attachment.scanned_rows());
- new_meta_info.set_load_bytes(commit_attachment.load_bytes());
- new_meta_info.set_file_number(commit_attachment.file_number());
- new_meta_info.set_file_size(commit_attachment.file_size());
- }
- if (commit_attachment.has_offset()) {
- new_meta_info.set_offset(commit_attachment.offset());
- }
- std::string new_meta_val;
- if (!new_meta_info.SerializeToString(&new_meta_val)) {
- code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
- ss << "failed to serialize new streaming meta val, txn_id=" << txn_id;
- msg = ss.str();
- return;
- }
-
- txn->put(streaming_meta_key, new_meta_val);
- LOG(INFO) << "put streaming_meta_key key=" << hex(streaming_meta_key)
- << " streaming job new meta: " <<
new_meta_info.ShortDebugString();
-}
-
void
MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcController*
controller,
const
GetRLTaskCommitAttachRequest* request,
GetRLTaskCommitAttachResponse*
response,
@@ -747,64 +678,6 @@ void
MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcControlle
}
}
-void MetaServiceImpl::get_streaming_task_commit_attach(
- ::google::protobuf::RpcController* controller,
- const GetStreamingTaskCommitAttachRequest* request,
- GetStreamingTaskCommitAttachResponse* response,
::google::protobuf::Closure* done) {
- RPC_PREPROCESS(get_streaming_task_commit_attach, get);
- instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
- if (instance_id.empty()) {
- code = MetaServiceCode::INVALID_ARGUMENT;
- msg = "empty instance_id";
- LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
- return;
- }
- RPC_RATE_LIMIT(get_streaming_task_commit_attach)
-
- TxnErrorCode err = txn_kv_->create_txn(&txn);
- if (err != TxnErrorCode::TXN_OK) {
- code = cast_as<ErrCategory::CREATE>(err);
- ss << "filed to create txn, err=" << err;
- msg = ss.str();
- return;
- }
-
- if (!request->has_db_id() || !request->has_job_id()) {
- code = MetaServiceCode::INVALID_ARGUMENT;
- msg = "empty db_id or job_id";
- LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
- return;
- }
-
- int64_t db_id = request->db_id();
- int64_t job_id = request->job_id();
- std::string streaming_meta_key;
- std::string streaming_meta_val;
- StreamingJobMetaKeyInfo streaming_meta_key_info {instance_id, db_id,
job_id};
- streaming_job_meta_key_info(streaming_meta_key_info, &streaming_meta_key);
- err = txn->get(streaming_meta_key, &streaming_meta_val);
- if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
- code = MetaServiceCode::STREAMING_JOB_PROGRESS_NOT_FOUND;
- ss << "progress info not found, db_id=" << db_id << " job_id=" <<
job_id << " err=" << err;
- msg = ss.str();
- return;
- } else if (err != TxnErrorCode::TXN_OK) {
- code = cast_as<ErrCategory::READ>(err);
- ss << "failed to get progress info, db_id=" << db_id << " job_id=" <<
job_id
- << " err=" << err;
- msg = ss.str();
- return;
- }
-
- StreamingTaskCommitAttachmentPB* commit_attach =
response->mutable_commit_attach();
- if (!commit_attach->ParseFromString(streaming_meta_val)) {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- ss << "failed to parse meta info, db_id=" << db_id << " job_id=" <<
job_id;
- msg = ss.str();
- return;
- }
-}
-
void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController*
controller,
const ResetRLProgressRequest* request,
ResetRLProgressResponse* response,
@@ -1699,11 +1572,6 @@ void MetaServiceImpl::commit_txn_immediately(
put_routine_load_progress(code, msg, instance_id, request,
txn.get(), db_id);
}
- if (txn_info.load_job_source_type() ==
- LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) {
- put_streaming_job_meta(code, msg, instance_id, request, txn.get(),
db_id);
- }
-
LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key)
<< " txn_id=" << txn_id;
LOG(INFO) << "commit_txn put_size=" << txn->put_bytes()
@@ -2097,11 +1965,6 @@ void MetaServiceImpl::commit_txn_eventually(
put_routine_load_progress(code, msg, instance_id, request,
txn.get(), db_id);
}
- if (txn_info.load_job_source_type() ==
- LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) {
- put_streaming_job_meta(code, msg, instance_id, request, txn.get(),
db_id);
- }
-
// save versions for partition
int64_t version_update_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
diff --git a/cloud/src/meta-store/keys.cpp b/cloud/src/meta-store/keys.cpp
index 7b2b75c4d55..e23f84771ab 100644
--- a/cloud/src/meta-store/keys.cpp
+++ b/cloud/src/meta-store/keys.cpp
@@ -64,7 +64,6 @@ static const char* STATS_KEY_INFIX_TABLET =
"tablet";
static const char* JOB_KEY_INFIX_TABLET = "tablet";
static const char* JOB_KEY_INFIX_RL_PROGRESS =
"routine_load_progress";
-static const char* JOB_KEY_INFIX_STREAMING_JOB_META = "streaming_job_meta";
static const char* JOB_KEY_INFIX_RESTORE_TABLET = "restore_tablet";
static const char* JOB_KEY_INFIX_RESTORE_ROWSET = "restore_rowset";
@@ -145,7 +144,7 @@ static void encode_prefix(const T& t, std::string* key) {
MetaDeleteBitmapInfo, MetaDeleteBitmapUpdateLockInfo,
MetaPendingDeleteBitmapInfo, PartitionVersionKeyInfo,
RecycleIndexKeyInfo, RecyclePartKeyInfo, RecycleRowsetKeyInfo,
RecycleTxnKeyInfo, RecycleStageKeyInfo,
StatsTabletKeyInfo, TableVersionKeyInfo, JobRestoreTabletKeyInfo,
JobRestoreRowsetKeyInfo,
- JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo,
StreamingJobMetaKeyInfo,
+ JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo,
CopyJobKeyInfo, CopyFileKeyInfo, StorageVaultKeyInfo,
MetaSchemaPBDictionaryInfo,
MowTabletJobInfo>);
@@ -182,8 +181,7 @@ static void encode_prefix(const T& t, std::string* key) {
encode_bytes(STATS_KEY_PREFIX, key);
} else if constexpr (std::is_same_v<T, JobTabletKeyInfo>
|| std::is_same_v<T, JobRecycleKeyInfo>
- || std::is_same_v<T, RLJobProgressKeyInfo>
- || std::is_same_v<T, StreamingJobMetaKeyInfo>) {
+ || std::is_same_v<T, RLJobProgressKeyInfo>) {
encode_bytes(JOB_KEY_PREFIX, key);
} else if constexpr (std::is_same_v<T, CopyJobKeyInfo>
|| std::is_same_v<T, CopyFileKeyInfo>) {
@@ -465,13 +463,6 @@ void rl_job_progress_key_info(const RLJobProgressKeyInfo&
in, std::string* out)
encode_int64(std::get<2>(in), out); // job_id
}
-void streaming_job_meta_key_info(const StreamingJobMetaKeyInfo& in,
std::string* out) {
- encode_prefix(in, out); // 0x01 "job"
${instance_id}
- encode_bytes(JOB_KEY_INFIX_STREAMING_JOB_META, out); //
"streaming_job_meta"
- encode_int64(std::get<1>(in), out); // db_id
- encode_int64(std::get<2>(in), out); // job_id
-}
-
void job_restore_tablet_key(const JobRestoreTabletKeyInfo& in, std::string*
out) {
encode_prefix(in, out); // 0x01 "job"
${instance_id}
encode_bytes(JOB_KEY_INFIX_RESTORE_TABLET, out); // "restore_tablet"
diff --git a/cloud/src/meta-store/keys.h b/cloud/src/meta-store/keys.h
index 3b9b234c574..8ccd974e0b7 100644
--- a/cloud/src/meta-store/keys.h
+++ b/cloud/src/meta-store/keys.h
@@ -218,9 +218,6 @@ using MetaPendingDeleteBitmapInfo = BasicKeyInfo<24 ,
std::tuple<std::string, in
// 0:instance_id 1:db_id
2:job_id
using RLJobProgressKeyInfo = BasicKeyInfo<25, std::tuple<std::string, int64_t,
int64_t>>;
-// 0:instance_id 1:db_id
2:job_id
-using StreamingJobMetaKeyInfo = BasicKeyInfo<52, std::tuple<std::string,
int64_t, int64_t>>;
-
// 0:instance_id
1:vault_id
using StorageVaultKeyInfo = BasicKeyInfo<26, std::tuple<std::string,
std::string>>;
@@ -410,8 +407,6 @@ void job_tablet_key(const JobTabletKeyInfo& in,
std::string* out);
static inline std::string job_tablet_key(const JobTabletKeyInfo& in) {
std::string s; job_tablet_key(in, &s); return s; }
void rl_job_progress_key_info(const RLJobProgressKeyInfo& in, std::string*
out);
static inline std::string rl_job_progress_key_info(const RLJobProgressKeyInfo&
in) { std::string s; rl_job_progress_key_info(in, &s); return s; }
-void streaming_job_meta_key_info(const StreamingJobMetaKeyInfo& in,
std::string* out);
-static inline std::string streaming_job_meta_key_info(const
StreamingJobMetaKeyInfo& in) { std::string s; streaming_job_meta_key_info(in,
&s); return s; }
std::string copy_key_prefix(std::string_view instance_id);
void copy_job_key(const CopyJobKeyInfo& in, std::string* out);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index 2b0673d6453..f17625a89ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -492,10 +492,4 @@ public class MetaServiceClient {
return
blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms,
TimeUnit.MILLISECONDS)
.createInstance(request);
}
-
- public Cloud.GetStreamingTaskCommitAttachResponse
-
getStreamingTaskCommitAttach(Cloud.GetStreamingTaskCommitAttachRequest request)
{
- return
blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms,
TimeUnit.MILLISECONDS)
- .getStreamingTaskCommitAttach(request);
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 8710209ff8a..b351942cbe2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -432,9 +432,4 @@ public class MetaServiceProxy {
public Cloud.CreateInstanceResponse
createInstance(Cloud.CreateInstanceRequest request) throws RpcException {
return w.executeRequest((client) -> client.createInstance(request));
}
-
- public Cloud.GetStreamingTaskCommitAttachResponse
getStreamingTaskCommitAttach(
- Cloud.GetStreamingTaskCommitAttachRequest request) throws
RpcException {
- return w.executeRequest((client) ->
client.getStreamingTaskCommitAttach(request));
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index c3b9c321fa9..fddb6ed7208 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -85,7 +85,6 @@ import org.apache.doris.common.util.InternalDatabaseUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.event.DataChangeEvent;
-import
org.apache.doris.job.extensions.insert.streaming.StreamingTaskTxnCommitAttachment;
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment;
import org.apache.doris.metric.MetricRepo;
@@ -620,19 +619,6 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
builder.setCommitAttachment(TxnUtil
.rlTaskTxnCommitAttachmentToPb(rlTaskTxnCommitAttachment));
- } else if (txnCommitAttachment instanceof
StreamingTaskTxnCommitAttachment) {
- StreamingTaskTxnCommitAttachment
streamingTaskTxnCommitAttachment =
- (StreamingTaskTxnCommitAttachment)
txnCommitAttachment;
- TxnStateChangeCallback cb =
callbackFactory.getCallback(streamingTaskTxnCommitAttachment.getTaskId());
- if (cb != null) {
- // use a temporary transaction state to do before commit
check,
- // what actually works is the transactionId
- TransactionState tmpTxnState = new TransactionState();
- tmpTxnState.setTransactionId(transactionId);
- cb.beforeCommitted(tmpTxnState);
- }
- builder.setCommitAttachment(TxnUtil
-
.streamingTaskTxnCommitAttachmentToPb(streamingTaskTxnCommitAttachment));
} else {
throw new UserException("invalid txnCommitAttachment");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
index 4155e6c5e67..3aca54cd150 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
@@ -19,7 +19,6 @@ package org.apache.doris.cloud.transaction;
import org.apache.doris.cloud.proto.Cloud.RLTaskTxnCommitAttachmentPB;
import org.apache.doris.cloud.proto.Cloud.RoutineLoadProgressPB;
-import org.apache.doris.cloud.proto.Cloud.StreamingTaskCommitAttachmentPB;
import org.apache.doris.cloud.proto.Cloud.TxnCommitAttachmentPB;
import
org.apache.doris.cloud.proto.Cloud.TxnCommitAttachmentPB.LoadJobFinalOperationPB;
import
org.apache.doris.cloud.proto.Cloud.TxnCommitAttachmentPB.LoadJobFinalOperationPB.EtlStatusPB;
@@ -29,7 +28,6 @@ import org.apache.doris.cloud.proto.Cloud.TxnCoordinatorPB;
import org.apache.doris.cloud.proto.Cloud.TxnInfoPB;
import org.apache.doris.cloud.proto.Cloud.TxnSourceTypePB;
import org.apache.doris.cloud.proto.Cloud.UniqueIdPB;
-import
org.apache.doris.job.extensions.insert.streaming.StreamingTaskTxnCommitAttachment;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.loadv2.JobState;
@@ -270,42 +268,6 @@ public class TxnUtil {
return new
RLTaskTxnCommitAttachment(txnCommitAttachmentPB.getRlTaskTxnCommitAttachment());
}
- public static TxnCommitAttachmentPB
streamingTaskTxnCommitAttachmentToPb(StreamingTaskTxnCommitAttachment
- streamingTaskTxnCommitAttachment) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("streamingTaskTxnCommitAttachment:{}",
streamingTaskTxnCommitAttachment);
- }
- TxnCommitAttachmentPB.Builder attachementBuilder =
TxnCommitAttachmentPB.newBuilder();
-
attachementBuilder.setType(TxnCommitAttachmentPB.Type.STREAMING_TASK_TXN_COMMIT_ATTACHMENT);
-
- StreamingTaskCommitAttachmentPB.Builder builder =
- StreamingTaskCommitAttachmentPB.newBuilder();
-
- builder.setJobId(streamingTaskTxnCommitAttachment.getJobId())
- .setTaskId(streamingTaskTxnCommitAttachment.getTaskId())
-
.setScannedRows(streamingTaskTxnCommitAttachment.getScannedRows())
- .setLoadBytes(streamingTaskTxnCommitAttachment.getLoadBytes())
-
.setFileNumber(streamingTaskTxnCommitAttachment.getFileNumber())
- .setFileSize(streamingTaskTxnCommitAttachment.getFileSize());
-
- if (streamingTaskTxnCommitAttachment.getOffset() != null) {
-
builder.setOffset(streamingTaskTxnCommitAttachment.getOffset().endOffset());
- }
-
-
attachementBuilder.setStreamingTaskTxnCommitAttachment(builder.build());
- return attachementBuilder.build();
- }
-
- public static StreamingTaskTxnCommitAttachment
streamingTaskTxnCommitAttachmentFromPb(
- TxnCommitAttachmentPB txnCommitAttachmentPB) {
- StreamingTaskCommitAttachmentPB streamingTaskCommitAttachmentPB =
- txnCommitAttachmentPB.getStreamingTaskTxnCommitAttachment();
- if (LOG.isDebugEnabled()) {
- LOG.debug("StreamingTaskCommitAttachmentPB={}",
streamingTaskCommitAttachmentPB);
- }
- return new
StreamingTaskTxnCommitAttachment(streamingTaskCommitAttachmentPB);
- }
-
public static LoadJobFinalOperation
loadJobFinalOperationFromPb(TxnCommitAttachmentPB txnCommitAttachmentPB) {
LoadJobFinalOperationPB loadJobFinalOperationPB =
txnCommitAttachmentPB.getLoadJobFinalOperation();
if (LOG.isDebugEnabled()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index b872040c4c3..f2ce1ad5e02 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -19,10 +19,7 @@ package org.apache.doris.job.extensions.insert.streaming;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
-import org.apache.doris.cloud.proto.Cloud;
-import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;
@@ -52,7 +49,6 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
-import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.transaction.TransactionException;
@@ -269,17 +265,6 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
offsetProvider.updateOffset(attachment.getOffset());
}
- @Override
- public void onRegister() throws JobException {
-
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
- }
-
- @Override
- public void onReplayCreate() throws JobException {
- onRegister();
- super.onReplayCreate();
- }
-
@Override
public ShowResultSetMetaData getTaskMetaData() {
return InsertJob.TASK_META_DATA;
@@ -405,35 +390,6 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
updateJobStatisticAndOffset(attachment);
}
- public void replayOnCloudMode() throws UserException {
- Cloud.GetStreamingTaskCommitAttachRequest.Builder builder =
- Cloud.GetStreamingTaskCommitAttachRequest.newBuilder();
- builder.setCloudUniqueId(Config.cloud_unique_id);
- builder.setDbId(dbId);
- builder.setJobId(getJobId());
-
- Cloud.GetStreamingTaskCommitAttachResponse response;
- try {
- response =
MetaServiceProxy.getInstance().getStreamingTaskCommitAttach(builder.build());
- if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
- log.warn("failed to get streaming task commit attach,
response: {}", response);
- if (response.getStatus().getCode() ==
Cloud.MetaServiceCode.STREAMING_JOB_PROGRESS_NOT_FOUND) {
- log.warn("not found streaming job progress, response: {}",
response);
- return;
- } else {
- throw new UserException(response.getStatus().getMsg());
- }
- }
- } catch (RpcException e) {
- log.info("failed to get streaming task commit attach {}", e);
- throw new UserException(e.getMessage());
- }
-
- StreamingTaskTxnCommitAttachment commitAttach =
- new
StreamingTaskTxnCommitAttachment(response.getCommitAttach());
- updateJobStatisticAndOffset(commitAttach);
- }
-
@Override
public void afterAborted(TransactionState txnState, boolean txnOperated,
String txnStatusChangeReason)
throws UserException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
index 8660ed94739..744f83080aa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
@@ -17,7 +17,6 @@
package org.apache.doris.job.extensions.insert.streaming;
-import org.apache.doris.cloud.proto.Cloud.StreamingTaskCommitAttachmentPB;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TxnCommitAttachment;
@@ -39,15 +38,6 @@ public class StreamingTaskTxnCommitAttachment extends
TxnCommitAttachment {
this.offset = offset;
}
- public StreamingTaskTxnCommitAttachment(StreamingTaskCommitAttachmentPB
pb) {
- super(TransactionState.LoadJobSourceType.STREAMING_JOB);
- this.scannedRows = pb.getScannedRows();
- this.loadBytes = pb.getLoadBytes();
- this.fileNumber = pb.getFileNumber();
- this.fileSize = pb.getFileSize();
- this.offset.setEndOffset(pb.getOffset());
- }
-
@Getter
private long jobId;
@Getter
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
index a3b0689bfc5..095f0a5e6bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
@@ -19,8 +19,4 @@ package org.apache.doris.job.offset;
public interface Offset {
String toJson();
-
- void setEndOffset(String endOffset);
-
- String endOffset();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index 2ab2030fbbb..f76707b2453 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -30,16 +30,6 @@ public class S3Offset implements Offset {
String endFile;
String fileLists;
- @Override
- public void setEndOffset(String endOffset) {
- this.endFile = endOffset;
- }
-
- @Override
- public String endOffset() {
- return endFile;
- }
-
@Override
public String toJson() {
return GsonUtils.GSON.toJson(this);
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index aa6bcd28359..e277c24f8d7 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -322,7 +322,6 @@ enum LoadJobSourceTypePB {
LOAD_JOB_SRC_TYPE_INSERT_STREAMING = 3; // insert stmt (streaming type),
update stmt use this type
LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK = 4; // routine load task use this type
LOAD_JOB_SRC_TYPE_BATCH_LOAD_JOB = 5; // load job v2 for broker load
- LOAD_JOB_SRC_TYPE_STREAMING_JOB = 6; // streaming job use this type
}
enum TxnStatusPB {
@@ -366,21 +365,10 @@ message RoutineLoadJobStatisticPB {
optional int64 task_execution_time_ms = 5;
}
-message StreamingTaskCommitAttachmentPB {
- optional int64 job_id = 1;
- optional int64 task_id = 2;
- optional string offset = 3;
- optional int64 scanned_rows = 4;
- optional int64 load_bytes = 5;
- optional int64 file_number = 6;
- optional int64 file_size = 7;
-}
-
message TxnCommitAttachmentPB {
enum Type {
LODD_JOB_FINAL_OPERATION = 0;
RT_TASK_TXN_COMMIT_ATTACHMENT = 1;
- STREAMING_TASK_TXN_COMMIT_ATTACHMENT = 2;
}
message LoadJobFinalOperationPB {
message EtlStatusPB {
@@ -437,7 +425,6 @@ message TxnCommitAttachmentPB {
optional Type type = 1;
optional LoadJobFinalOperationPB load_job_final_operation = 2;
optional RLTaskTxnCommitAttachmentPB rl_task_txn_commit_attachment = 3;
- optional StreamingTaskCommitAttachmentPB
streaming_task_txn_commit_attachment = 4;
}
// For storing label -> txn_ids
@@ -1711,8 +1698,7 @@ enum MetaServiceCode {
JOB_ALREADY_SUCCESS = 5002;
ROUTINE_LOAD_DATA_INCONSISTENT = 5003;
ROUTINE_LOAD_PROGRESS_NOT_FOUND = 5004;
- STREAMING_JOB_PROGRESS_NOT_FOUND = 5005;
- JOB_CHECK_ALTER_VERSION = 5006;
+ JOB_CHECK_ALTER_VERSION = 5005;
// Rate limit
MAX_QPS_LIMIT = 6001;
@@ -1890,18 +1876,6 @@ message ResetRLProgressResponse {
optional MetaServiceResponseStatus status = 1;
}
-message GetStreamingTaskCommitAttachRequest {
- optional string cloud_unique_id = 1; // For auth
- optional int64 db_id = 2;
- optional int64 job_id = 3;
- optional string request_ip = 4;
-}
-
-message GetStreamingTaskCommitAttachResponse {
- optional MetaServiceResponseStatus status = 1;
- optional StreamingTaskCommitAttachmentPB commit_attach = 2;
-}
-
message CheckKeyInfos {
repeated int64 db_ids = 1;
repeated int64 table_ids = 2;
@@ -2095,9 +2069,6 @@ service MetaService {
rpc get_rl_task_commit_attach(GetRLTaskCommitAttachRequest) returns
(GetRLTaskCommitAttachResponse);
rpc reset_rl_progress(ResetRLProgressRequest) returns
(ResetRLProgressResponse);
- // streaming job meta
- rpc get_streaming_task_commit_attach(GetStreamingTaskCommitAttachRequest)
returns (GetStreamingTaskCommitAttachResponse);
-
// check KV
rpc check_kv(CheckKVRequest) returns (CheckKVResponse);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]