This is an automated email from the ASF dual-hosted git repository.
hellostephen pushed a commit to branch tmp-3.0.7-rc01
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/tmp-3.0.7-rc01 by this push:
new 9a5b6558b98 [fix](tablet report)Replace tablet report with
ForkJoinPool (#57382) (#58714)
9a5b6558b98 is described below
commit 9a5b6558b9880c43ce7561c6685ee257f609b079
Author: deardeng <[email protected]>
AuthorDate: Thu Dec 4 17:26:45 2025 +0800
[fix](tablet report)Replace tablet report with ForkJoinPool (#57382)
(#58714)
Currently, tablet report logic uses a ForkJoinPool to process tablet
information, but often encounters unexplained hangs in the ForkJoinPool.
The printed stack trace doesn't reveal where the hang occurs, making it
difficult to troubleshoot the issue.
With the ForkJoinPool, the report thread gets stuck with a stack trace such as:
```
"report-thread" #187 daemon prio=5 os_prio=0 cpu=97864.95ms
elapsed=2469428.50s tid=0x00007ff1cb5c8530 nid=0xef2 waiting on condition
[0x00007fef462e1000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
- parking to wait for <0x00000006dc3fae00> (a
java.util.concurrent.ForkJoinTask$AdaptedRunnableAction)
at
java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:341)
at
java.util.concurrent.ForkJoinTask.awaitDone([email protected]/ForkJoinTask.java:468)
at
java.util.concurrent.ForkJoinTask.join([email protected]/ForkJoinTask.java:670)
at
org.apache.doris.catalog.TabletInvertedIndex.tabletReport(TabletInvertedIndex.java:370)
at
org.apache.doris.master.ReportHandler.tabletReport(ReportHandler.java:509)
at
org.apache.doris.master.ReportHandler$ReportTask.exec(ReportHandler.java:339)
at
org.apache.doris.master.ReportHandler.runOneCycle(ReportHandler.java:1466)
at org.apache.doris.common.util.Daemon.run(Daemon.java:119)
Locked ownable synchronizers:
- None
```
This stack trace does not show where the problem is.
When the tablet report is stuck, the TabletInvertedIndex holds a read
lock, leading to a deadlock.
This PR replaces the ForkJoinPool with a regular thread pool (ThreadPoolExecutor).
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
.../main/java/org/apache/doris/common/Config.java | 8 +
.../apache/doris/catalog/TabletInvertedIndex.java | 518 +++++++++++++++------
2 files changed, 374 insertions(+), 152 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 94728fe3f9d..8306cfd471b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -530,6 +530,14 @@ public class Config extends ConfigBase {
"the upper limit of failure logs of PUBLISH_VERSION task"})
public static long publish_version_task_failed_log_threshold = 80;
+ @ConfField(masterOnly = true, description = {"Tablet report 线程池的数目",
+ "Num of thread to handle tablet report task"})
+ public static int tablet_report_thread_pool_num = 10;
+
+ @ConfField(masterOnly = true, description = {"Tablet report 线程池的队列大小",
+ "Queue size to store tablet report task in publish thread pool"})
+ public static int tablet_report_queue_size = 1024;
+
@ConfField(mutable = true, masterOnly = true, description =
{"提交事务的最大超时时间,单位是秒。"
+ "该参数仅用于事务型 insert 操作中。",
"Maximal waiting time for all data inserted before one transaction
to be committed, in seconds. "
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index a1faada3fea..b774f4615d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -56,7 +56,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;
import java.util.stream.Collectors;
@@ -107,7 +111,14 @@ public class TabletInvertedIndex {
// Notice only none-cloud use it for be reporting tablets. This map is
empty in cloud mode.
private volatile ImmutableMap<Long, PartitionCollectInfo>
partitionCollectInfoMap = ImmutableMap.of();
- private ForkJoinPool taskPool = new
ForkJoinPool(Runtime.getRuntime().availableProcessors());
+ private final ExecutorService taskPool = new ThreadPoolExecutor(
+ Config.tablet_report_thread_pool_num,
+ 2 * Config.tablet_report_thread_pool_num,
+ // tablet report task default 60s once
+ 120L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(Config.tablet_report_queue_size),
+ new ThreadPoolExecutor.DiscardOldestPolicy());
public TabletInvertedIndex() {
}
@@ -146,178 +157,381 @@ public class TabletInvertedIndex {
long feTabletNum = 0;
long stamp = readLock();
long start = System.currentTimeMillis();
+
try {
if (LOG.isDebugEnabled()) {
LOG.debug("begin to do tablet diff with backend[{}]. num: {}",
backendId, backendTablets.size());
}
+
Map<Long, Replica> replicaMetaWithBackend =
backingReplicaMetaTable.row(backendId);
if (replicaMetaWithBackend != null) {
feTabletNum = replicaMetaWithBackend.size();
- taskPool.submit(() -> {
- // traverse replicas in meta with this backend
-
replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> {
- long tabletId = entry.getKey();
-
Preconditions.checkState(tabletMetaMap.containsKey(tabletId),
- "tablet " + tabletId + " not exists, backend "
+ backendId);
- TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
-
- if (backendTablets.containsKey(tabletId)) {
- TTablet backendTablet =
backendTablets.get(tabletId);
- Replica replica = entry.getValue();
- tabletFoundInMeta.add(tabletId);
- TTabletInfo backendTabletInfo =
backendTablet.getTabletInfos().get(0);
- TTabletMetaInfo tabletMetaInfo = null;
- if (backendTabletInfo.getReplicaId() !=
replica.getId()
- && replica.getState() !=
ReplicaState.CLONE) {
- // Need to update replica id in BE
- tabletMetaInfo = new TTabletMetaInfo();
- tabletMetaInfo.setReplicaId(replica.getId());
- }
- PartitionCollectInfo partitionCollectInfo =
-
partitionCollectInfoMap.get(backendTabletInfo.getPartitionId());
- boolean isInMemory = partitionCollectInfo != null
&& partitionCollectInfo.isInMemory();
- if (isInMemory !=
backendTabletInfo.isIsInMemory()) {
- if (tabletMetaInfo == null) {
- tabletMetaInfo = new TTabletMetaInfo();
- tabletMetaInfo.setIsInMemory(isInMemory);
- }
- }
- if (Config.fix_tablet_partition_id_eq_0
- && tabletMeta.getPartitionId() > 0
- && backendTabletInfo.getPartitionId() ==
0) {
- LOG.warn("be report tablet partition id not eq
fe, in be {} but in fe {}",
- backendTabletInfo, tabletMeta);
- // Need to update partition id in BE
- tabletMetaInfo = new TTabletMetaInfo();
-
tabletMetaInfo.setPartitionId(tabletMeta.getPartitionId());
- }
- // 1. (intersection)
- if (needSync(replica, backendTabletInfo)) {
- // need sync
- synchronized (tabletSyncMap) {
- tabletSyncMap.put(tabletMeta.getDbId(),
tabletId);
- }
- }
+ processTabletReportAsync(backendId, backendTablets,
backendPartitionsVersion, storageMediumMap,
+ tabletSyncMap, tabletDeleteFromMeta,
tabletFoundInMeta, tabletMigrationMap,
+ partitionVersionSyncMap, transactionsToPublish,
transactionsToClear, tabletRecoveryMap,
+ tabletToUpdate, cooldownTablets,
replicaMetaWithBackend);
+ }
+ } finally {
+ readUnlock(stamp);
+ }
- // check and set path
- // path info of replica is only saved in Master FE
- if (backendTabletInfo.isSetPathHash()
- && replica.getPathHash() !=
backendTabletInfo.getPathHash()) {
-
replica.setPathHash(backendTabletInfo.getPathHash());
- }
+ // Process cooldown configs outside of read lock to avoid deadlock
+ cooldownTablets.forEach(p -> handleCooldownConf(p.first, p.second,
cooldownConfToPush, cooldownConfToUpdate));
- if (backendTabletInfo.isSetSchemaHash() &&
replica.getState() == ReplicaState.NORMAL
- && replica.getSchemaHash() !=
backendTabletInfo.getSchemaHash()) {
- // update the schema hash only when replica is
normal
-
replica.setSchemaHash(backendTabletInfo.getSchemaHash());
- }
+ logTabletReportSummary(backendId, feTabletNum, backendTablets,
backendPartitionsVersion,
+ tabletSyncMap, tabletDeleteFromMeta, tabletFoundInMeta,
tabletMigrationMap,
+ partitionVersionSyncMap, transactionsToPublish,
transactionsToClear,
+ tabletToUpdate, tabletRecoveryMap, start);
+ }
- if (needRecover(replica,
tabletMeta.getOldSchemaHash(), backendTabletInfo)) {
- LOG.warn("replica {} of tablet {} on backend
{} need recovery. "
- + "replica in FE: {}, report
version {}, report schema hash: {},"
- + " is bad: {}, is version
missing: {}",
- replica.getId(), tabletId, backendId,
replica,
- backendTabletInfo.getVersion(),
- backendTabletInfo.getSchemaHash(),
- backendTabletInfo.isSetUsed() ?
!backendTabletInfo.isUsed() : "false",
- backendTabletInfo.isSetVersionMiss() ?
backendTabletInfo.isVersionMiss() :
- "unset");
- synchronized (tabletRecoveryMap) {
-
tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId);
- }
- }
+ /**
+ * Process tablet report asynchronously using thread pool
+ */
+ private void processTabletReportAsync(long backendId, Map<Long, TTablet>
backendTablets,
+ Map<Long, Long>
backendPartitionsVersion,
+ HashMap<Long, TStorageMedium>
storageMediumMap,
+ ListMultimap<Long, Long>
tabletSyncMap,
+ ListMultimap<Long, Long>
tabletDeleteFromMeta,
+ Set<Long> tabletFoundInMeta,
+ ListMultimap<TStorageMedium, Long>
tabletMigrationMap,
+ Map<Long, Long>
partitionVersionSyncMap,
+ Map<Long, SetMultimap<Long,
TPartitionVersionInfo>> transactionsToPublish,
+ SetMultimap<Long, Long>
transactionsToClear,
+ ListMultimap<Long, Long>
tabletRecoveryMap,
+ List<TTabletMetaInfo> tabletToUpdate,
+ List<Pair<TabletMeta, TTabletInfo>>
cooldownTablets,
+ Map<Long, Replica>
replicaMetaWithBackend) {
+ // Calculate optimal chunk size to balance task granularity and
concurrency
+ // For large tablet counts (40W-50W), we want smaller chunks to
maximize parallelism
+ // Target: create at least threadPoolSize * 4 tasks for better load
balancing
+ int totalTablets = replicaMetaWithBackend.size();
+ int threadPoolSize = Config.tablet_report_thread_pool_num;
+ int targetTasks = threadPoolSize * 4; // Create 4x tasks as threads
for better load balancing
+ int chunkSize = Math.max(500, totalTablets / targetTasks);
+
+ // Cap chunk size to avoid too large tasks
+ // so thread pool queue will not be fulled with few large tasks
+ int maxChunkSize = 10000;
+ chunkSize = Math.min(chunkSize, maxChunkSize);
+ List<Map.Entry<Long, Replica>> entries = new
ArrayList<>(replicaMetaWithBackend.entrySet());
+ List<CompletableFuture<Void>> tabletFutures = new ArrayList<>();
+ int estimatedTasks = (totalTablets + chunkSize - 1) / chunkSize;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing tablet report for backend[{}]: total
tablets={}, chunkSize={}, estimated tasks={}",
+ backendId, totalTablets, chunkSize, estimatedTasks);
+ }
- if (Config.enable_storage_policy &&
backendTabletInfo.isSetCooldownTerm()) {
- // Place tablet info in a container and
process it outside of read lock to avoid
- // deadlock with OlapTable lock
- synchronized (cooldownTablets) {
- cooldownTablets.add(Pair.of(tabletMeta,
backendTabletInfo));
- }
-
replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId());
-
replica.setCooldownTerm(backendTabletInfo.getCooldownTerm());
- }
+ for (int i = 0; i < entries.size(); i += chunkSize) {
+ final int start = i;
+ final int end = Math.min(i + chunkSize, entries.size());
+
+ CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+ for (int j = start; j < end; j++) {
+ Map.Entry<Long, Replica> entry = entries.get(j);
+ processTabletEntry(backendId, backendTablets,
storageMediumMap, tabletSyncMap,
+ tabletDeleteFromMeta, tabletFoundInMeta,
tabletMigrationMap,
+ transactionsToPublish, transactionsToClear,
tabletRecoveryMap,
+ tabletToUpdate, cooldownTablets, entry);
+ }
+ }, taskPool);
- long partitionId = tabletMeta.getPartitionId();
- if (!Config.disable_storage_medium_check) {
- // check if need migration
- TStorageMedium storageMedium =
storageMediumMap.get(partitionId);
- if (storageMedium != null &&
backendTabletInfo.isSetStorageMedium()
- && isLocal(storageMedium) &&
isLocal(backendTabletInfo.getStorageMedium())
- &&
isLocal(tabletMeta.getStorageMedium())) {
- if (storageMedium !=
backendTabletInfo.getStorageMedium()) {
- synchronized (tabletMigrationMap) {
-
tabletMigrationMap.put(storageMedium, tabletId);
- }
- }
- if (storageMedium !=
tabletMeta.getStorageMedium()) {
-
tabletMeta.setStorageMedium(storageMedium);
- }
- }
- }
+ tabletFutures.add(future);
+ }
- // check if should clear transactions
- if (backendTabletInfo.isSetTransactionIds()) {
- handleBackendTransactions(backendId,
backendTabletInfo.getTransactionIds(), tabletId,
- tabletMeta, transactionsToPublish,
transactionsToClear);
- } // end for txn id
-
- // update replicase's version count
- // no need to write log, and no need to get db
lock.
- if (backendTabletInfo.isSetTotalVersionCount()) {
-
replica.setTotalVersionCount(backendTabletInfo.getTotalVersionCount());
-
replica.setVisibleVersionCount(backendTabletInfo.isSetVisibleVersionCount()
- ?
backendTabletInfo.getVisibleVersionCount()
- :
backendTabletInfo.getTotalVersionCount());
- }
- if (tabletMetaInfo != null) {
- tabletMetaInfo.setTabletId(tabletId);
- synchronized (tabletToUpdate) {
- tabletToUpdate.add(tabletMetaInfo);
- }
- }
- } else {
- // 2. (meta - be)
- // may need delete from meta
- if (LOG.isDebugEnabled()) {
- LOG.debug("backend[{}] does not report
tablet[{}-{}]", backendId, tabletId, tabletMeta);
- }
- synchronized (tabletDeleteFromMeta) {
- tabletDeleteFromMeta.put(tabletMeta.getDbId(),
tabletId);
- }
- }
- });
-
-
backendPartitionsVersion.entrySet().parallelStream().forEach(entry -> {
- long partitionId = entry.getKey();
- long backendVersion = entry.getValue();
- PartitionCollectInfo partitionInfo =
partitionCollectInfoMap.get(partitionId);
- if (partitionInfo != null &&
partitionInfo.getVisibleVersion() > backendVersion) {
- partitionVersionSyncMap.put(partitionId,
partitionInfo.getVisibleVersion());
- }
- });
- }).join();
+ // Process partition versions in parallel
+ CompletableFuture<Void> partitionFuture =
CompletableFuture.runAsync(() -> {
+ processPartitionVersions(backendPartitionsVersion,
partitionVersionSyncMap);
+ }, taskPool);
+
+ // Wait for all tasks to complete
+ CompletableFuture.allOf(tabletFutures.toArray(new
CompletableFuture[0])).join();
+ partitionFuture.join();
+ }
+
+ /**
+ * Process a single tablet entry from backend report
+ */
+ private void processTabletEntry(long backendId, Map<Long, TTablet>
backendTablets,
+ HashMap<Long, TStorageMedium>
storageMediumMap,
+ ListMultimap<Long, Long> tabletSyncMap,
+ ListMultimap<Long, Long>
tabletDeleteFromMeta,
+ Set<Long> tabletFoundInMeta,
+ ListMultimap<TStorageMedium, Long>
tabletMigrationMap,
+ Map<Long, SetMultimap<Long,
TPartitionVersionInfo>> transactionsToPublish,
+ SetMultimap<Long, Long>
transactionsToClear,
+ ListMultimap<Long, Long>
tabletRecoveryMap,
+ List<TTabletMetaInfo> tabletToUpdate,
+ List<Pair<TabletMeta, TTabletInfo>>
cooldownTablets,
+ Map.Entry<Long, Replica> entry) {
+ long tabletId = entry.getKey();
+ Replica replica = entry.getValue();
+
+ Preconditions.checkState(tabletMetaMap.containsKey(tabletId),
+ "tablet " + tabletId + " not exists, backend " + backendId);
+ TabletMeta tabletMeta = tabletMetaMap.get(tabletId);
+
+ if (backendTablets.containsKey(tabletId)) {
+ // Tablet exists in both FE and BE
+ TTablet backendTablet = backendTablets.get(tabletId);
+ TTabletInfo backendTabletInfo =
backendTablet.getTabletInfos().get(0);
+
+ tabletFoundInMeta.add(tabletId);
+
+ processExistingTablet(backendId, tabletId, replica, tabletMeta,
backendTabletInfo,
+ storageMediumMap, tabletSyncMap, tabletMigrationMap,
transactionsToPublish,
+ transactionsToClear, tabletRecoveryMap, tabletToUpdate,
cooldownTablets);
+ } else {
+ // Tablet exists in FE but not in BE - may need deletion
+ processDeletedTablet(backendId, tabletId, tabletMeta,
tabletDeleteFromMeta);
+ }
+ }
+
+ /**
+ * Process tablet that exists in both FE and BE
+ */
+ private void processExistingTablet(long backendId, long tabletId, Replica
replica,
+ TabletMeta tabletMeta, TTabletInfo
backendTabletInfo,
+ HashMap<Long, TStorageMedium>
storageMediumMap,
+ ListMultimap<Long, Long> tabletSyncMap,
+ ListMultimap<TStorageMedium, Long>
tabletMigrationMap,
+ Map<Long, SetMultimap<Long,
TPartitionVersionInfo>> transactionsToPublish,
+ SetMultimap<Long, Long>
transactionsToClear,
+ ListMultimap<Long, Long>
tabletRecoveryMap,
+ List<TTabletMetaInfo> tabletToUpdate,
+ List<Pair<TabletMeta, TTabletInfo>>
cooldownTablets) {
+ // Check and prepare tablet meta info update
+ TTabletMetaInfo tabletMetaInfo = prepareTabletMetaInfo(replica,
tabletMeta, backendTabletInfo);
+
+ // Check if version sync is needed
+ if (needSync(replica, backendTabletInfo)) {
+ synchronized (tabletSyncMap) {
+ tabletSyncMap.put(tabletMeta.getDbId(), tabletId);
}
- } finally {
- readUnlock(stamp);
}
- cooldownTablets.forEach(p -> handleCooldownConf(p.first, p.second,
cooldownConfToPush, cooldownConfToUpdate));
- long end = System.currentTimeMillis();
+ // Update replica path and schema hash
+ updateReplicaBasicInfo(replica, backendTabletInfo);
+
+ // Check if replica needs recovery
+ if (needRecover(replica, tabletMeta.getOldSchemaHash(),
backendTabletInfo)) {
+ logReplicaRecovery(replica, tabletId, backendId,
backendTabletInfo);
+ synchronized (tabletRecoveryMap) {
+ tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId);
+ }
+ }
+
+ // Handle cooldown policy
+ if (Config.enable_storage_policy &&
backendTabletInfo.isSetCooldownTerm()) {
+ synchronized (cooldownTablets) {
+ cooldownTablets.add(Pair.of(tabletMeta, backendTabletInfo));
+ }
+ replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId());
+ replica.setCooldownTerm(backendTabletInfo.getCooldownTerm());
+ }
+
+ // Check storage medium migration
+ checkStorageMediumMigration(tabletId, tabletMeta, backendTabletInfo,
+ storageMediumMap, tabletMigrationMap);
+
+ // Handle transactions
+ if (backendTabletInfo.isSetTransactionIds()) {
+ handleBackendTransactions(backendId,
backendTabletInfo.getTransactionIds(), tabletId,
+ tabletMeta, transactionsToPublish, transactionsToClear);
+ }
+
+ // Update replica version count
+ updateReplicaVersionCount(replica, backendTabletInfo);
+
+ // Add tablet meta info to update list if needed
+ if (tabletMetaInfo != null) {
+ tabletMetaInfo.setTabletId(tabletId);
+ synchronized (tabletToUpdate) {
+ tabletToUpdate.add(tabletMetaInfo);
+ }
+ }
+ }
+
+ /**
+ * Prepare tablet meta info for BE update if needed
+ */
+ private TTabletMetaInfo prepareTabletMetaInfo(Replica replica, TabletMeta
tabletMeta,
+ TTabletInfo
backendTabletInfo) {
+ TTabletMetaInfo tabletMetaInfo = null;
+
+ // Check replica id mismatch
+ if (backendTabletInfo.getReplicaId() != replica.getId()
+ && replica.getState() != ReplicaState.CLONE) {
+ tabletMetaInfo = new TTabletMetaInfo();
+ tabletMetaInfo.setReplicaId(replica.getId());
+ }
+
+ // Check in-memory flag
+ PartitionCollectInfo partitionCollectInfo =
+
partitionCollectInfoMap.get(backendTabletInfo.getPartitionId());
+ boolean isInMemory = partitionCollectInfo != null &&
partitionCollectInfo.isInMemory();
+ if (isInMemory != backendTabletInfo.isIsInMemory()) {
+ if (tabletMetaInfo == null) {
+ tabletMetaInfo = new TTabletMetaInfo();
+ }
+ tabletMetaInfo.setIsInMemory(isInMemory);
+ }
+
+ // Check partition id mismatch
+ if (Config.fix_tablet_partition_id_eq_0
+ && tabletMeta.getPartitionId() > 0
+ && backendTabletInfo.getPartitionId() == 0) {
+ LOG.warn("be report tablet partition id not eq fe, in be {} but in
fe {}",
+ backendTabletInfo, tabletMeta);
+ if (tabletMetaInfo == null) {
+ tabletMetaInfo = new TTabletMetaInfo();
+ }
+ tabletMetaInfo.setPartitionId(tabletMeta.getPartitionId());
+ }
+
+ return tabletMetaInfo;
+ }
+
+ /**
+ * Update replica's basic info like path hash and schema hash
+ */
+ private void updateReplicaBasicInfo(Replica replica, TTabletInfo
backendTabletInfo) {
+ // Update path hash
+ if (backendTabletInfo.isSetPathHash()
+ && replica.getPathHash() != backendTabletInfo.getPathHash()) {
+ replica.setPathHash(backendTabletInfo.getPathHash());
+ }
+
+ // Update schema hash
+ if (backendTabletInfo.isSetSchemaHash()
+ && replica.getState() == ReplicaState.NORMAL
+ && replica.getSchemaHash() !=
backendTabletInfo.getSchemaHash()) {
+ replica.setSchemaHash(backendTabletInfo.getSchemaHash());
+ }
+ }
+
+ /**
+ * Log replica recovery information
+ */
+ private void logReplicaRecovery(Replica replica, long tabletId, long
backendId,
+ TTabletInfo backendTabletInfo) {
+ LOG.warn("replica {} of tablet {} on backend {} need recovery. "
+ + "replica in FE: {}, report version {}, report schema
hash: {}, "
+ + "is bad: {}, is version missing: {}",
+ replica.getId(), tabletId, backendId, replica,
+ backendTabletInfo.getVersion(),
+ backendTabletInfo.getSchemaHash(),
+ backendTabletInfo.isSetUsed() ? !backendTabletInfo.isUsed() :
"false",
+ backendTabletInfo.isSetVersionMiss() ?
backendTabletInfo.isVersionMiss() : "unset");
+ }
+
+ /**
+ * Check if storage medium migration is needed
+ */
+ private void checkStorageMediumMigration(long tabletId, TabletMeta
tabletMeta,
+ TTabletInfo backendTabletInfo,
+ HashMap<Long, TStorageMedium>
storageMediumMap,
+ ListMultimap<TStorageMedium,
Long> tabletMigrationMap) {
+ if (Config.disable_storage_medium_check) {
+ return;
+ }
+
+ long partitionId = tabletMeta.getPartitionId();
+ TStorageMedium storageMedium = storageMediumMap.get(partitionId);
+
+ if (storageMedium != null && backendTabletInfo.isSetStorageMedium()
+ && isLocal(storageMedium)
+ && isLocal(backendTabletInfo.getStorageMedium())
+ && isLocal(tabletMeta.getStorageMedium())) {
+
+ if (storageMedium != backendTabletInfo.getStorageMedium()) {
+ synchronized (tabletMigrationMap) {
+ tabletMigrationMap.put(storageMedium, tabletId);
+ }
+ }
+
+ if (storageMedium != tabletMeta.getStorageMedium()) {
+ tabletMeta.setStorageMedium(storageMedium);
+ }
+ }
+ }
+
+ /**
+ * Update replica's version count
+ */
+ private void updateReplicaVersionCount(Replica replica, TTabletInfo
backendTabletInfo) {
+ if (backendTabletInfo.isSetTotalVersionCount()) {
+
replica.setTotalVersionCount(backendTabletInfo.getTotalVersionCount());
+
replica.setVisibleVersionCount(backendTabletInfo.isSetVisibleVersionCount()
+ ? backendTabletInfo.getVisibleVersionCount()
+ : backendTabletInfo.getTotalVersionCount());
+ }
+ }
+
+ /**
+ * Process tablet that exists in FE but not reported by BE
+ */
+ private void processDeletedTablet(long backendId, long tabletId,
TabletMeta tabletMeta,
+ ListMultimap<Long, Long>
tabletDeleteFromMeta) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("backend[{}] does not report tablet[{}-{}]", backendId,
tabletId, tabletMeta);
+ }
+ synchronized (tabletDeleteFromMeta) {
+ tabletDeleteFromMeta.put(tabletMeta.getDbId(), tabletId);
+ }
+ }
+
+ /**
+ * Process partition versions reported by BE
+ */
+ private void processPartitionVersions(Map<Long, Long>
backendPartitionsVersion,
+ Map<Long, Long>
partitionVersionSyncMap) {
+ for (Map.Entry<Long, Long> entry :
backendPartitionsVersion.entrySet()) {
+ long partitionId = entry.getKey();
+ long backendVersion = entry.getValue();
+ PartitionCollectInfo partitionInfo =
partitionCollectInfoMap.get(partitionId);
+
+ if (partitionInfo != null && partitionInfo.getVisibleVersion() >
backendVersion) {
+ partitionVersionSyncMap.put(partitionId,
partitionInfo.getVisibleVersion());
+ }
+ }
+ }
+
+ /**
+ * Log tablet report summary
+ */
+ private void logTabletReportSummary(long backendId, long feTabletNum,
+ Map<Long, TTablet> backendTablets,
+ Map<Long, Long>
backendPartitionsVersion,
+ ListMultimap<Long, Long>
tabletSyncMap,
+ ListMultimap<Long, Long>
tabletDeleteFromMeta,
+ Set<Long> tabletFoundInMeta,
+ ListMultimap<TStorageMedium, Long>
tabletMigrationMap,
+ Map<Long, Long>
partitionVersionSyncMap,
+ Map<Long, SetMultimap<Long,
TPartitionVersionInfo>> transactionsToPublish,
+ SetMultimap<Long, Long>
transactionsToClear,
+ List<TTabletMetaInfo> tabletToUpdate,
+ ListMultimap<Long, Long>
tabletRecoveryMap,
+ long startTime) {
+ long endTime = System.currentTimeMillis();
long toClearTransactionsNum = transactionsToClear.keySet().size();
long toClearTransactionsPartitions =
transactionsToClear.values().size();
long toPublishTransactionsNum = transactionsToPublish.values().stream()
- .mapToLong(m ->
m.keySet().size()).sum();
+ .mapToLong(m -> m.keySet().size()).sum();
long toPublishTransactionsPartitions =
transactionsToPublish.values().stream()
- .mapToLong(m ->
m.values().size()).sum();
- LOG.info("finished to do tablet diff with backend[{}]. fe tablet num:
{}, backend tablet num: {}. sync: {}."
- + " metaDel: {}. foundInMeta: {}. migration: {}.
backend partition num: {}, backend need "
- + "update: {}. found invalid transactions
{}(partitions: {}). found republish "
- + "transactions {}(partitions: {}). tabletToUpdate:
{}. need recovery: {}. cost: {} ms",
+ .mapToLong(m -> m.values().size()).sum();
+
+ LOG.info("finished to do tablet diff with backend[{}]. fe tablet num:
{}, backend tablet num: {}. "
+ + "sync: {}, metaDel: {}, foundInMeta: {}, migration:
{}, "
+ + "backend partition num: {}, backend need update: {},
"
+ + "found invalid transactions {}(partitions: {}), "
+ + "found republish transactions {}(partitions: {}), "
+ + "tabletToUpdate: {}, need recovery: {}, cost: {} ms",
backendId, feTabletNum, backendTablets.size(),
tabletSyncMap.size(),
tabletDeleteFromMeta.size(), tabletFoundInMeta.size(),
tabletMigrationMap.size(),
- backendPartitionsVersion.size(),
partitionVersionSyncMap.size(), toClearTransactionsNum,
- toClearTransactionsPartitions, toPublishTransactionsNum,
toPublishTransactionsPartitions,
- tabletToUpdate.size(), tabletRecoveryMap.size(), (end -
start));
+ backendPartitionsVersion.size(),
partitionVersionSyncMap.size(),
+ toClearTransactionsNum, toClearTransactionsPartitions,
+ toPublishTransactionsNum, toPublishTransactionsPartitions,
+ tabletToUpdate.size(), tabletRecoveryMap.size(), (endTime -
startTime));
}
private void handleBackendTransactions(long backendId, List<Long>
transactionIds, long tabletId,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]