github-actions[bot] commented on code in PR #60051:
URL: https://github.com/apache/doris/pull/60051#discussion_r2887079157
##########
be/src/runtime/routine_load/data_consumer.cpp:
##########
@@ -556,4 +571,442 @@ bool
KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
return true;
}
+// ==================== AWS Kinesis Data Consumer Implementation
====================
+
+KinesisDataConsumer::KinesisDataConsumer(std::shared_ptr<StreamLoadContext>
ctx)
+ : _region(ctx->kinesis_info->region),
+ _stream(ctx->kinesis_info->stream),
+ _endpoint(ctx->kinesis_info->endpoint) {
+ VLOG_NOTICE << "construct Kinesis consumer: stream=" << _stream << ",
region=" << _region;
+}
+
+KinesisDataConsumer::~KinesisDataConsumer() {
+ VLOG_NOTICE << "destruct Kinesis consumer: stream=" << _stream;
+ // AWS SDK client managed by shared_ptr, will be automatically cleaned up
+}
+
+Status KinesisDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
+ std::unique_lock<std::mutex> l(_lock);
+ if (_init) {
+ return Status::OK(); // Already initialized (idempotent)
+ }
+
+ // Store custom properties (AWS credentials, etc.)
+ _custom_properties.insert(ctx->kinesis_info->properties.begin(),
+ ctx->kinesis_info->properties.end());
+
+ // Create AWS Kinesis client
+ RETURN_IF_ERROR(_create_kinesis_client(ctx));
+
+ VLOG_NOTICE << "finished to init Kinesis consumer. stream=" << _stream <<
", region=" << _region
+ << ", " << ctx->brief();
+ _init = true;
+ return Status::OK();
+}
+
+Status
KinesisDataConsumer::_create_kinesis_client(std::shared_ptr<StreamLoadContext>
ctx) {
+ // Reuse S3ClientFactory's credential provider logic
+ // This supports all AWS authentication methods:
+ // - Simple AK/SK
+ // - IAM instance profile (EC2)
+ // - STS assume role
+ // - Session tokens
+ // - Environment variables
+ // - Default credential chain
+
+ S3ClientConf s3_conf;
+ s3_conf.region = _region;
+ s3_conf.endpoint = _endpoint;
+
+ // Parse AWS credentials from properties
+ auto it_ak = _custom_properties.find("aws.access.key.id");
+ auto it_sk = _custom_properties.find("aws.secret.access.key");
Review Comment:
**CRITICAL: Property key mismatch between FE and BE — credentials will never
reach the AWS SDK.**
The FE stores AWS credentials with `kinesis_*` keys (e.g.,
`kinesis_access_key`, `kinesis_secret_key`) via
`KinesisDataSourceProperties.storeCredentialInCustomProperties()`, and these
are passed through `TKinesisLoadInfo.properties` to the BE.
But here in `_create_kinesis_client()`, the BE looks for completely
different keys: `aws.access.key.id`, `aws.secret.access.key`,
`aws.session.token`, etc.
This means AWS credentials provided by the user will **never** be found by
the BE consumer, and all Kinesis connections will fall back to the default
credential chain (which may or may not work depending on the deployment
environment).
Either the FE needs to map `kinesis_*` keys to `aws.*` keys before sending,
or the BE needs to look for `kinesis_*` keys.
##########
be/src/runtime/routine_load/data_consumer.cpp:
##########
@@ -556,4 +571,442 @@ bool
KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
return true;
}
+// ==================== AWS Kinesis Data Consumer Implementation
====================
+
+KinesisDataConsumer::KinesisDataConsumer(std::shared_ptr<StreamLoadContext>
ctx)
+ : _region(ctx->kinesis_info->region),
+ _stream(ctx->kinesis_info->stream),
+ _endpoint(ctx->kinesis_info->endpoint) {
+ VLOG_NOTICE << "construct Kinesis consumer: stream=" << _stream << ",
region=" << _region;
+}
+
+KinesisDataConsumer::~KinesisDataConsumer() {
+ VLOG_NOTICE << "destruct Kinesis consumer: stream=" << _stream;
+ // AWS SDK client managed by shared_ptr, will be automatically cleaned up
+}
+
+Status KinesisDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
+ std::unique_lock<std::mutex> l(_lock);
+ if (_init) {
+ return Status::OK(); // Already initialized (idempotent)
+ }
+
+ // Store custom properties (AWS credentials, etc.)
+ _custom_properties.insert(ctx->kinesis_info->properties.begin(),
+ ctx->kinesis_info->properties.end());
+
+ // Create AWS Kinesis client
+ RETURN_IF_ERROR(_create_kinesis_client(ctx));
+
+ VLOG_NOTICE << "finished to init Kinesis consumer. stream=" << _stream <<
", region=" << _region
+ << ", " << ctx->brief();
+ _init = true;
+ return Status::OK();
+}
+
+Status
KinesisDataConsumer::_create_kinesis_client(std::shared_ptr<StreamLoadContext>
ctx) {
+ // Reuse S3ClientFactory's credential provider logic
+ // This supports all AWS authentication methods:
+ // - Simple AK/SK
+ // - IAM instance profile (EC2)
+ // - STS assume role
+ // - Session tokens
+ // - Environment variables
+ // - Default credential chain
+
+ S3ClientConf s3_conf;
+ s3_conf.region = _region;
+ s3_conf.endpoint = _endpoint;
+
+ // Parse AWS credentials from properties
+ auto it_ak = _custom_properties.find("aws.access.key.id");
+ auto it_sk = _custom_properties.find("aws.secret.access.key");
+ auto it_token = _custom_properties.find("aws.session.token");
+ auto it_role_arn = _custom_properties.find("aws.iam.role.arn");
+ auto it_external_id = _custom_properties.find("aws.external.id");
+ auto it_provider = _custom_properties.find("aws.credentials.provider");
+
+ if (it_ak != _custom_properties.end()) {
+ s3_conf.ak = it_ak->second;
+ }
+ if (it_sk != _custom_properties.end()) {
+ s3_conf.sk = it_sk->second;
+ }
+ if (it_token != _custom_properties.end()) {
+ s3_conf.token = it_token->second;
+ }
+ if (it_role_arn != _custom_properties.end()) {
+ s3_conf.role_arn = it_role_arn->second;
+ }
+ if (it_external_id != _custom_properties.end()) {
+ s3_conf.external_id = it_external_id->second;
+ }
+ if (it_provider != _custom_properties.end()) {
+ // Map provider type string to enum
+ if (it_provider->second == "instance_profile") {
+ s3_conf.cred_provider_type = CredProviderType::InstanceProfile;
+ } else if (it_provider->second == "env") {
+ s3_conf.cred_provider_type = CredProviderType::Env;
+ } else if (it_provider->second == "simple") {
+ s3_conf.cred_provider_type = CredProviderType::Simple;
+ }
+ }
+
+ // Create AWS ClientConfiguration
+ Aws::Client::ClientConfiguration aws_config =
S3ClientFactory::getClientConfiguration();
+ aws_config.region = _region;
+
+ if (!_endpoint.empty()) {
+ aws_config.endpointOverride = _endpoint;
+ }
+
+ // Set timeouts from properties or use defaults
+ auto it_request_timeout =
_custom_properties.find("aws.request.timeout.ms");
+ if (it_request_timeout != _custom_properties.end()) {
+ aws_config.requestTimeoutMs = std::stoi(it_request_timeout->second);
+ } else {
+ aws_config.requestTimeoutMs = 30000; // 30s default
+ }
+
+ auto it_conn_timeout =
_custom_properties.find("aws.connection.timeout.ms");
+ if (it_conn_timeout != _custom_properties.end()) {
+ aws_config.connectTimeoutMs = std::stoi(it_conn_timeout->second);
+ }
+
+ // Get credentials provider (reuses S3 infrastructure)
+ auto credentials_provider =
S3ClientFactory::instance().get_aws_credentials_provider(s3_conf);
+
+ // Create Kinesis client
+ _kinesis_client =
+
std::make_shared<Aws::Kinesis::KinesisClient>(credentials_provider, aws_config);
+
+ if (!_kinesis_client) {
+ return Status::InternalError(
Review Comment:
**Defensive programming violation:** `std::make_shared` never returns null —
it throws `std::bad_alloc` on allocation failure. This `if (!_kinesis_client)`
check is dead code.
Per coding standards, remove this check or replace with
`DORIS_CHECK(_kinesis_client)` if you want an assertion.
##########
fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisProgress.java:
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.thrift.TKinesisRLTaskProgress;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Progress tracking for Kinesis Routine Load jobs.
+ *
+ * Kinesis uses sequence numbers instead of offsets like Kafka.
+ * A sequence number is a unique identifier for each record within a shard.
+ * Sequence numbers are string representations of 128-bit integers.
+ *
+ * Special position values:
+ * - TRIM_HORIZON: Start from the oldest record in the shard
+ * - LATEST: Start from the newest record (records arriving after the iterator
is created)
+ * - AT_TIMESTAMP: Start from a specific timestamp
+ * - Specific sequence number: Start from or after a specific sequence number
+ */
+public class KinesisProgress extends RoutineLoadProgress {
+ private static final Logger LOG =
LogManager.getLogger(KinesisProgress.class);
+
+ // Special position constants
+ public static final String POSITION_TRIM_HORIZON = "TRIM_HORIZON";
+ public static final String POSITION_LATEST = "LATEST";
+ public static final String POSITION_AT_TIMESTAMP = "AT_TIMESTAMP";
+
+ // Internal representation for special positions
+ // Using negative values since sequence numbers are always positive
+ public static final String TRIM_HORIZON_VAL = "-2";
+ public static final String LATEST_VAL = "-1";
+
+ /**
+ * Map from shard ID to sequence number.
+ * The sequence number saved here is the next sequence number to be
consumed.
+ *
+ * Note: Unlike Kafka partitions which are integers, Kinesis shard IDs are
strings
+ * like "shardId-000000000000".
+ */
+ @SerializedName(value = "shardToSeqNum")
+ private ConcurrentMap<String, String> shardIdToSequenceNumber =
Maps.newConcurrentMap();
+
+ // MillisBehindLatest per shard, reported by BE from GetRecords response.
+ // Not persisted — refreshed every task commit. Used only for lag display
and scheduling.
+ private ConcurrentMap<String, Long> shardIdToMillsBehindLatest =
Maps.newConcurrentMap();
+
+ private ReentrantLock lock = new ReentrantLock(true);
+
Review Comment:
**CRITICAL: `lock` will be null after Gson deserialization, causing NPE on
FE restart/failover.**
Gson creates objects using `sun.misc.Unsafe.allocateInstance()`, which
bypasses constructors entirely. This means the inline initialization `new
ReentrantLock(true)` is never executed. After the progress is deserialized from
the EditLog or an image, `lock` will be `null`.
Any subsequent call to `addShardPosition()`, `modifyPosition()`, or
`update()` will throw `NullPointerException`, breaking progress tracking after
FE restart or master failover.
Fix options:
1. Add a `@PostConstruct`-style `gsonPostProcess()` method (if the base
class supports it) that initializes `lock`
2. Make `lock` `transient` and initialize it lazily or in a deserialization
hook
3. Follow the pattern used by other Doris classes that need locks after Gson
deserialization
##########
be/src/runtime/routine_load/data_consumer.cpp:
##########
@@ -556,4 +571,442 @@ bool
KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
return true;
}
+// ==================== AWS Kinesis Data Consumer Implementation
====================
+
+KinesisDataConsumer::KinesisDataConsumer(std::shared_ptr<StreamLoadContext>
ctx)
+ : _region(ctx->kinesis_info->region),
+ _stream(ctx->kinesis_info->stream),
+ _endpoint(ctx->kinesis_info->endpoint) {
+ VLOG_NOTICE << "construct Kinesis consumer: stream=" << _stream << ",
region=" << _region;
+}
+
+KinesisDataConsumer::~KinesisDataConsumer() {
+ VLOG_NOTICE << "destruct Kinesis consumer: stream=" << _stream;
+ // AWS SDK client managed by shared_ptr, will be automatically cleaned up
+}
+
+Status KinesisDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
+ std::unique_lock<std::mutex> l(_lock);
+ if (_init) {
+ return Status::OK(); // Already initialized (idempotent)
+ }
+
+ // Store custom properties (AWS credentials, etc.)
+ _custom_properties.insert(ctx->kinesis_info->properties.begin(),
+ ctx->kinesis_info->properties.end());
+
+ // Create AWS Kinesis client
+ RETURN_IF_ERROR(_create_kinesis_client(ctx));
+
+ VLOG_NOTICE << "finished to init Kinesis consumer. stream=" << _stream <<
", region=" << _region
+ << ", " << ctx->brief();
+ _init = true;
+ return Status::OK();
+}
+
+Status
KinesisDataConsumer::_create_kinesis_client(std::shared_ptr<StreamLoadContext>
ctx) {
+ // Reuse S3ClientFactory's credential provider logic
+ // This supports all AWS authentication methods:
+ // - Simple AK/SK
+ // - IAM instance profile (EC2)
+ // - STS assume role
+ // - Session tokens
+ // - Environment variables
+ // - Default credential chain
+
+ S3ClientConf s3_conf;
+ s3_conf.region = _region;
+ s3_conf.endpoint = _endpoint;
+
+ // Parse AWS credentials from properties
+ auto it_ak = _custom_properties.find("aws.access.key.id");
+ auto it_sk = _custom_properties.find("aws.secret.access.key");
+ auto it_token = _custom_properties.find("aws.session.token");
+ auto it_role_arn = _custom_properties.find("aws.iam.role.arn");
+ auto it_external_id = _custom_properties.find("aws.external.id");
+ auto it_provider = _custom_properties.find("aws.credentials.provider");
+
+ if (it_ak != _custom_properties.end()) {
+ s3_conf.ak = it_ak->second;
+ }
+ if (it_sk != _custom_properties.end()) {
+ s3_conf.sk = it_sk->second;
+ }
+ if (it_token != _custom_properties.end()) {
+ s3_conf.token = it_token->second;
+ }
+ if (it_role_arn != _custom_properties.end()) {
+ s3_conf.role_arn = it_role_arn->second;
+ }
+ if (it_external_id != _custom_properties.end()) {
+ s3_conf.external_id = it_external_id->second;
+ }
+ if (it_provider != _custom_properties.end()) {
+ // Map provider type string to enum
+ if (it_provider->second == "instance_profile") {
+ s3_conf.cred_provider_type = CredProviderType::InstanceProfile;
+ } else if (it_provider->second == "env") {
+ s3_conf.cred_provider_type = CredProviderType::Env;
+ } else if (it_provider->second == "simple") {
+ s3_conf.cred_provider_type = CredProviderType::Simple;
+ }
+ }
+
+ // Create AWS ClientConfiguration
+ Aws::Client::ClientConfiguration aws_config =
S3ClientFactory::getClientConfiguration();
+ aws_config.region = _region;
+
+ if (!_endpoint.empty()) {
+ aws_config.endpointOverride = _endpoint;
+ }
+
+ // Set timeouts from properties or use defaults
+ auto it_request_timeout =
_custom_properties.find("aws.request.timeout.ms");
+ if (it_request_timeout != _custom_properties.end()) {
+ aws_config.requestTimeoutMs = std::stoi(it_request_timeout->second);
+ } else {
+ aws_config.requestTimeoutMs = 30000; // 30s default
+ }
+
+ auto it_conn_timeout =
_custom_properties.find("aws.connection.timeout.ms");
+ if (it_conn_timeout != _custom_properties.end()) {
+ aws_config.connectTimeoutMs = std::stoi(it_conn_timeout->second);
+ }
+
+ // Get credentials provider (reuses S3 infrastructure)
+ auto credentials_provider =
S3ClientFactory::instance().get_aws_credentials_provider(s3_conf);
+
+ // Create Kinesis client
+ _kinesis_client =
+
std::make_shared<Aws::Kinesis::KinesisClient>(credentials_provider, aws_config);
+
+ if (!_kinesis_client) {
+ return Status::InternalError(
+ "Failed to create AWS Kinesis client for stream: {}, region:
{}", _stream, _region);
+ }
+
+ LOG(INFO) << "Created Kinesis client for stream: " << _stream << ",
region: " << _region;
+ return Status::OK();
+}
+
+Status KinesisDataConsumer::assign_shards(
+ const std::map<std::string, std::string>& shard_sequence_numbers,
+ const std::string& stream_name, std::shared_ptr<StreamLoadContext>
ctx) {
+ DORIS_CHECK(_kinesis_client);
+
+ std::stringstream ss;
+ ss << "Assigning shards to Kinesis consumer " << _id << ": ";
+
+ for (auto& entry : shard_sequence_numbers) {
+ const std::string& shard_id = entry.first;
+ const std::string& sequence_number = entry.second;
+
+ // Get shard iterator for this shard
+ std::string iterator;
+ RETURN_IF_ERROR(_get_shard_iterator(shard_id, sequence_number,
&iterator));
+
+ _shard_iterators[shard_id] = iterator;
+ _consuming_shard_ids.insert(shard_id);
+
+ ss << "[" << shard_id << ": " << sequence_number << "] ";
+ }
+
+ LOG(INFO) << ss.str();
+ return Status::OK();
+}
+
+Status KinesisDataConsumer::_get_shard_iterator(const std::string& shard_id,
+ const std::string&
sequence_number,
+ std::string* iterator) {
+ Aws::Kinesis::Model::GetShardIteratorRequest request;
+ request.SetStreamName(_stream);
+ request.SetShardId(shard_id);
+
+ // Determine iterator type based on sequence number
+ if (sequence_number.empty() || sequence_number == "TRIM_HORIZON" ||
sequence_number == "-2") {
+ // Start from oldest record in shard
+
request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::TRIM_HORIZON);
+ } else if (sequence_number == "LATEST" || sequence_number == "-1") {
+ // Start from newest record (records arriving after iterator creation)
+
request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::LATEST);
+ } else {
+ // Resume from specific sequence number
+
request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::AFTER_SEQUENCE_NUMBER);
+ request.SetStartingSequenceNumber(sequence_number);
+ }
+
+ auto outcome = _kinesis_client->GetShardIterator(request);
+ if (!outcome.IsSuccess()) {
+ auto& error = outcome.GetError();
+ return Status::InternalError("Failed to get shard iterator for shard
{}: {} ({})", shard_id,
+ error.GetMessage(),
static_cast<int>(error.GetErrorType()));
+ }
+
+ *iterator = outcome.GetResult().GetShardIterator();
+ VLOG_NOTICE << "Got shard iterator for shard: " << shard_id;
+ return Status::OK();
+}
+
+Status KinesisDataConsumer::group_consume(
+ BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue,
+ int64_t max_running_time_ms) {
+ static constexpr int MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE = 3;
+ static constexpr int RATE_LIMIT_BACKOFF_MS = 1000; // 1 second
+ static constexpr int KINESIS_GET_RECORDS_LIMIT = 1000; // Max 10000
+ static constexpr int INTER_SHARD_SLEEP_MS = 10; // Small sleep
between shards
+
+ int64_t left_time = max_running_time_ms;
+ LOG(INFO) << "start Kinesis consumer: " << _id << ", grp: " << _grp_id
+ << ", stream: " << _stream << ", max running time(ms): " <<
left_time;
+
+ int64_t received_rows = 0;
+ int64_t put_rows = 0;
+ int32_t retry_times = 0;
+ Status st = Status::OK();
+ bool done = false;
+
+ MonotonicStopWatch consumer_watch;
+ MonotonicStopWatch watch;
+ watch.start();
+
+ while (true) {
+ // Check cancellation flag
+ {
+ std::unique_lock<std::mutex> l(_lock);
+ if (_cancelled) {
+ break;
+ }
+ }
+
+ if (left_time <= 0) {
+ break;
+ }
+
+ // Round-robin through all active shards
+ for (auto it = _consuming_shard_ids.begin(); it !=
_consuming_shard_ids.end() && !done;) {
+ const std::string& shard_id = *it;
+ auto iter_it = _shard_iterators.find(shard_id);
+
+ if (iter_it == _shard_iterators.end() || iter_it->second.empty()) {
+ // Shard exhausted (closed due to split/merge), remove from
active set
+ LOG(INFO) << "Shard exhausted: " << shard_id;
+ it = _consuming_shard_ids.erase(it);
+ continue;
+ }
+
+ // Call Kinesis GetRecords API
+ consumer_watch.start();
+
+ Aws::Kinesis::Model::GetRecordsRequest request;
+ request.SetShardIterator(iter_it->second);
+ request.SetLimit(KINESIS_GET_RECORDS_LIMIT);
+
+ auto outcome = _kinesis_client->GetRecords(request);
+ consumer_watch.stop();
+
+ // Track metrics (reuse Kafka metrics, they're generic)
+ DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
+ DorisMetrics::instance()->routine_load_get_msg_latency->increment(
+ consumer_watch.elapsed_time() / 1000 / 1000);
+
+ if (!outcome.IsSuccess()) {
+ auto& error = outcome.GetError();
+
+ // Handle throttling (ProvisionedThroughputExceededException)
+ if (error.GetErrorType() ==
+
Aws::Kinesis::KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED) {
+ LOG(INFO) << "Kinesis rate limit exceeded for shard: " <<
shard_id
+ << ", backing off " << RATE_LIMIT_BACKOFF_MS <<
"ms";
+
std::this_thread::sleep_for(std::chrono::milliseconds(RATE_LIMIT_BACKOFF_MS));
+ ++it; // Move to next shard, will retry this one next round
+ continue;
+ }
+
+ // Handle retriable errors
+ if (_is_retriable_error(error)) {
+ LOG(INFO) << "Kinesis retriable error for shard " <<
shard_id << ": "
+ << error.GetMessage() << ", retry times: " <<
retry_times++;
+ if (retry_times <= MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE) {
+
std::this_thread::sleep_for(std::chrono::milliseconds(200));
+ continue;
+ }
+ }
+
+ // Fatal error
+ LOG(WARNING) << "Kinesis consume failed for shard " <<
shard_id << ": "
+ << error.GetMessage() << " (" <<
static_cast<int>(error.GetErrorType())
+ << ")";
+ st = Status::InternalError("Kinesis GetRecords failed for
shard {}: {}", shard_id,
+ error.GetMessage());
+ done = true;
+ break;
+ }
+
+ // Reset retry counter on success
+ retry_times = 0;
+
+ // Process records - move result to allow moving individual records
+ auto result = outcome.GetResultWithOwnership();
+ auto millis_behind = result.GetMillisBehindLatest();
+ std::string next_iterator = result.GetNextShardIterator();
+ RETURN_IF_ERROR(_process_records(shard_id, std::move(result),
queue, &received_rows,
+ &put_rows));
+
+ // Track MillisBehindLatest for this shard (used by FE for lag
monitoring & scheduling)
+ _millis_behind_latest[shard_id] = millis_behind;
+
+ // Update shard iterator for next call
+ if (next_iterator.empty()) {
+ // Shard is closed (split/merge), remove from active set
+ LOG(INFO) << "Shard closed: " << shard_id << " (split/merge
detected)";
+ _shard_iterators.erase(shard_id);
+ it = _consuming_shard_ids.erase(it);
+ } else {
+ _shard_iterators[shard_id] = next_iterator;
+ ++it;
+ }
+
+ // Check if all shards are exhausted
+ if (_consuming_shard_ids.empty()) {
+ LOG(INFO) << "All shards exhausted for consumer: " << _id;
+ done = true;
+ break;
+ }
+
+ // Small sleep to avoid tight loop
+
std::this_thread::sleep_for(std::chrono::milliseconds(INTER_SHARD_SLEEP_MS));
+ }
+
+ left_time = max_running_time_ms - watch.elapsed_time() / 1000 / 1000;
+ if (done) {
+ break;
+ }
+ }
+
+ LOG(INFO) << "Kinesis consumer done: " << _id << ", grp: " << _grp_id
+ << ". cancelled: " << _cancelled << ", left time(ms): " <<
left_time
+ << ", total cost(ms): " << watch.elapsed_time() / 1000 / 1000
+ << ", consume cost(ms): " << consumer_watch.elapsed_time() /
1000 / 1000
+ << ", received rows: " << received_rows << ", put rows: " <<
put_rows;
+
+ return st;
+}
+
+Status KinesisDataConsumer::_process_records(
+ const std::string& shard_id, Aws::Kinesis::Model::GetRecordsResult
result,
+ BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue,
int64_t* received_rows,
+ int64_t* put_rows) {
+ // result is owned by value, safe to get mutable access to its records
+ auto records =
+
std::move(const_cast<Aws::Vector<Aws::Kinesis::Model::Record>&>(result.GetRecords()));
+
+ for (auto& record : records) {
+ DorisMetrics::instance()->routine_load_consume_bytes->increment(
+ record.GetData().GetLength());
+
+ if (record.GetData().GetLength() == 0) {
+ // Skip empty records
+ continue;
+ }
+
+ // Track the last sequence number for this shard
+ _committed_sequence_numbers[shard_id] = record.GetSequenceNumber();
+
+ // Move record into shared_ptr to avoid expensive copy
+ auto record_ptr =
std::make_shared<Aws::Kinesis::Model::Record>(std::move(record));
+
+ if (!queue->controlled_blocking_put(record_ptr,
+
config::blocking_queue_cv_wait_timeout_ms)) {
+ // Queue shutdown
+ return Status::InternalError("Queue shutdown during record
processing");
+ }
+
+ (*put_rows)++;
+ (*received_rows)++;
+ DorisMetrics::instance()->routine_load_consume_rows->increment(1);
+ }
+
+ return Status::OK();
+}
+
+bool KinesisDataConsumer::_is_retriable_error(
+ const Aws::Client::AWSError<Aws::Kinesis::KinesisErrors>& error) {
+ auto error_type = error.GetErrorType();
+
+ return error_type ==
Aws::Kinesis::KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED ||
+ error_type == Aws::Kinesis::KinesisErrors::SERVICE_UNAVAILABLE ||
+ error_type == Aws::Kinesis::KinesisErrors::INTERNAL_FAILURE ||
+ error_type == Aws::Kinesis::KinesisErrors::NETWORK_CONNECTION ||
error.ShouldRetry();
+}
+
+Status KinesisDataConsumer::reset() {
+ std::unique_lock<std::mutex> l(_lock);
+ _cancelled = false;
+ _shard_iterators.clear();
+ _consuming_shard_ids.clear();
+ _last_visit_time = time(nullptr);
+ LOG(INFO) << "Kinesis consumer reset: " << _id;
+ return Status::OK();
+}
+
+Status KinesisDataConsumer::cancel(std::shared_ptr<StreamLoadContext> ctx) {
+ std::unique_lock<std::mutex> l(_lock);
+ if (!_init) {
+ return Status::InternalError("Kinesis consumer is not initialized");
+ }
+ _cancelled = true;
+ LOG(INFO) << "Kinesis consumer cancelled: " << _id << ", " << ctx->brief();
+ return Status::OK();
+}
+
+bool KinesisDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
+ if (ctx->load_src_type != TLoadSourceType::KINESIS) {
+ return false;
+ }
+
+ if (_region != ctx->kinesis_info->region || _stream !=
ctx->kinesis_info->stream ||
+ _endpoint != ctx->kinesis_info->endpoint) {
+ return false;
+ }
+
+ // Check that properties match
+ if (_custom_properties.size() != ctx->kinesis_info->properties.size()) {
+ return false;
+ }
+
+ for (auto& item : ctx->kinesis_info->properties) {
+ auto itr = _custom_properties.find(item.first);
+ if (itr == _custom_properties.end() || itr->second != item.second) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+Status KinesisDataConsumer::get_shard_list(std::vector<std::string>*
shard_ids) {
+ DORIS_CHECK(_kinesis_client);
+
+ Aws::Kinesis::Model::ListShardsRequest request;
+ request.SetStreamName(_stream);
+
Review Comment:
**MEDIUM: `get_shard_list()` does not handle ListShards pagination.** The
AWS Kinesis ListShards API returns at most 1,000 shards per call by default
(`MaxResults`) and provides a `NextToken` for continuation. This
implementation only fetches the first page.
For streams with many shards (e.g., after repeated splits), this will
silently return an incomplete shard list, causing some shards to never be
consumed.
Add a loop that continues calling `ListShards` with the `NextToken` until
all shards are retrieved.
##########
fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisTaskInfo.java:
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.nereids.load.NereidsLoadTaskInfo;
+import org.apache.doris.nereids.load.NereidsStreamLoadPlanner;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TKinesisLoadInfo;
+import org.apache.doris.thrift.TLoadSourceType;
+import org.apache.doris.thrift.TPipelineFragmentParams;
+import org.apache.doris.thrift.TPipelineWorkloadGroup;
+import org.apache.doris.thrift.TPlanFragment;
+import org.apache.doris.thrift.TRoutineLoadTask;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+/**
+ * Task info for Kinesis Routine Load.
+ *
+ * Each task is responsible for consuming data from one or more Kinesis shards.
+ * The task tracks the sequence number for each shard and reports progress back
+ * to the FE after successful consumption.
+ */
+public class KinesisTaskInfo extends RoutineLoadTaskInfo {
+ private static final Logger LOG =
LogManager.getLogger(KinesisTaskInfo.class);
+
+ private RoutineLoadManager getRoutineLoadManager() {
+ return Env.getCurrentEnv().getRoutineLoadManager();
+ }
+
+ /**
+ * Map from shard ID to starting sequence number for this task.
+ */
+ private Map<String, String> shardIdToSequenceNumber = Maps.newHashMap();
+
+ /**
+ * Create a new KinesisTaskInfo.
+ *
+ * @param id Task ID
+ * @param jobId Job ID
+ * @param timeoutMs Timeout in milliseconds
+ * @param shardIdToSequenceNumber Initial shard positions
+ * @param isMultiTable Whether this is a multi-table job
+ */
+ public KinesisTaskInfo(UUID id, long jobId, long timeoutMs,
+ Map<String, String> shardIdToSequenceNumber,
+ boolean isMultiTable, long taskSubmitTimeMs,
boolean isEof) {
+ super(id, jobId, timeoutMs, isMultiTable, taskSubmitTimeMs, isEof);
+ this.shardIdToSequenceNumber.putAll(shardIdToSequenceNumber);
+ }
+
+ /**
+ * Create a new task from an old task with updated positions.
+ */
+ public KinesisTaskInfo(KinesisTaskInfo oldTask, ConcurrentMap<String,
String> shardIdToSequenceNumber,
+ boolean isMultiTable) {
+ super(UUID.randomUUID(), oldTask.getJobId(), oldTask.getTimeoutMs(),
+ oldTask.getBeId(), isMultiTable,
oldTask.getLastScheduledTime(), oldTask.getIsEof());
+ this.shardIdToSequenceNumber.putAll(shardIdToSequenceNumber);
+ }
+
+ /**
+ * Get the list of shard IDs this task is responsible for.
+ */
+ public List<String> getShards() {
+ return new ArrayList<>(shardIdToSequenceNumber.keySet());
+ }
+
+ /**
+ * Get the shard to sequence number mapping.
+ */
+ public Map<String, String> getShardIdToSequenceNumber() {
+ return shardIdToSequenceNumber;
+ }
+
+ @Override
+ public TRoutineLoadTask createRoutineLoadTask() throws UserException {
+ KinesisRoutineLoadJob routineLoadJob =
+ (KinesisRoutineLoadJob) getRoutineLoadManager().getJob(jobId);
+
Review Comment:
**No null check on `getJob(jobId)` return value.** If the routine load job
is deleted between task creation and task execution, `getJob()` returns `null`
and this cast will throw `NullPointerException`.
Add a null check:
```java
KinesisRoutineLoadJob routineLoadJob = (KinesisRoutineLoadJob)
getRoutineLoadManager().getJob(jobId);
if (routineLoadJob == null) {
throw new UserException("Kinesis routine load job " + jobId + " not
found");
}
```
##########
be/src/runtime/routine_load/data_consumer.cpp:
##########
@@ -556,4 +571,442 @@ bool
KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
return true;
}
+// ==================== AWS Kinesis Data Consumer Implementation
====================
+
+KinesisDataConsumer::KinesisDataConsumer(std::shared_ptr<StreamLoadContext>
ctx)
+ : _region(ctx->kinesis_info->region),
+ _stream(ctx->kinesis_info->stream),
+ _endpoint(ctx->kinesis_info->endpoint) {
+ VLOG_NOTICE << "construct Kinesis consumer: stream=" << _stream << ",
region=" << _region;
+}
+
+KinesisDataConsumer::~KinesisDataConsumer() {
+ VLOG_NOTICE << "destruct Kinesis consumer: stream=" << _stream;
+ // AWS SDK client managed by shared_ptr, will be automatically cleaned up
+}
+
+Status KinesisDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
+ std::unique_lock<std::mutex> l(_lock);
+ if (_init) {
+ return Status::OK(); // Already initialized (idempotent)
+ }
+
+ // Store custom properties (AWS credentials, etc.)
+ _custom_properties.insert(ctx->kinesis_info->properties.begin(),
+ ctx->kinesis_info->properties.end());
+
+ // Create AWS Kinesis client
+ RETURN_IF_ERROR(_create_kinesis_client(ctx));
+
+ VLOG_NOTICE << "finished to init Kinesis consumer. stream=" << _stream <<
", region=" << _region
+ << ", " << ctx->brief();
+ _init = true;
+ return Status::OK();
+}
+
+Status
KinesisDataConsumer::_create_kinesis_client(std::shared_ptr<StreamLoadContext>
ctx) {
+ // Reuse S3ClientFactory's credential provider logic
+ // This supports all AWS authentication methods:
+ // - Simple AK/SK
+ // - IAM instance profile (EC2)
+ // - STS assume role
+ // - Session tokens
+ // - Environment variables
+ // - Default credential chain
+
+ S3ClientConf s3_conf;
+ s3_conf.region = _region;
+ s3_conf.endpoint = _endpoint;
+
+ // Parse AWS credentials from properties
+ auto it_ak = _custom_properties.find("aws.access.key.id");
+ auto it_sk = _custom_properties.find("aws.secret.access.key");
+ auto it_token = _custom_properties.find("aws.session.token");
+ auto it_role_arn = _custom_properties.find("aws.iam.role.arn");
+ auto it_external_id = _custom_properties.find("aws.external.id");
+ auto it_provider = _custom_properties.find("aws.credentials.provider");
+
+ if (it_ak != _custom_properties.end()) {
+ s3_conf.ak = it_ak->second;
+ }
+ if (it_sk != _custom_properties.end()) {
+ s3_conf.sk = it_sk->second;
+ }
+ if (it_token != _custom_properties.end()) {
+ s3_conf.token = it_token->second;
+ }
+ if (it_role_arn != _custom_properties.end()) {
+ s3_conf.role_arn = it_role_arn->second;
+ }
+ if (it_external_id != _custom_properties.end()) {
+ s3_conf.external_id = it_external_id->second;
+ }
+ if (it_provider != _custom_properties.end()) {
+ // Map provider type string to enum
+ if (it_provider->second == "instance_profile") {
+ s3_conf.cred_provider_type = CredProviderType::InstanceProfile;
+ } else if (it_provider->second == "env") {
+ s3_conf.cred_provider_type = CredProviderType::Env;
+ } else if (it_provider->second == "simple") {
+ s3_conf.cred_provider_type = CredProviderType::Simple;
+ }
+ }
+
+ // Create AWS ClientConfiguration
+ Aws::Client::ClientConfiguration aws_config =
S3ClientFactory::getClientConfiguration();
+ aws_config.region = _region;
+
+ if (!_endpoint.empty()) {
+ aws_config.endpointOverride = _endpoint;
+ }
+
+ // Set timeouts from properties or use defaults
+ auto it_request_timeout =
_custom_properties.find("aws.request.timeout.ms");
+ if (it_request_timeout != _custom_properties.end()) {
+ aws_config.requestTimeoutMs = std::stoi(it_request_timeout->second);
+ } else {
Review Comment:
**HIGH: `std::stoi` without exception handling — will crash BE on invalid
user input.**
`std::stoi` throws `std::invalid_argument` if the string cannot be parsed as
an integer, and `std::out_of_range` if the value exceeds int range. Since these
timeout values come from user-provided properties, malformed input will crash
the BE process.
Wrap in try/catch or use a safe parsing function that returns a `Status` on
failure.
##########
be/src/runtime/routine_load/data_consumer.cpp:
##########
@@ -556,4 +571,442 @@ bool
KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
return true;
}
+// ==================== AWS Kinesis Data Consumer Implementation
====================
+
+KinesisDataConsumer::KinesisDataConsumer(std::shared_ptr<StreamLoadContext>
ctx)
+ : _region(ctx->kinesis_info->region),
+ _stream(ctx->kinesis_info->stream),
+ _endpoint(ctx->kinesis_info->endpoint) {
+ VLOG_NOTICE << "construct Kinesis consumer: stream=" << _stream << ",
region=" << _region;
+}
+
+KinesisDataConsumer::~KinesisDataConsumer() {
+ VLOG_NOTICE << "destruct Kinesis consumer: stream=" << _stream;
+ // AWS SDK client managed by shared_ptr, will be automatically cleaned up
+}
+
+Status KinesisDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
+ std::unique_lock<std::mutex> l(_lock);
+ if (_init) {
+ return Status::OK(); // Already initialized (idempotent)
+ }
+
+ // Store custom properties (AWS credentials, etc.)
+ _custom_properties.insert(ctx->kinesis_info->properties.begin(),
+ ctx->kinesis_info->properties.end());
+
+ // Create AWS Kinesis client
+ RETURN_IF_ERROR(_create_kinesis_client(ctx));
+
+ VLOG_NOTICE << "finished to init Kinesis consumer. stream=" << _stream <<
", region=" << _region
+ << ", " << ctx->brief();
+ _init = true;
+ return Status::OK();
+}
+
+Status
KinesisDataConsumer::_create_kinesis_client(std::shared_ptr<StreamLoadContext>
ctx) {
+ // Reuse S3ClientFactory's credential provider logic
+ // This supports all AWS authentication methods:
+ // - Simple AK/SK
+ // - IAM instance profile (EC2)
+ // - STS assume role
+ // - Session tokens
+ // - Environment variables
+ // - Default credential chain
+
+ S3ClientConf s3_conf;
+ s3_conf.region = _region;
+ s3_conf.endpoint = _endpoint;
+
+ // Parse AWS credentials from properties
+ auto it_ak = _custom_properties.find("aws.access.key.id");
+ auto it_sk = _custom_properties.find("aws.secret.access.key");
+ auto it_token = _custom_properties.find("aws.session.token");
+ auto it_role_arn = _custom_properties.find("aws.iam.role.arn");
+ auto it_external_id = _custom_properties.find("aws.external.id");
+ auto it_provider = _custom_properties.find("aws.credentials.provider");
+
+ if (it_ak != _custom_properties.end()) {
+ s3_conf.ak = it_ak->second;
+ }
+ if (it_sk != _custom_properties.end()) {
+ s3_conf.sk = it_sk->second;
+ }
+ if (it_token != _custom_properties.end()) {
+ s3_conf.token = it_token->second;
+ }
+ if (it_role_arn != _custom_properties.end()) {
+ s3_conf.role_arn = it_role_arn->second;
+ }
+ if (it_external_id != _custom_properties.end()) {
+ s3_conf.external_id = it_external_id->second;
+ }
+ if (it_provider != _custom_properties.end()) {
+ // Map provider type string to enum
+ if (it_provider->second == "instance_profile") {
+ s3_conf.cred_provider_type = CredProviderType::InstanceProfile;
+ } else if (it_provider->second == "env") {
+ s3_conf.cred_provider_type = CredProviderType::Env;
+ } else if (it_provider->second == "simple") {
+ s3_conf.cred_provider_type = CredProviderType::Simple;
+ }
+ }
+
+ // Create AWS ClientConfiguration
+ Aws::Client::ClientConfiguration aws_config =
S3ClientFactory::getClientConfiguration();
+ aws_config.region = _region;
+
+ if (!_endpoint.empty()) {
+ aws_config.endpointOverride = _endpoint;
+ }
+
+ // Set timeouts from properties or use defaults
+ auto it_request_timeout =
_custom_properties.find("aws.request.timeout.ms");
+ if (it_request_timeout != _custom_properties.end()) {
+ aws_config.requestTimeoutMs = std::stoi(it_request_timeout->second);
+ } else {
+ aws_config.requestTimeoutMs = 30000; // 30s default
+ }
+
+ auto it_conn_timeout =
_custom_properties.find("aws.connection.timeout.ms");
+ if (it_conn_timeout != _custom_properties.end()) {
+ aws_config.connectTimeoutMs = std::stoi(it_conn_timeout->second);
+ }
+
+ // Get credentials provider (reuses S3 infrastructure)
+ auto credentials_provider =
S3ClientFactory::instance().get_aws_credentials_provider(s3_conf);
+
+ // Create Kinesis client
+ _kinesis_client =
+
std::make_shared<Aws::Kinesis::KinesisClient>(credentials_provider, aws_config);
+
+ if (!_kinesis_client) {
+ return Status::InternalError(
+ "Failed to create AWS Kinesis client for stream: {}, region:
{}", _stream, _region);
+ }
+
+ LOG(INFO) << "Created Kinesis client for stream: " << _stream << ",
region: " << _region;
+ return Status::OK();
+}
+
+Status KinesisDataConsumer::assign_shards(
+ const std::map<std::string, std::string>& shard_sequence_numbers,
+ const std::string& stream_name, std::shared_ptr<StreamLoadContext>
ctx) {
+ DORIS_CHECK(_kinesis_client);
+
+ std::stringstream ss;
+ ss << "Assigning shards to Kinesis consumer " << _id << ": ";
+
+ for (auto& entry : shard_sequence_numbers) {
+ const std::string& shard_id = entry.first;
+ const std::string& sequence_number = entry.second;
+
+ // Get shard iterator for this shard
+ std::string iterator;
+ RETURN_IF_ERROR(_get_shard_iterator(shard_id, sequence_number,
&iterator));
+
+ _shard_iterators[shard_id] = iterator;
+ _consuming_shard_ids.insert(shard_id);
+
+ ss << "[" << shard_id << ": " << sequence_number << "] ";
+ }
+
+ LOG(INFO) << ss.str();
+ return Status::OK();
+}
+
+Status KinesisDataConsumer::_get_shard_iterator(const std::string& shard_id,
+ const std::string&
sequence_number,
+ std::string* iterator) {
+ Aws::Kinesis::Model::GetShardIteratorRequest request;
+ request.SetStreamName(_stream);
+ request.SetShardId(shard_id);
+
+ // Determine iterator type based on sequence number
+ if (sequence_number.empty() || sequence_number == "TRIM_HORIZON" ||
sequence_number == "-2") {
+ // Start from oldest record in shard
+
request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::TRIM_HORIZON);
+ } else if (sequence_number == "LATEST" || sequence_number == "-1") {
+ // Start from newest record (records arriving after iterator creation)
+
request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::LATEST);
+ } else {
+ // Resume from specific sequence number
+
request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::AFTER_SEQUENCE_NUMBER);
+ request.SetStartingSequenceNumber(sequence_number);
+ }
+
+ auto outcome = _kinesis_client->GetShardIterator(request);
+ if (!outcome.IsSuccess()) {
+ auto& error = outcome.GetError();
+ return Status::InternalError("Failed to get shard iterator for shard
{}: {} ({})", shard_id,
+ error.GetMessage(),
static_cast<int>(error.GetErrorType()));
+ }
+
+ *iterator = outcome.GetResult().GetShardIterator();
+ VLOG_NOTICE << "Got shard iterator for shard: " << shard_id;
+ return Status::OK();
+}
+
+Status KinesisDataConsumer::group_consume(
+ BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue,
+ int64_t max_running_time_ms) {
+ static constexpr int MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE = 3;
+ static constexpr int RATE_LIMIT_BACKOFF_MS = 1000; // 1 second
+ static constexpr int KINESIS_GET_RECORDS_LIMIT = 1000; // Max 10000
+ static constexpr int INTER_SHARD_SLEEP_MS = 10; // Small sleep
between shards
+
+ int64_t left_time = max_running_time_ms;
+ LOG(INFO) << "start Kinesis consumer: " << _id << ", grp: " << _grp_id
+ << ", stream: " << _stream << ", max running time(ms): " <<
left_time;
+
+ int64_t received_rows = 0;
+ int64_t put_rows = 0;
+ int32_t retry_times = 0;
+ Status st = Status::OK();
+ bool done = false;
+
+ MonotonicStopWatch consumer_watch;
+ MonotonicStopWatch watch;
+ watch.start();
+
+ while (true) {
+ // Check cancellation flag
+ {
+ std::unique_lock<std::mutex> l(_lock);
+ if (_cancelled) {
+ break;
+ }
+ }
+
+ if (left_time <= 0) {
+ break;
+ }
+
+ // Round-robin through all active shards
+ for (auto it = _consuming_shard_ids.begin(); it !=
_consuming_shard_ids.end() && !done;) {
+ const std::string& shard_id = *it;
+ auto iter_it = _shard_iterators.find(shard_id);
+
+ if (iter_it == _shard_iterators.end() || iter_it->second.empty()) {
+ // Shard exhausted (closed due to split/merge), remove from
active set
+ LOG(INFO) << "Shard exhausted: " << shard_id;
+ it = _consuming_shard_ids.erase(it);
+ continue;
+ }
+
+ // Call Kinesis GetRecords API
+ consumer_watch.start();
+
+ Aws::Kinesis::Model::GetRecordsRequest request;
+ request.SetShardIterator(iter_it->second);
+ request.SetLimit(KINESIS_GET_RECORDS_LIMIT);
+
+ auto outcome = _kinesis_client->GetRecords(request);
+ consumer_watch.stop();
+
+ // Track metrics (reuse Kafka metrics, they're generic)
+ DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
+ DorisMetrics::instance()->routine_load_get_msg_latency->increment(
+ consumer_watch.elapsed_time() / 1000 / 1000);
+
+ if (!outcome.IsSuccess()) {
+ auto& error = outcome.GetError();
+
+ // Handle throttling (ProvisionedThroughputExceededException)
+ if (error.GetErrorType() ==
+
Aws::Kinesis::KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED) {
+ LOG(INFO) << "Kinesis rate limit exceeded for shard: " <<
shard_id
+ << ", backing off " << RATE_LIMIT_BACKOFF_MS <<
"ms";
+
std::this_thread::sleep_for(std::chrono::milliseconds(RATE_LIMIT_BACKOFF_MS));
+ ++it; // Move to next shard, will retry this one next round
+ continue;
+ }
+
+ // Handle retriable errors
+ if (_is_retriable_error(error)) {
+ LOG(INFO) << "Kinesis retriable error for shard " <<
shard_id << ": "
+ << error.GetMessage() << ", retry times: " <<
retry_times++;
+ if (retry_times <= MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE) {
Review Comment:
**MEDIUM: `retry_times` is shared across all shards in the round-robin
loop.** A retriable error on shard A increments the counter, and then a
retriable error on shard B further increments it. The total retries across all
shards combined is limited to `MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE` (3), not
3 per shard.
Also note: `retry_times++` is post-increment in the LOG statement, so the
logged value is the pre-increment value, but the comparison on the next line
uses the already-incremented value. This is confusing. Consider separating the
increment from the log statement.
##########
be/src/runtime/routine_load/data_consumer.cpp:
##########
@@ -556,4 +571,442 @@ bool
KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
return true;
}
+// ==================== AWS Kinesis Data Consumer Implementation
====================
+
+KinesisDataConsumer::KinesisDataConsumer(std::shared_ptr<StreamLoadContext>
ctx)
+ : _region(ctx->kinesis_info->region),
+ _stream(ctx->kinesis_info->stream),
+ _endpoint(ctx->kinesis_info->endpoint) {
+ VLOG_NOTICE << "construct Kinesis consumer: stream=" << _stream << ",
region=" << _region;
+}
+
+KinesisDataConsumer::~KinesisDataConsumer() {
+ VLOG_NOTICE << "destruct Kinesis consumer: stream=" << _stream;
+ // AWS SDK client managed by shared_ptr, will be automatically cleaned up
+}
+
+Status KinesisDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
+ std::unique_lock<std::mutex> l(_lock);
+ if (_init) {
+ return Status::OK(); // Already initialized (idempotent)
+ }
+
+ // Store custom properties (AWS credentials, etc.)
+ _custom_properties.insert(ctx->kinesis_info->properties.begin(),
+ ctx->kinesis_info->properties.end());
+
+ // Create AWS Kinesis client
+ RETURN_IF_ERROR(_create_kinesis_client(ctx));
+
+ VLOG_NOTICE << "finished to init Kinesis consumer. stream=" << _stream <<
", region=" << _region
+ << ", " << ctx->brief();
+ _init = true;
+ return Status::OK();
+}
+
+Status
KinesisDataConsumer::_create_kinesis_client(std::shared_ptr<StreamLoadContext>
ctx) {
+ // Reuse S3ClientFactory's credential provider logic
+ // This supports all AWS authentication methods:
+ // - Simple AK/SK
+ // - IAM instance profile (EC2)
+ // - STS assume role
+ // - Session tokens
+ // - Environment variables
+ // - Default credential chain
+
+ S3ClientConf s3_conf;
+ s3_conf.region = _region;
+ s3_conf.endpoint = _endpoint;
+
+ // Parse AWS credentials from properties
+ auto it_ak = _custom_properties.find("aws.access.key.id");
+ auto it_sk = _custom_properties.find("aws.secret.access.key");
+ auto it_token = _custom_properties.find("aws.session.token");
+ auto it_role_arn = _custom_properties.find("aws.iam.role.arn");
+ auto it_external_id = _custom_properties.find("aws.external.id");
+ auto it_provider = _custom_properties.find("aws.credentials.provider");
+
+ if (it_ak != _custom_properties.end()) {
+ s3_conf.ak = it_ak->second;
+ }
+ if (it_sk != _custom_properties.end()) {
+ s3_conf.sk = it_sk->second;
+ }
+ if (it_token != _custom_properties.end()) {
+ s3_conf.token = it_token->second;
+ }
+ if (it_role_arn != _custom_properties.end()) {
+ s3_conf.role_arn = it_role_arn->second;
+ }
+ if (it_external_id != _custom_properties.end()) {
+ s3_conf.external_id = it_external_id->second;
+ }
+ if (it_provider != _custom_properties.end()) {
+ // Map provider type string to enum
+ if (it_provider->second == "instance_profile") {
+ s3_conf.cred_provider_type = CredProviderType::InstanceProfile;
+ } else if (it_provider->second == "env") {
+ s3_conf.cred_provider_type = CredProviderType::Env;
+ } else if (it_provider->second == "simple") {
+ s3_conf.cred_provider_type = CredProviderType::Simple;
+ }
+ }
+
+ // Create AWS ClientConfiguration
+ Aws::Client::ClientConfiguration aws_config =
S3ClientFactory::getClientConfiguration();
+ aws_config.region = _region;
+
+ if (!_endpoint.empty()) {
+ aws_config.endpointOverride = _endpoint;
+ }
+
+ // Set timeouts from properties or use defaults
+ auto it_request_timeout =
_custom_properties.find("aws.request.timeout.ms");
+ if (it_request_timeout != _custom_properties.end()) {
+ aws_config.requestTimeoutMs = std::stoi(it_request_timeout->second);
+ } else {
+ aws_config.requestTimeoutMs = 30000; // 30s default
+ }
+
+ auto it_conn_timeout =
_custom_properties.find("aws.connection.timeout.ms");
+ if (it_conn_timeout != _custom_properties.end()) {
+ aws_config.connectTimeoutMs = std::stoi(it_conn_timeout->second);
+ }
+
+ // Get credentials provider (reuses S3 infrastructure)
+ auto credentials_provider =
S3ClientFactory::instance().get_aws_credentials_provider(s3_conf);
+
+ // Create Kinesis client
+ _kinesis_client =
+
std::make_shared<Aws::Kinesis::KinesisClient>(credentials_provider, aws_config);
+
+ if (!_kinesis_client) {
+ return Status::InternalError(
+ "Failed to create AWS Kinesis client for stream: {}, region:
{}", _stream, _region);
+ }
+
+ LOG(INFO) << "Created Kinesis client for stream: " << _stream << ",
region: " << _region;
+ return Status::OK();
+}
+
+Status KinesisDataConsumer::assign_shards(
+ const std::map<std::string, std::string>& shard_sequence_numbers,
+ const std::string& stream_name, std::shared_ptr<StreamLoadContext>
ctx) {
+ DORIS_CHECK(_kinesis_client);
+
+ std::stringstream ss;
+ ss << "Assigning shards to Kinesis consumer " << _id << ": ";
+
+ for (auto& entry : shard_sequence_numbers) {
+ const std::string& shard_id = entry.first;
+ const std::string& sequence_number = entry.second;
+
+ // Get shard iterator for this shard
+ std::string iterator;
+ RETURN_IF_ERROR(_get_shard_iterator(shard_id, sequence_number,
&iterator));
+
+ _shard_iterators[shard_id] = iterator;
+ _consuming_shard_ids.insert(shard_id);
+
+ ss << "[" << shard_id << ": " << sequence_number << "] ";
+ }
+
+ LOG(INFO) << ss.str();
+ return Status::OK();
+}
+
+Status KinesisDataConsumer::_get_shard_iterator(const std::string& shard_id,
+ const std::string&
sequence_number,
+ std::string* iterator) {
+ Aws::Kinesis::Model::GetShardIteratorRequest request;
+ request.SetStreamName(_stream);
+ request.SetShardId(shard_id);
+
+ // Determine iterator type based on sequence number
+ if (sequence_number.empty() || sequence_number == "TRIM_HORIZON" ||
sequence_number == "-2") {
+ // Start from oldest record in shard
+
request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::TRIM_HORIZON);
+ } else if (sequence_number == "LATEST" || sequence_number == "-1") {
+ // Start from newest record (records arriving after iterator creation)
+
request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::LATEST);
+ } else {
+ // Resume from specific sequence number
+
request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::AFTER_SEQUENCE_NUMBER);
+ request.SetStartingSequenceNumber(sequence_number);
+ }
+
+ auto outcome = _kinesis_client->GetShardIterator(request);
+ if (!outcome.IsSuccess()) {
+ auto& error = outcome.GetError();
+ return Status::InternalError("Failed to get shard iterator for shard
{}: {} ({})", shard_id,
+ error.GetMessage(),
static_cast<int>(error.GetErrorType()));
+ }
+
+ *iterator = outcome.GetResult().GetShardIterator();
+ VLOG_NOTICE << "Got shard iterator for shard: " << shard_id;
+ return Status::OK();
+}
+
+Status KinesisDataConsumer::group_consume(
+ BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue,
+ int64_t max_running_time_ms) {
+ static constexpr int MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE = 3;
+ static constexpr int RATE_LIMIT_BACKOFF_MS = 1000; // 1 second
+ static constexpr int KINESIS_GET_RECORDS_LIMIT = 1000; // Max 10000
+ static constexpr int INTER_SHARD_SLEEP_MS = 10; // Small sleep
between shards
+
+ int64_t left_time = max_running_time_ms;
+ LOG(INFO) << "start Kinesis consumer: " << _id << ", grp: " << _grp_id
+ << ", stream: " << _stream << ", max running time(ms): " <<
left_time;
+
+ int64_t received_rows = 0;
+ int64_t put_rows = 0;
+ int32_t retry_times = 0;
+ Status st = Status::OK();
+ bool done = false;
+
+ MonotonicStopWatch consumer_watch;
+ MonotonicStopWatch watch;
+ watch.start();
+
+ while (true) {
+ // Check cancellation flag
+ {
+ std::unique_lock<std::mutex> l(_lock);
+ if (_cancelled) {
+ break;
+ }
+ }
+
+ if (left_time <= 0) {
+ break;
+ }
+
+ // Round-robin through all active shards
+ for (auto it = _consuming_shard_ids.begin(); it !=
_consuming_shard_ids.end() && !done;) {
+ const std::string& shard_id = *it;
+ auto iter_it = _shard_iterators.find(shard_id);
+
+ if (iter_it == _shard_iterators.end() || iter_it->second.empty()) {
+ // Shard exhausted (closed due to split/merge), remove from
active set
+ LOG(INFO) << "Shard exhausted: " << shard_id;
+ it = _consuming_shard_ids.erase(it);
+ continue;
+ }
+
+ // Call Kinesis GetRecords API
+ consumer_watch.start();
+
+ Aws::Kinesis::Model::GetRecordsRequest request;
+ request.SetShardIterator(iter_it->second);
+ request.SetLimit(KINESIS_GET_RECORDS_LIMIT);
+
+ auto outcome = _kinesis_client->GetRecords(request);
+ consumer_watch.stop();
+
+ // Track metrics (reuse Kafka metrics, they're generic)
+ DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
+ DorisMetrics::instance()->routine_load_get_msg_latency->increment(
+ consumer_watch.elapsed_time() / 1000 / 1000);
+
+ if (!outcome.IsSuccess()) {
+ auto& error = outcome.GetError();
+
+ // Handle throttling (ProvisionedThroughputExceededException)
+ if (error.GetErrorType() ==
+
Aws::Kinesis::KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED) {
+ LOG(INFO) << "Kinesis rate limit exceeded for shard: " <<
shard_id
+ << ", backing off " << RATE_LIMIT_BACKOFF_MS <<
"ms";
+
std::this_thread::sleep_for(std::chrono::milliseconds(RATE_LIMIT_BACKOFF_MS));
+ ++it; // Move to next shard, will retry this one next round
+ continue;
+ }
+
+ // Handle retriable errors
+ if (_is_retriable_error(error)) {
+ LOG(INFO) << "Kinesis retriable error for shard " <<
shard_id << ": "
+ << error.GetMessage() << ", retry times: " <<
retry_times++;
+ if (retry_times <= MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE) {
+
std::this_thread::sleep_for(std::chrono::milliseconds(200));
+ continue;
+ }
+ }
+
+ // Fatal error
+ LOG(WARNING) << "Kinesis consume failed for shard " <<
shard_id << ": "
+ << error.GetMessage() << " (" <<
static_cast<int>(error.GetErrorType())
+ << ")";
+ st = Status::InternalError("Kinesis GetRecords failed for
shard {}: {}", shard_id,
+ error.GetMessage());
+ done = true;
+ break;
+ }
+
+ // Reset retry counter on success
+ retry_times = 0;
+
+ // Process records - move result to allow moving individual records
+ auto result = outcome.GetResultWithOwnership();
+ auto millis_behind = result.GetMillisBehindLatest();
+ std::string next_iterator = result.GetNextShardIterator();
+ RETURN_IF_ERROR(_process_records(shard_id, std::move(result),
queue, &received_rows,
+ &put_rows));
+
+ // Track MillisBehindLatest for this shard (used by FE for lag
monitoring & scheduling)
+ _millis_behind_latest[shard_id] = millis_behind;
+
+ // Update shard iterator for next call
+ if (next_iterator.empty()) {
+ // Shard is closed (split/merge), remove from active set
+ LOG(INFO) << "Shard closed: " << shard_id << " (split/merge
detected)";
+ _shard_iterators.erase(shard_id);
+ it = _consuming_shard_ids.erase(it);
+ } else {
+ _shard_iterators[shard_id] = next_iterator;
+ ++it;
+ }
+
+ // Check if all shards are exhausted
+ if (_consuming_shard_ids.empty()) {
+ LOG(INFO) << "All shards exhausted for consumer: " << _id;
+ done = true;
+ break;
+ }
+
+ // Small sleep to avoid tight loop
+
std::this_thread::sleep_for(std::chrono::milliseconds(INTER_SHARD_SLEEP_MS));
+ }
+
+ left_time = max_running_time_ms - watch.elapsed_time() / 1000 / 1000;
+ if (done) {
+ break;
+ }
+ }
+
+ LOG(INFO) << "Kinesis consumer done: " << _id << ", grp: " << _grp_id
+ << ". cancelled: " << _cancelled << ", left time(ms): " <<
left_time
+ << ", total cost(ms): " << watch.elapsed_time() / 1000 / 1000
+ << ", consume cost(ms): " << consumer_watch.elapsed_time() /
1000 / 1000
+ << ", received rows: " << received_rows << ", put rows: " <<
put_rows;
+
+ return st;
+}
+
+Status KinesisDataConsumer::_process_records(
+ const std::string& shard_id, Aws::Kinesis::Model::GetRecordsResult
result,
+ BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue,
int64_t* received_rows,
+ int64_t* put_rows) {
+ // result is owned by value, safe to get mutable access to its records
+ auto records =
+
std::move(const_cast<Aws::Vector<Aws::Kinesis::Model::Record>&>(result.GetRecords()));
+
Review Comment:
**HIGH: `const_cast` to move records out of `GetRecordsResult` is fragile.**
```cpp
auto records =
std::move(const_cast<Aws::Vector<Aws::Kinesis::Model::Record>&>(result.GetRecords()));
```
While the comment says "result is owned by value, safe to get mutable
access", this relies on `GetRecords()` returning a reference to an internal
member (not a copy). If the AWS SDK changes the return type to return by value,
this `const_cast` + move would operate on a temporary and leave the actual
records intact.
Since `result` is passed by value and is a local, a safer approach would be
to iterate `result.GetRecords()` directly, or to check if the AWS SDK provides
a mutable accessor (some SDK versions have `GetRecords()` non-const overloads).
##########
be/src/runtime/routine_load/data_consumer_pool.cpp:
##########
@@ -77,22 +80,34 @@ Status
DataConsumerPool::get_consumer(std::shared_ptr<StreamLoadContext> ctx,
Status DataConsumerPool::get_consumer_grp(std::shared_ptr<StreamLoadContext>
ctx,
std::shared_ptr<DataConsumerGroup>*
ret) {
- if (ctx->load_src_type != TLoadSourceType::KAFKA) {
- return Status::InternalError(
- "PAUSE: Currently only support consumer group for Kafka data
source");
+ // one data consumer group contains at least one data consumers.
+ size_t consumer_num = config::max_consumer_num_per_group;
+ std::shared_ptr<DataConsumerGroup> grp;
+ switch (ctx->load_src_type) {
+ case TLoadSourceType::KAFKA: {
+ DCHECK(ctx->kafka_info);
+ if (ctx->kafka_info->begin_offset.size() == 0) {
+ return Status::InternalError(
+ "PAUSE: The size of begin_offset of task should not be
0.");
+ }
+ consumer_num = std::min(consumer_num,
ctx->kafka_info->begin_offset.size());
+ grp = std::make_shared<KafkaDataConsumerGroup>(consumer_num);
+ break;
}
- DCHECK(ctx->kafka_info);
-
- if (ctx->kafka_info->begin_offset.size() == 0) {
- return Status::InternalError("PAUSE: The size of begin_offset of task
should not be 0.");
+ case TLoadSourceType::KINESIS: {
+ DCHECK(ctx->kinesis_info);
+ if (ctx->kinesis_info->begin_sequence_number.size() == 0) {
+ return Status::InternalError(
+ "PAUSE: The size of begin_sequence_number of task should
not be 0.");
+ }
+ // Kinesis uses a single consumer to handle all shards
+ consumer_num = 1;
+ grp = std::make_shared<KinesisDataConsumerGroup>(consumer_num);
+ break;
+ }
+ default:
+ return Status::Cancelled("unknown routine load task type: {}",
ctx->load_type);
}
Review Comment:
**Wrong error status:** `Status::Cancelled` is used for an unknown routine
load task type, but this represents an internal logic error, not a
user-initiated cancellation. Should be `Status::InternalError` to match the
original behavior and semantics.
##########
be/src/runtime/routine_load/data_consumer.cpp:
##########
@@ -556,4 +571,442 @@ bool
KafkaDataConsumer::match(std::shared_ptr<StreamLoadContext> ctx) {
return true;
}
+// ==================== AWS Kinesis Data Consumer Implementation
====================
+
+KinesisDataConsumer::KinesisDataConsumer(std::shared_ptr<StreamLoadContext>
ctx)
+ : _region(ctx->kinesis_info->region),
+ _stream(ctx->kinesis_info->stream),
+ _endpoint(ctx->kinesis_info->endpoint) {
+ VLOG_NOTICE << "construct Kinesis consumer: stream=" << _stream << ",
region=" << _region;
+}
+
+KinesisDataConsumer::~KinesisDataConsumer() {
+ VLOG_NOTICE << "destruct Kinesis consumer: stream=" << _stream;
+ // AWS SDK client managed by shared_ptr, will be automatically cleaned up
+}
+
+Status KinesisDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
+ std::unique_lock<std::mutex> l(_lock);
+ if (_init) {
+ return Status::OK(); // Already initialized (idempotent)
+ }
+
+ // Store custom properties (AWS credentials, etc.)
+ _custom_properties.insert(ctx->kinesis_info->properties.begin(),
+ ctx->kinesis_info->properties.end());
+
+ // Create AWS Kinesis client
+ RETURN_IF_ERROR(_create_kinesis_client(ctx));
+
+ VLOG_NOTICE << "finished to init Kinesis consumer. stream=" << _stream <<
", region=" << _region
+ << ", " << ctx->brief();
+ _init = true;
+ return Status::OK();
+}
+
+Status
KinesisDataConsumer::_create_kinesis_client(std::shared_ptr<StreamLoadContext>
ctx) {
+ // Reuse S3ClientFactory's credential provider logic
+ // This supports all AWS authentication methods:
+ // - Simple AK/SK
+ // - IAM instance profile (EC2)
+ // - STS assume role
+ // - Session tokens
+ // - Environment variables
+ // - Default credential chain
+
+ S3ClientConf s3_conf;
+ s3_conf.region = _region;
+ s3_conf.endpoint = _endpoint;
+
+ // Parse AWS credentials from properties
+ auto it_ak = _custom_properties.find("aws.access.key.id");
+ auto it_sk = _custom_properties.find("aws.secret.access.key");
+ auto it_token = _custom_properties.find("aws.session.token");
+ auto it_role_arn = _custom_properties.find("aws.iam.role.arn");
+ auto it_external_id = _custom_properties.find("aws.external.id");
+ auto it_provider = _custom_properties.find("aws.credentials.provider");
+
+ if (it_ak != _custom_properties.end()) {
+ s3_conf.ak = it_ak->second;
+ }
+ if (it_sk != _custom_properties.end()) {
+ s3_conf.sk = it_sk->second;
+ }
+ if (it_token != _custom_properties.end()) {
+ s3_conf.token = it_token->second;
+ }
+ if (it_role_arn != _custom_properties.end()) {
+ s3_conf.role_arn = it_role_arn->second;
+ }
+ if (it_external_id != _custom_properties.end()) {
+ s3_conf.external_id = it_external_id->second;
+ }
+ if (it_provider != _custom_properties.end()) {
+ // Map provider type string to enum
+ if (it_provider->second == "instance_profile") {
+ s3_conf.cred_provider_type = CredProviderType::InstanceProfile;
+ } else if (it_provider->second == "env") {
+ s3_conf.cred_provider_type = CredProviderType::Env;
+ } else if (it_provider->second == "simple") {
+ s3_conf.cred_provider_type = CredProviderType::Simple;
+ }
+ }
+
+ // Create AWS ClientConfiguration
+ Aws::Client::ClientConfiguration aws_config =
S3ClientFactory::getClientConfiguration();
+ aws_config.region = _region;
+
+ if (!_endpoint.empty()) {
+ aws_config.endpointOverride = _endpoint;
+ }
+
+ // Set timeouts from properties or use defaults
+ auto it_request_timeout =
_custom_properties.find("aws.request.timeout.ms");
+ if (it_request_timeout != _custom_properties.end()) {
+ aws_config.requestTimeoutMs = std::stoi(it_request_timeout->second);
+ } else {
+ aws_config.requestTimeoutMs = 30000; // 30s default
+ }
+
+ auto it_conn_timeout =
_custom_properties.find("aws.connection.timeout.ms");
+ if (it_conn_timeout != _custom_properties.end()) {
+ aws_config.connectTimeoutMs = std::stoi(it_conn_timeout->second);
+ }
+
+ // Get credentials provider (reuses S3 infrastructure)
+ auto credentials_provider =
S3ClientFactory::instance().get_aws_credentials_provider(s3_conf);
+
+ // Create Kinesis client
+ _kinesis_client =
+
std::make_shared<Aws::Kinesis::KinesisClient>(credentials_provider, aws_config);
+
+ if (!_kinesis_client) {
+ return Status::InternalError(
+ "Failed to create AWS Kinesis client for stream: {}, region:
{}", _stream, _region);
+ }
+
+ LOG(INFO) << "Created Kinesis client for stream: " << _stream << ",
region: " << _region;
+ return Status::OK();
+}
+
+Status KinesisDataConsumer::assign_shards(
+ const std::map<std::string, std::string>& shard_sequence_numbers,
+ const std::string& stream_name, std::shared_ptr<StreamLoadContext>
ctx) {
+ DORIS_CHECK(_kinesis_client);
+
+ std::stringstream ss;
+ ss << "Assigning shards to Kinesis consumer " << _id << ": ";
+
+ for (auto& entry : shard_sequence_numbers) {
+ const std::string& shard_id = entry.first;
+ const std::string& sequence_number = entry.second;
+
+ // Get shard iterator for this shard
+ std::string iterator;
+ RETURN_IF_ERROR(_get_shard_iterator(shard_id, sequence_number,
&iterator));
+
+ _shard_iterators[shard_id] = iterator;
+ _consuming_shard_ids.insert(shard_id);
+
+ ss << "[" << shard_id << ": " << sequence_number << "] ";
+ }
+
+ LOG(INFO) << ss.str();
+ return Status::OK();
+}
+
+Status KinesisDataConsumer::_get_shard_iterator(const std::string& shard_id,
+ const std::string&
sequence_number,
+ std::string* iterator) {
+ Aws::Kinesis::Model::GetShardIteratorRequest request;
+ request.SetStreamName(_stream);
+ request.SetShardId(shard_id);
+
+ // Determine iterator type based on sequence number
+ if (sequence_number.empty() || sequence_number == "TRIM_HORIZON" ||
sequence_number == "-2") {
+ // Start from oldest record in shard
+
request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::TRIM_HORIZON);
+ } else if (sequence_number == "LATEST" || sequence_number == "-1") {
+ // Start from newest record (records arriving after iterator creation)
+
request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::LATEST);
+ } else {
+ // Resume from specific sequence number
+
request.SetShardIteratorType(Aws::Kinesis::Model::ShardIteratorType::AFTER_SEQUENCE_NUMBER);
+ request.SetStartingSequenceNumber(sequence_number);
+ }
+
+ auto outcome = _kinesis_client->GetShardIterator(request);
+ if (!outcome.IsSuccess()) {
+ auto& error = outcome.GetError();
+ return Status::InternalError("Failed to get shard iterator for shard
{}: {} ({})", shard_id,
+ error.GetMessage(),
static_cast<int>(error.GetErrorType()));
+ }
+
+ *iterator = outcome.GetResult().GetShardIterator();
+ VLOG_NOTICE << "Got shard iterator for shard: " << shard_id;
+ return Status::OK();
+}
+
+Status KinesisDataConsumer::group_consume(
+ BlockingQueue<std::shared_ptr<Aws::Kinesis::Model::Record>>* queue,
+ int64_t max_running_time_ms) {
+ static constexpr int MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE = 3;
+ static constexpr int RATE_LIMIT_BACKOFF_MS = 1000; // 1 second
+ static constexpr int KINESIS_GET_RECORDS_LIMIT = 1000; // Max 10000
+ static constexpr int INTER_SHARD_SLEEP_MS = 10; // Small sleep
between shards
+
+ int64_t left_time = max_running_time_ms;
+ LOG(INFO) << "start Kinesis consumer: " << _id << ", grp: " << _grp_id
+ << ", stream: " << _stream << ", max running time(ms): " <<
left_time;
+
+ int64_t received_rows = 0;
+ int64_t put_rows = 0;
+ int32_t retry_times = 0;
+ Status st = Status::OK();
+ bool done = false;
+
+ MonotonicStopWatch consumer_watch;
+ MonotonicStopWatch watch;
+ watch.start();
+
+ while (true) {
+ // Check cancellation flag
+ {
+ std::unique_lock<std::mutex> l(_lock);
+ if (_cancelled) {
+ break;
+ }
+ }
+
+ if (left_time <= 0) {
+ break;
+ }
+
+ // Round-robin through all active shards
+ for (auto it = _consuming_shard_ids.begin(); it !=
_consuming_shard_ids.end() && !done;) {
+ const std::string& shard_id = *it;
+ auto iter_it = _shard_iterators.find(shard_id);
+
+ if (iter_it == _shard_iterators.end() || iter_it->second.empty()) {
+ // Shard exhausted (closed due to split/merge), remove from
active set
+ LOG(INFO) << "Shard exhausted: " << shard_id;
+ it = _consuming_shard_ids.erase(it);
+ continue;
+ }
+
+ // Call Kinesis GetRecords API
+ consumer_watch.start();
+
+ Aws::Kinesis::Model::GetRecordsRequest request;
+ request.SetShardIterator(iter_it->second);
+ request.SetLimit(KINESIS_GET_RECORDS_LIMIT);
+
+ auto outcome = _kinesis_client->GetRecords(request);
+ consumer_watch.stop();
+
+ // Track metrics (reuse Kafka metrics, they're generic)
+ DorisMetrics::instance()->routine_load_get_msg_count->increment(1);
+ DorisMetrics::instance()->routine_load_get_msg_latency->increment(
+ consumer_watch.elapsed_time() / 1000 / 1000);
+
+ if (!outcome.IsSuccess()) {
+ auto& error = outcome.GetError();
+
+ // Handle throttling (ProvisionedThroughputExceededException)
+ if (error.GetErrorType() ==
+
Aws::Kinesis::KinesisErrors::PROVISIONED_THROUGHPUT_EXCEEDED) {
+ LOG(INFO) << "Kinesis rate limit exceeded for shard: " <<
shard_id
+ << ", backing off " << RATE_LIMIT_BACKOFF_MS <<
"ms";
+
std::this_thread::sleep_for(std::chrono::milliseconds(RATE_LIMIT_BACKOFF_MS));
+ ++it; // Move to next shard, will retry this one next round
+ continue;
+ }
+
+ // Handle retriable errors
+ if (_is_retriable_error(error)) {
+ LOG(INFO) << "Kinesis retriable error for shard " <<
shard_id << ": "
+ << error.GetMessage() << ", retry times: " <<
retry_times++;
+ if (retry_times <= MAX_RETRY_TIMES_FOR_TRANSPORT_FAILURE) {
+
std::this_thread::sleep_for(std::chrono::milliseconds(200));
+ continue;
+ }
+ }
+
+ // Fatal error
+ LOG(WARNING) << "Kinesis consume failed for shard " <<
shard_id << ": "
+ << error.GetMessage() << " (" <<
static_cast<int>(error.GetErrorType())
+ << ")";
+ st = Status::InternalError("Kinesis GetRecords failed for
shard {}: {}", shard_id,
+ error.GetMessage());
+ done = true;
+ break;
+ }
+
+ // Reset retry counter on success
+ retry_times = 0;
+
+ // Process records - move result to allow moving individual records
+ auto result = outcome.GetResultWithOwnership();
+ auto millis_behind = result.GetMillisBehindLatest();
+ std::string next_iterator = result.GetNextShardIterator();
+ RETURN_IF_ERROR(_process_records(shard_id, std::move(result),
queue, &received_rows,
+ &put_rows));
Review Comment:
**Inconsistent error handling in `group_consume` loop.** This
`RETURN_IF_ERROR` returns immediately on `_process_records` failure, bypassing
the final LOG statement that reports consumption statistics.
Other error paths in this loop (lines 839-842) set `st = ...; done = true;
break;`, which lets the final log execute before the status is returned. Use
the same pattern here for consistency and debuggability, e.g.:
`st = _process_records(shard_id, std::move(result), queue, &received_rows,
&put_rows); if (!st.ok()) { done = true; break; }`
##########
be/src/runtime/routine_load/data_consumer_group.h:
##########
@@ -102,10 +101,30 @@ class KafkaDataConsumerGroup : public DataConsumerGroup {
BlockingQueue<RdKafka::Message*>* queue, int64_t
max_running_time_ms,
ConsumeFinishCallback cb);
-private:
// blocking queue to receive msgs from all consumers
BlockingQueue<RdKafka::Message*> _queue;
};
Review Comment:
**Accidental visibility change:** `_queue` in `KafkaDataConsumerGroup` was
previously `private` but the `private:` specifier was removed, so `_queue` now
inherits the access level of the preceding section and becomes `public`,
allowing external code to read and mutate the consumer queue directly. This
appears to be an unintentional regression from refactoring. Restore
`private:` before `_queue`.
##########
fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisRoutineLoadJob.java:
##########
@@ -0,0 +1,785 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import org.apache.doris.analysis.ImportColumnDesc;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.RandomDistributionInfo;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.InternalErrorCode;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.datasource.kinesis.KinesisUtil;
+import org.apache.doris.load.routineload.kinesis.KinesisConfiguration;
+import org.apache.doris.load.routineload.kinesis.KinesisDataSourceProperties;
+import org.apache.doris.nereids.load.NereidsImportColumnDesc;
+import org.apache.doris.nereids.load.NereidsLoadTaskInfo;
+import org.apache.doris.nereids.load.NereidsLoadUtils;
+import org.apache.doris.nereids.load.NereidsRoutineLoadTaskInfo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.commands.AlterRoutineLoadCommand;
+import
org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
+import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * KinesisRoutineLoadJob is a RoutineLoadJob that fetches data from AWS
Kinesis streams.
+ *
+ * Key concepts:
+ * - Stream: Named collection of data records (similar to Kafka topic)
+ * - Shard: Sequence of data records in a stream (similar to Kafka partition)
+ * - Sequence Number: Unique identifier for each record within a shard
(similar to Kafka offset)
+ * - Consumer: Application that reads from a stream
+ *
+ * The progress tracks sequence numbers for each shard, represented as:
+ * {"shardId-000000000000":
"49590338271490256608559692538361571095921575989136588802", ...}
+ */
+public class KinesisRoutineLoadJob extends RoutineLoadJob {
+ private static final Logger LOG =
LogManager.getLogger(KinesisRoutineLoadJob.class);
+
+ public static final String KINESIS_FILE_CATALOG = "kinesis";
+
+ @SerializedName("rg")
+ private String region;
+ @SerializedName("stm")
+ private String stream;
+ @SerializedName("ep")
+ private String endpoint;
+
+ // optional, user want to load shards(Kafka's cskp).
+ @SerializedName("csks")
+ private List<String> customKinesisShards = Lists.newArrayList();
+
+ // current shards being consumed.
+ // updated periodically because shards may split or merge.
+ private List<String> currentKinesisShards = Lists.newArrayList();
+
+ // Default starting position for new shards.
+ // Values: TRIM_HORIZON, LATEST, or a timestamp string.
+ private String kinesisDefaultPosition = "";
+
+ // custom Kinesis properties including AWS credentials and client settings.
+ @SerializedName("prop")
+ private Map<String, String> customProperties = Maps.newHashMap();
+ private Map<String, String> convertedCustomProperties = Maps.newHashMap();
+
+ // The latest offset of each partition fetched from kinesis server.
+ // Will be updated periodically by calling hasMoreDataToConsume()
+ private Map<String, Long> cachedShardWithMillsBehindLatest =
Maps.newConcurrentMap();
+
+ // newly discovered shards from Kinesis.
+ private List<String> newCurrentKinesisShards = Lists.newArrayList();
+
+ public KinesisRoutineLoadJob() {
+ // For serialization
+ super(-1, LoadDataSourceType.KINESIS);
+ }
+
+ public KinesisRoutineLoadJob(Long id, String name, long dbId, long tableId,
+ String region, String stream, UserIdentity
userIdentity) {
+ super(id, name, dbId, tableId, LoadDataSourceType.KINESIS,
userIdentity);
+ this.region = region;
+ this.stream = stream;
+ this.progress = new KinesisProgress();
+ }
+
+ public KinesisRoutineLoadJob(Long id, String name, long dbId,
+ String region, String stream,
+ UserIdentity userIdentity, boolean
isMultiTable) {
+ super(id, name, dbId, LoadDataSourceType.KINESIS, userIdentity);
+ this.region = region;
+ this.stream = stream;
+ this.progress = new KinesisProgress();
+ setMultiTable(isMultiTable);
+ }
+
+ public String getRegion() {
+ return region;
+ }
+
+ public String getStream() {
+ return stream;
+ }
+
+ public String getEndpoint() {
+ return endpoint;
+ }
+
+ public Map<String, String> getConvertedCustomProperties() {
+ return convertedCustomProperties;
+ }
+
+ @Override
+ public void prepare() throws UserException {
+ // should reset converted properties each time the job being prepared.
+ // because the file info can be changed anytime.
+ convertCustomProperties(true);
+ }
+
+ private void convertCustomProperties(boolean rebuild) throws DdlException {
+ if (customProperties.isEmpty()) {
+ return;
+ }
+
+ if (!rebuild && !convertedCustomProperties.isEmpty()) {
+ return;
+ }
+
+ if (rebuild) {
+ convertedCustomProperties.clear();
+ }
+
+ for (Map.Entry<String, String> entry : customProperties.entrySet()) {
+ convertedCustomProperties.put(entry.getKey(), entry.getValue());
+ }
+
+ // Handle default position
+ if
(convertedCustomProperties.containsKey(KinesisConfiguration.KINESIS_DEFAULT_POSITION.getName()))
{
+ kinesisDefaultPosition = convertedCustomProperties.remove(
+ KinesisConfiguration.KINESIS_DEFAULT_POSITION.getName());
+ }
+ }
+
+ private String convertedDefaultPosition() {
+ if (this.kinesisDefaultPosition.isEmpty()) {
+ return KinesisProgress.POSITION_LATEST;
+ }
+ return this.kinesisDefaultPosition;
+ }
+
+ @Override
+ public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws
UserException {
+ List<RoutineLoadTaskInfo> result = new ArrayList<>();
+ writeLock();
+ try {
+ if (state == JobState.NEED_SCHEDULE) {
+ // Divide shards into tasks
+ for (int i = 0; i < currentConcurrentTaskNum; i++) {
+ Map<String, String> taskKinesisProgress =
Maps.newHashMap();
+ for (int j = i; j < currentKinesisShards.size(); j = j +
currentConcurrentTaskNum) {
+ String shardId = currentKinesisShards.get(j);
+ taskKinesisProgress.put(shardId,
+ ((KinesisProgress)
progress).getSequenceNumberByShard(shardId));
+ }
+ KinesisTaskInfo kinesisTaskInfo = new
KinesisTaskInfo(UUID.randomUUID(), id,
+ getTimeout() * 1000, taskKinesisProgress,
isMultiTable(), -1, false);
+ routineLoadTaskInfoList.add(kinesisTaskInfo);
+ result.add(kinesisTaskInfo);
+ }
+ // Change job state to running
+ if (!result.isEmpty()) {
+ unprotectUpdateState(JobState.RUNNING, null, false);
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignore to divide routine load job while job
state {}", state);
+ }
+ }
+ // Save task into queue of needScheduleTasks
+
Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTasksInQueue(result);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ @Override
+ public int calculateCurrentConcurrentTaskNum() {
+ int shardNum = currentKinesisShards.size();
+ if (desireTaskConcurrentNum == 0) {
+ desireTaskConcurrentNum =
Config.max_routine_load_task_concurrent_num;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("current concurrent task number is min"
+ + "(shard num: {}, desire task concurrent num: {}, config:
{})",
+ shardNum, desireTaskConcurrentNum,
Config.max_routine_load_task_concurrent_num);
+ }
+ currentTaskConcurrentNum = Math.min(shardNum,
Math.min(desireTaskConcurrentNum,
+ Config.max_routine_load_task_concurrent_num));
+ return currentTaskConcurrentNum;
+ }
+
+ @Override
+ protected boolean checkCommitInfo(RLTaskTxnCommitAttachment
rlTaskTxnCommitAttachment,
+ TransactionState txnState,
+ TransactionState.TxnStatusChangeReason
txnStatusChangeReason) {
+ if (txnState.getTransactionStatus() == TransactionStatus.COMMITTED
+ || txnState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
+ return true;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("no need to update the progress of kinesis routine load.
txn status: {}, "
+ + "txnStatusChangeReason: {}, task: {}, job: {}",
+ txnState.getTransactionStatus(), txnStatusChangeReason,
+ DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()),
id);
+ }
+ return false;
+ }
+
+ private void updateProgressAndOffsetsCache(RLTaskTxnCommitAttachment
attachment) {
+ KinesisProgress taskProgress = (KinesisProgress)
attachment.getProgress();
+
+ // Update cachedShardWithMillsBehindLatest from the MillisBehindLatest
values
+ // reported by BE's GetRecords response. Keep the maximum value across
concurrent tasks
+ // to avoid a stale (lower) value from one task overwriting a fresher
value from another.
+ taskProgress.getShardIdToMillsBehindLatest().forEach((shardId, millis)
->
+ cachedShardWithMillsBehindLatest.merge(shardId, millis,
Math::max));
+
+ this.progress.update(attachment);
+ }
+
+ @Override
+ protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws
UserException {
+ updateProgressAndOffsetsCache(attachment);
+ super.updateProgress(attachment);
+ }
+
+ @Override
+ protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
+ super.replayUpdateProgress(attachment);
+ updateProgressAndOffsetsCache(attachment);
+ }
+
+ @Override
+ protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo
routineLoadTaskInfo, boolean delaySchedule) {
+ KinesisTaskInfo oldKinesisTaskInfo = (KinesisTaskInfo)
routineLoadTaskInfo;
+ // Add new task
+ KinesisTaskInfo kinesisTaskInfo = new
KinesisTaskInfo(oldKinesisTaskInfo,
+ ((KinesisProgress)
progress).getShardIdToSequenceNumber(oldKinesisTaskInfo.getShards()),
+ isMultiTable());
+ kinesisTaskInfo.setDelaySchedule(delaySchedule);
+ // Remove old task
+ routineLoadTaskInfoList.remove(routineLoadTaskInfo);
+ // Add new task
+ routineLoadTaskInfoList.add(kinesisTaskInfo);
+ return kinesisTaskInfo;
+ }
+
+ @Override
+ protected void unprotectUpdateProgress() throws UserException {
+ updateNewShardProgress();
+ }
+
+ @Override
+ protected boolean refreshKafkaPartitions(boolean needAutoResume) throws
UserException {
+ // For Kinesis, we refresh shards instead of Kafka partitions
Review Comment:
**Naming issue:** This Kinesis class overrides a method named
`refreshKafkaPartitions`. While this is inherited from the parent class
interface, the parent method name should be generalized (e.g.,
`refreshDataSourcePartitions`) now that multiple data sources are supported. At
minimum, add a comment explaining why a Kinesis class overrides a Kafka-named
method.
##########
fe/fe-core/src/main/java/org/apache/doris/load/routineload/KinesisRoutineLoadJob.java:
##########
@@ -0,0 +1,785 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import org.apache.doris.analysis.ImportColumnDesc;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.RandomDistributionInfo;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.InternalErrorCode;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.datasource.kinesis.KinesisUtil;
+import org.apache.doris.load.routineload.kinesis.KinesisConfiguration;
+import org.apache.doris.load.routineload.kinesis.KinesisDataSourceProperties;
+import org.apache.doris.nereids.load.NereidsImportColumnDesc;
+import org.apache.doris.nereids.load.NereidsLoadTaskInfo;
+import org.apache.doris.nereids.load.NereidsLoadUtils;
+import org.apache.doris.nereids.load.NereidsRoutineLoadTaskInfo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.commands.AlterRoutineLoadCommand;
+import
org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
+import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * KinesisRoutineLoadJob is a RoutineLoadJob that fetches data from AWS
Kinesis streams.
+ *
+ * Key concepts:
+ * - Stream: Named collection of data records (similar to Kafka topic)
+ * - Shard: Sequence of data records in a stream (similar to Kafka partition)
+ * - Sequence Number: Unique identifier for each record within a shard
(similar to Kafka offset)
+ * - Consumer: Application that reads from a stream
+ *
+ * The progress tracks sequence numbers for each shard, represented as:
+ * {"shardId-000000000000":
"49590338271490256608559692538361571095921575989136588802", ...}
+ */
+public class KinesisRoutineLoadJob extends RoutineLoadJob {
+ private static final Logger LOG =
LogManager.getLogger(KinesisRoutineLoadJob.class);
+
+ public static final String KINESIS_FILE_CATALOG = "kinesis";
+
+ @SerializedName("rg")
+ private String region;
+ @SerializedName("stm")
+ private String stream;
+ @SerializedName("ep")
+ private String endpoint;
+
+ // optional, user want to load shards(Kafka's cskp).
+ @SerializedName("csks")
+ private List<String> customKinesisShards = Lists.newArrayList();
+
+ // current shards being consumed.
+ // updated periodically because shards may split or merge.
+ private List<String> currentKinesisShards = Lists.newArrayList();
+
+ // Default starting position for new shards.
+ // Values: TRIM_HORIZON, LATEST, or a timestamp string.
+ private String kinesisDefaultPosition = "";
+
+ // custom Kinesis properties including AWS credentials and client settings.
+ @SerializedName("prop")
+ private Map<String, String> customProperties = Maps.newHashMap();
+ private Map<String, String> convertedCustomProperties = Maps.newHashMap();
+
+ // The latest offset of each partition fetched from kinesis server.
+ // Will be updated periodically by calling hasMoreDataToConsume()
+ private Map<String, Long> cachedShardWithMillsBehindLatest =
Maps.newConcurrentMap();
+
+ // newly discovered shards from Kinesis.
+ private List<String> newCurrentKinesisShards = Lists.newArrayList();
+
+ public KinesisRoutineLoadJob() {
+ // For serialization
+ super(-1, LoadDataSourceType.KINESIS);
+ }
+
+ public KinesisRoutineLoadJob(Long id, String name, long dbId, long tableId,
+ String region, String stream, UserIdentity
userIdentity) {
+ super(id, name, dbId, tableId, LoadDataSourceType.KINESIS,
userIdentity);
+ this.region = region;
+ this.stream = stream;
+ this.progress = new KinesisProgress();
+ }
+
+ public KinesisRoutineLoadJob(Long id, String name, long dbId,
+ String region, String stream,
+ UserIdentity userIdentity, boolean
isMultiTable) {
+ super(id, name, dbId, LoadDataSourceType.KINESIS, userIdentity);
+ this.region = region;
+ this.stream = stream;
+ this.progress = new KinesisProgress();
+ setMultiTable(isMultiTable);
+ }
+
+ public String getRegion() {
+ return region;
+ }
+
+ public String getStream() {
+ return stream;
+ }
+
+ public String getEndpoint() {
+ return endpoint;
+ }
+
+ public Map<String, String> getConvertedCustomProperties() {
+ return convertedCustomProperties;
+ }
+
+ @Override
+ public void prepare() throws UserException {
+ // should reset converted properties each time the job being prepared.
+ // because the file info can be changed anytime.
+ convertCustomProperties(true);
+ }
+
+ private void convertCustomProperties(boolean rebuild) throws DdlException {
+ if (customProperties.isEmpty()) {
+ return;
+ }
+
+ if (!rebuild && !convertedCustomProperties.isEmpty()) {
+ return;
+ }
+
+ if (rebuild) {
+ convertedCustomProperties.clear();
+ }
+
+ for (Map.Entry<String, String> entry : customProperties.entrySet()) {
+ convertedCustomProperties.put(entry.getKey(), entry.getValue());
+ }
+
+ // Handle default position
+ if
(convertedCustomProperties.containsKey(KinesisConfiguration.KINESIS_DEFAULT_POSITION.getName()))
{
+ kinesisDefaultPosition = convertedCustomProperties.remove(
+ KinesisConfiguration.KINESIS_DEFAULT_POSITION.getName());
+ }
+ }
+
+ private String convertedDefaultPosition() {
+ if (this.kinesisDefaultPosition.isEmpty()) {
+ return KinesisProgress.POSITION_LATEST;
+ }
+ return this.kinesisDefaultPosition;
+ }
+
/**
 * Splits the current shard list into {@code currentConcurrentTaskNum} tasks
 * (round-robin by shard index) and queues them for scheduling.
 * Only acts when the job is in NEED_SCHEDULE state; transitions to RUNNING
 * once at least one task was produced.
 */
@Override
public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserException {
    List<RoutineLoadTaskInfo> result = new ArrayList<>();
    writeLock();
    try {
        if (state == JobState.NEED_SCHEDULE) {
            // Divide shards into tasks: task i takes shards i, i+N, i+2N, ...
            for (int i = 0; i < currentConcurrentTaskNum; i++) {
                Map<String, String> taskKinesisProgress = Maps.newHashMap();
                for (int j = i; j < currentKinesisShards.size(); j = j + currentConcurrentTaskNum) {
                    String shardId = currentKinesisShards.get(j);
                    // Each task starts from the job's last known sequence number per shard.
                    taskKinesisProgress.put(shardId,
                            ((KinesisProgress) progress).getSequenceNumberByShard(shardId));
                }
                KinesisTaskInfo kinesisTaskInfo = new KinesisTaskInfo(UUID.randomUUID(), id,
                        getTimeout() * 1000, taskKinesisProgress, isMultiTable(), -1, false);
                routineLoadTaskInfoList.add(kinesisTaskInfo);
                result.add(kinesisTaskInfo);
            }
            // Change job state to running
            if (!result.isEmpty()) {
                unprotectUpdateState(JobState.RUNNING, null, false);
            }
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ignore to divide routine load job while job state {}", state);
            }
        }
        // Save tasks into the queue of needScheduleTasks (no-op for an empty list).
        Env.getCurrentEnv().getRoutineLoadTaskScheduler().addTasksInQueue(result);
    } finally {
        writeUnlock();
    }
}
+
+ @Override
+ public int calculateCurrentConcurrentTaskNum() {
+ int shardNum = currentKinesisShards.size();
+ if (desireTaskConcurrentNum == 0) {
+ desireTaskConcurrentNum =
Config.max_routine_load_task_concurrent_num;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("current concurrent task number is min"
+ + "(shard num: {}, desire task concurrent num: {}, config:
{})",
+ shardNum, desireTaskConcurrentNum,
Config.max_routine_load_task_concurrent_num);
+ }
+ currentTaskConcurrentNum = Math.min(shardNum,
Math.min(desireTaskConcurrentNum,
+ Config.max_routine_load_task_concurrent_num));
+ return currentTaskConcurrentNum;
+ }
+
+ @Override
+ protected boolean checkCommitInfo(RLTaskTxnCommitAttachment
rlTaskTxnCommitAttachment,
+ TransactionState txnState,
+ TransactionState.TxnStatusChangeReason
txnStatusChangeReason) {
+ if (txnState.getTransactionStatus() == TransactionStatus.COMMITTED
+ || txnState.getTransactionStatus() ==
TransactionStatus.VISIBLE) {
+ return true;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("no need to update the progress of kinesis routine load.
txn status: {}, "
+ + "txnStatusChangeReason: {}, task: {}, job: {}",
+ txnState.getTransactionStatus(), txnStatusChangeReason,
+ DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()),
id);
+ }
+ return false;
+ }
+
/**
 * Applies a task's commit attachment to the job: refreshes the lag cache and
 * then advances the job-level progress.
 */
private void updateProgressAndOffsetsCache(RLTaskTxnCommitAttachment attachment) {
    KinesisProgress taskProgress = (KinesisProgress) attachment.getProgress();

    // Update cachedShardWithMillsBehindLatest from the MillisBehindLatest values
    // reported by BE's GetRecords response. Keep the maximum value across concurrent tasks
    // to avoid a stale (lower) value from one task overwriting a fresher value from another.
    taskProgress.getShardIdToMillsBehindLatest().forEach((shardId, millis) ->
            cachedShardWithMillsBehindLatest.merge(shardId, millis, Math::max));

    this.progress.update(attachment);
}
+
@Override
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
    // Refresh Kinesis-specific progress/lag cache first, then let the base class
    // update the generic job statistics.
    updateProgressAndOffsetsCache(attachment);
    super.updateProgress(attachment);
}
+
@Override
protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
    // NOTE(review): call order is reversed relative to updateProgress (super first
    // here, Kinesis cache second) — presumably intentional for replay, but worth confirming.
    super.replayUpdateProgress(attachment);
    updateProgressAndOffsetsCache(attachment);
}
+
+ @Override
+ protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo
routineLoadTaskInfo, boolean delaySchedule) {
+ KinesisTaskInfo oldKinesisTaskInfo = (KinesisTaskInfo)
routineLoadTaskInfo;
+ // Add new task
+ KinesisTaskInfo kinesisTaskInfo = new
KinesisTaskInfo(oldKinesisTaskInfo,
+ ((KinesisProgress)
progress).getShardIdToSequenceNumber(oldKinesisTaskInfo.getShards()),
+ isMultiTable());
+ kinesisTaskInfo.setDelaySchedule(delaySchedule);
+ // Remove old task
+ routineLoadTaskInfoList.remove(routineLoadTaskInfo);
+ // Add new task
+ routineLoadTaskInfoList.add(kinesisTaskInfo);
+ return kinesisTaskInfo;
+ }
+
@Override
protected void unprotectUpdateProgress() throws UserException {
    // Seed progress entries for any shards that appeared since the last refresh.
    // Caller must hold the job lock (hence the "unprotect" prefix).
    updateNewShardProgress();
}
+
+ @Override
+ protected boolean refreshKafkaPartitions(boolean needAutoResume) throws
UserException {
+ // For Kinesis, we refresh shards instead of Kafka partitions
+ if (this.state == JobState.RUNNING || this.state ==
JobState.NEED_SCHEDULE || needAutoResume) {
+ if (customKinesisShards != null && !customKinesisShards.isEmpty())
{
+ return true;
+ }
+ return updateKinesisShards();
+ }
+ return true;
+ }
+
+ private boolean updateKinesisShards() throws UserException {
+ try {
+ this.newCurrentKinesisShards = getAllKinesisShards();
+ } catch (Exception e) {
+ String msg = e.getMessage()
+ + " may be Kinesis properties set in job is error"
+ + " or no shard in this stream that should check Kinesis";
+ LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+ .add("error_msg", msg)
+ .build(), e);
+ if (this.state == JobState.NEED_SCHEDULE) {
+ unprotectUpdateState(JobState.PAUSED,
+ new ErrorReason(InternalErrorCode.PARTITIONS_ERR, msg),
+ false /* not replay */);
+ }
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ protected boolean unprotectNeedReschedule() throws UserException {
+ if (this.state == JobState.RUNNING || this.state ==
JobState.NEED_SCHEDULE) {
+ return isKinesisShardsChanged();
+ }
+ return false;
+ }
+
/**
 * Compares the freshly fetched shard list with the job's current one and
 * updates {@code currentKinesisShards} accordingly.
 *
 * @return true if the shard set changed (or progress is missing a shard),
 *         meaning the job must be rescheduled; false otherwise.
 */
private boolean isKinesisShardsChanged() throws UserException {
    // User-pinned shards are authoritative and never considered "changed".
    if (CollectionUtils.isNotEmpty(customKinesisShards)) {
        currentKinesisShards = customKinesisShards;
        return false;
    }

    Preconditions.checkNotNull(this.newCurrentKinesisShards);
    if (new HashSet<>(currentKinesisShards).containsAll(this.newCurrentKinesisShards)) {
        // New list is a subset of the current one.
        if (currentKinesisShards.size() > this.newCurrentKinesisShards.size()) {
            // Shards were removed (e.g. closed/merged) — adopt the smaller list.
            currentKinesisShards = this.newCurrentKinesisShards;
            if (LOG.isDebugEnabled()) {
                LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
                        .add("current_kinesis_shards", Joiner.on(",").join(currentKinesisShards))
                        .add("msg", "current kinesis shards has been changed")
                        .build());
            }
            return true;
        } else {
            // Same set — still verify every shard has a progress entry;
            // a missing entry also forces a reschedule.
            for (String shardId : currentKinesisShards) {
                if (!((KinesisProgress) progress).containsShard(shardId)) {
                    return true;
                }
            }
            return false;
        }
    } else {
        // New shards appeared (e.g. a split) — adopt the new list.
        currentKinesisShards = this.newCurrentKinesisShards;
        if (LOG.isDebugEnabled()) {
            LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
                    .add("current_kinesis_shards", Joiner.on(",").join(currentKinesisShards))
                    .add("msg", "current kinesis shards has been changed")
                    .build());
        }
        return true;
    }
}
+
+ @Override
+ protected boolean needAutoResume() {
+ writeLock();
+ try {
+ if (this.state == JobState.PAUSED) {
+ return ScheduleRule.isNeedAutoSchedule(this);
+ }
+ return false;
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ @Override
+ public String getStatistic() {
+ Map<String, Object> summary = this.jobStatistic.summary();
+ Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+ return gson.toJson(summary);
+ }
+
/**
 * Get all shard IDs from the Kinesis stream.
 * Delegates to a BE node via gRPC, which calls AWS ListShards API using the SDK.
 * Returns the user-pinned shard list directly when one is configured.
 *
 * @throws UserException if the BE call or property conversion fails
 */
private List<String> getAllKinesisShards() throws UserException {
    // Ensure converted properties exist (no rebuild if already populated).
    convertCustomProperties(false);
    if (!customKinesisShards.isEmpty()) {
        return customKinesisShards;
    }
    return KinesisUtil.getAllKinesisShards(region, stream, endpoint, convertedCustomProperties);
}
+
/**
 * Create a KinesisRoutineLoadJob from CreateRoutineLoadInfo.
 * Builds either a multi-table job (no fixed target table) or a single-table job
 * (target table validated against the load description), then applies optional
 * properties and validates custom properties.
 *
 * @throws UserException if the database/table lookup or validation fails
 */
public static KinesisRoutineLoadJob fromCreateInfo(CreateRoutineLoadInfo info, ConnectContext ctx)
        throws UserException {
    Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getDBName());

    long id = Env.getCurrentEnv().getNextId();
    KinesisDataSourceProperties kinesisProperties =
            (KinesisDataSourceProperties) info.getDataSourceProperties();
    KinesisRoutineLoadJob kinesisRoutineLoadJob;

    if (kinesisProperties.isMultiTable()) {
        // Multi-table job: no table id, routing is decided per record.
        kinesisRoutineLoadJob = new KinesisRoutineLoadJob(id, info.getName(),
                db.getId(),
                kinesisProperties.getRegion(), kinesisProperties.getStream(),
                ctx.getCurrentUserIdentity(), true);
    } else {
        OlapTable olapTable = db.getOlapTableOrDdlException(info.getTableName());
        checkMeta(olapTable, info.getRoutineLoadDesc());
        // Check load_to_single_tablet compatibility
        if (info.isLoadToSingleTablet()
                && !(olapTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) {
            throw new DdlException(
                    "if load_to_single_tablet set to true, the olap table must be with random distribution");
        }
        long tableId = olapTable.getId();
        kinesisRoutineLoadJob = new KinesisRoutineLoadJob(id, info.getName(),
                db.getId(), tableId,
                kinesisProperties.getRegion(), kinesisProperties.getStream(),
                ctx.getCurrentUserIdentity());
    }

    kinesisRoutineLoadJob.setOptional(info);
    kinesisRoutineLoadJob.checkCustomProperties();

    return kinesisRoutineLoadJob;
}
+
/**
 * Hook for validating user-supplied custom properties at job creation.
 * Currently a no-op; kept so validation can be added without changing callers.
 */
private void checkCustomProperties() throws DdlException {
    // Validate custom properties if needed
}
+
+ private void updateNewShardProgress() throws UserException {
+ try {
+ for (String shardId : currentKinesisShards) {
+ if (!((KinesisProgress) progress).containsShard(shardId)) {
+ List<String> newShards = Lists.newArrayList();
+ newShards.add(shardId);
+ List<Pair<String, String>> newShardPositions =
getNewShardPositionsFromDefault(newShards);
+ Preconditions.checkState(newShardPositions.size() == 1);
+ for (Pair<String, String> shardPosition :
newShardPositions) {
+ ((KinesisProgress)
progress).addShardPosition(shardPosition);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB,
id)
+ .add("kinesis_shard_id",
shardPosition.first)
+ .add("begin_position",
shardPosition.second)
+ .add("msg", "The new shard has been added
in job"));
+ }
+ }
+ }
+ }
+ } catch (UserException e) {
+ unprotectUpdateState(JobState.PAUSED,
+ new ErrorReason(InternalErrorCode.PARTITIONS_ERR,
e.getMessage()), false);
+ throw e;
+ }
+ }
+
+ private List<Pair<String, String>>
getNewShardPositionsFromDefault(List<String> newShards)
+ throws UserException {
+ List<Pair<String, String>> shardPositions = Lists.newArrayList();
+ String defaultPosition = convertedDefaultPosition();
+ for (String shardId : newShards) {
+ shardPositions.add(Pair.of(shardId, defaultPosition));
+ }
+ return shardPositions;
+ }
+
/**
 * Applies optional creation-time settings on top of the base-class handling:
 * custom endpoint, user-pinned shard positions, and custom connector properties.
 */
protected void setOptional(CreateRoutineLoadInfo info) throws UserException {
    super.setOptional(info);
    KinesisDataSourceProperties kinesisDataSourceProperties =
            (KinesisDataSourceProperties) info.getDataSourceProperties();

    // Set endpoint if provided
    if (kinesisDataSourceProperties.getEndpoint() != null) {
        this.endpoint = kinesisDataSourceProperties.getEndpoint();
    }

    // Set custom shards and positions
    if (CollectionUtils.isNotEmpty(kinesisDataSourceProperties.getKinesisShardPositions())) {
        setCustomKinesisShards(kinesisDataSourceProperties);
    }

    // Set custom properties
    if (MapUtils.isNotEmpty(kinesisDataSourceProperties.getCustomKinesisProperties())) {
        setCustomKinesisProperties(kinesisDataSourceProperties.getCustomKinesisProperties());
    }
}
+
+ private void setCustomKinesisShards(KinesisDataSourceProperties
kinesisDataSourceProperties) throws LoadException {
+ List<Pair<String, String>> shardPositions =
kinesisDataSourceProperties.getKinesisShardPositions();
+ for (Pair<String, String> shardPosition : shardPositions) {
+ this.customKinesisShards.add(shardPosition.first);
+ ((KinesisProgress) progress).addShardPosition(shardPosition);
+ }
+ }
+
// Stores the user's custom connector properties.
// NOTE(review): this keeps a reference to the caller's map rather than copying it
// — presumably fine since callers hand over a freshly built map; confirm no caller
// mutates it afterwards.
private void setCustomKinesisProperties(Map<String, String> kinesisProperties) {
    this.customProperties = kinesisProperties;
}
+
+ @Override
+ public String dataSourcePropertiesJsonToString() {
+ Map<String, String> dataSourceProperties = Maps.newHashMap();
+ dataSourceProperties.put("region", region);
+ dataSourceProperties.put("stream", stream);
+ if (endpoint != null) {
+ dataSourceProperties.put("endpoint", endpoint);
+ }
+ List<String> sortedShards = Lists.newArrayList(currentKinesisShards);
+ Collections.sort(sortedShards);
+ dataSourceProperties.put("currentKinesisShards",
Joiner.on(",").join(sortedShards));
+ Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+ return gson.toJson(dataSourceProperties);
+ }
+
+ @Override
+ public String customPropertiesJsonToString() {
+ Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+ // Mask sensitive information
+ Map<String, String> maskedProperties = new HashMap<>(customProperties);
+ if
(maskedProperties.containsKey(KinesisConfiguration.KINESIS_SECRET_KEY.getName()))
{
+
maskedProperties.put(KinesisConfiguration.KINESIS_SECRET_KEY.getName(),
"******");
+ }
+ if
(maskedProperties.containsKey(KinesisConfiguration.KINESIS_SESSION_TOKEN.getName()))
{
+
maskedProperties.put(KinesisConfiguration.KINESIS_SESSION_TOKEN.getName(),
"******");
+ }
+ return gson.toJson(maskedProperties);
+ }
+
+ @Override
+ public Map<String, String> getDataSourceProperties() {
+ Map<String, String> dataSourceProperties = Maps.newHashMap();
+ dataSourceProperties.put("kinesis_region", region);
+ dataSourceProperties.put("kinesis_stream", stream);
+ if (endpoint != null) {
+ dataSourceProperties.put("kinesis_endpoint", endpoint);
+ }
+ return dataSourceProperties;
+ }
+
+ @Override
+ public Map<String, String> getCustomProperties() {
+ Map<String, String> ret = new HashMap<>();
+ customProperties.forEach((k, v) -> {
+ // Mask sensitive values
+ if (k.equals(KinesisConfiguration.KINESIS_SECRET_KEY.getName())
+ ||
k.equals(KinesisConfiguration.KINESIS_SESSION_TOKEN.getName())) {
+ ret.put("property." + k, "******");
+ } else {
+ ret.put("property." + k, v);
+ }
+ });
+ return ret;
+ }
+
/**
 * Applies an ALTER ROUTINE LOAD command: only PAUSED jobs may be modified.
 * Changes are applied in-memory first, then persisted to the edit log so
 * followers/replay see the same modification.
 */
@Override
public void modifyProperties(AlterRoutineLoadCommand command) throws UserException {
    Map<String, String> jobProperties = command.getAnalyzedJobProperties();
    KinesisDataSourceProperties dataSourceProperties =
            (KinesisDataSourceProperties) command.getDataSourceProperties();

    writeLock();
    try {
        if (getState() != JobState.PAUSED) {
            throw new DdlException("Only supports modification of PAUSED jobs");
        }

        modifyPropertiesInternal(jobProperties, dataSourceProperties);

        // Persist after the in-memory change succeeds; both happen under the write lock.
        AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id,
                jobProperties, dataSourceProperties);
        Env.getCurrentEnv().getEditLog().logAlterRoutineLoadJob(log);
    } finally {
        writeUnlock();
    }
}
+
+ private void modifyPropertiesInternal(Map<String, String> jobProperties,
+ KinesisDataSourceProperties
dataSourceProperties)
+ throws UserException {
+ if (dataSourceProperties != null) {
+ List<Pair<String, String>> shardPositions = Lists.newArrayList();
+ Map<String, String> customKinesisProperties = Maps.newHashMap();
+
+ if
(MapUtils.isNotEmpty(dataSourceProperties.getOriginalDataSourceProperties())) {
+ shardPositions =
dataSourceProperties.getKinesisShardPositions();
+ customKinesisProperties =
dataSourceProperties.getCustomKinesisProperties();
+ }
+
+ // Update custom properties
+ if (!customKinesisProperties.isEmpty()) {
+ this.customProperties.putAll(customKinesisProperties);
+ convertCustomProperties(true);
+ }
+
+ // Check and modify shard positions
+ if (!shardPositions.isEmpty()) {
+ ((KinesisProgress) progress).checkShards(shardPositions);
+ ((KinesisProgress) progress).modifyPosition(shardPositions);
+ }
Review Comment:
**HIGH: Shard positions are applied to the old progress before the stream is
changed.**
Lines 632-633 call `checkShards` and `modifyPosition` on the current
`progress` object. Then lines 637-640 check if the stream is being changed and
if so, reset `this.progress = new KinesisProgress()`. This means any shard
position modifications from lines 632-633 are immediately lost.
The stream change check should happen **before** applying shard positions,
or positions should be applied to the new progress object.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]