This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d9e731782af broker: per-table routing build lock and request gating

  - Add per-table locks for buildRouting to allow concurrent builds across
    tables
  - Track per-table routing build start time; ignore requests older than
    build start (gate removed per latest change)
  - Skip redundant builds if an earlier concurrent build was already started
  - Clean up locks/timestamps on removeRouting
  - Add unit test BuildRoutingDelayTest for delay/skip logic (#16585)
d9e731782af is described below
commit d9e731782afc62149b487c27e80de4d809673853
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Aug 14 12:24:38 2025 -0700
broker: per-table routing build lock and request gating

- Add per-table locks for buildRouting to allow concurrent builds across
  tables
- Track per-table routing build start time; ignore requests older than build
  start (gate removed per latest change)
- Skip redundant builds if an earlier concurrent build was already started
- Clean up locks/timestamps on removeRouting
- Add unit test BuildRoutingDelayTest for delay/skip logic (#16585)
---
.../pinot/broker/routing/BrokerRoutingManager.java | 360 ++++++++++++---------
.../broker/routing/BuildRoutingDelayTest.java | 59 ++++
2 files changed, 270 insertions(+), 149 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 6cb983c0e5a..c347391c503 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.AccessOption;
@@ -109,8 +111,8 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
private final BrokerMetrics _brokerMetrics;
private final Map<String, RoutingEntry> _routingEntryMap = new
ConcurrentHashMap<>();
private final Map<String, ServerInstance> _enabledServerInstanceMap = new
ConcurrentHashMap<>();
- // NOTE: _excludedServers doesn't need to be concurrent because it is only
accessed within the synchronized block
- private final Set<String> _excludedServers = new HashSet<>();
+ // Thread-safe set because it can be read/modified concurrently from
instance/server change paths
+ private final Set<String> _excludedServers = ConcurrentHashMap.newKeySet();
private final ServerRoutingStatsManager _serverRoutingStatsManager;
private final PinotConfiguration _pinotConfig;
private final boolean _enablePartitionMetadataManager;
@@ -122,6 +124,14 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
private Set<String> _routableServers;
+ // Per-table locks to allow concurrent routing builds across different
tables while serializing per-table operations
+ private final Map<String, Object> _routingTableBuildLocks = new
ConcurrentHashMap<>();
+ // Per-table build start time in millis. Any request before this time should
be ignored.
+ private final Map<String, Long> _routingTableBuildStartTimeMs = new
ConcurrentHashMap<>();
+
+ private Object getTableLock(String tableNameWithType) {
+ return _routingTableBuildLocks.computeIfAbsent(tableNameWithType, k -> new
Object());
+ }
public BrokerRoutingManager(BrokerMetrics brokerMetrics,
ServerRoutingStatsManager serverRoutingStatsManager,
PinotConfiguration pinotConfig) {
@@ -144,7 +154,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
}
@Override
- public synchronized void processClusterChange(ChangeType changeType) {
+ public void processClusterChange(ChangeType changeType) {
if (changeType == ChangeType.IDEAL_STATE || changeType ==
ChangeType.EXTERNAL_VIEW) {
processSegmentAssignmentChange();
} else if (changeType == ChangeType.INSTANCE_CONFIG) {
@@ -177,8 +187,8 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
Stat[] externalViewStats = _zkDataAccessor.getStats(externalViewPaths,
AccessOption.PERSISTENT);
long fetchStatsEndTimeMs = System.currentTimeMillis();
- List<String> tablesToUpdate = new ArrayList<>();
- for (int i = 0; i < numTables; i++) {
+ List<String> tablesToUpdate = new CopyOnWriteArrayList<>();
+ IntStream.range(0, numTables).parallel().forEach(i -> {
Stat idealStateStat = idealStateStats[i];
Stat externalViewStat = externalViewStats[i];
if (idealStateStat != null && externalViewStat != null) {
@@ -192,15 +202,18 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
if (idealState == null) {
LOGGER.warn("Failed to find ideal state for table: {}, skipping
updating routing entry",
tableNameWithType);
- continue;
+ return;
}
ExternalView externalView =
getExternalView(routingEntry._externalViewPath);
if (externalView == null) {
LOGGER.warn("Failed to find external view for table: {},
skipping updating routing entry",
tableNameWithType);
- continue;
+ return;
+ }
+ Object tableLock = getTableLock(tableNameWithType);
+ synchronized (tableLock) {
+ routingEntry.onAssignmentChange(idealState, externalView);
}
- routingEntry.onAssignmentChange(idealState, externalView);
} catch (Exception e) {
LOGGER.error(
"Caught unexpected exception while updating routing entry on
segment assignment change for table: {}",
@@ -208,7 +221,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
}
}
}
- }
+ });
long updateRoutingEntriesEndTimeMs = System.currentTimeMillis();
LOGGER.info(
@@ -314,14 +327,18 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
}
// Update routing entry for all tables
- for (RoutingEntry routingEntry : _routingEntryMap.values()) {
+ Set<String> routableServersSnapshot = _routableServers;
+ _routingEntryMap.values().parallelStream().forEach(routingEntry -> {
try {
- routingEntry.onInstancesChange(_routableServers, changedServers);
+ Object tableLock = getTableLock(routingEntry.getTableNameWithType());
+ synchronized (tableLock) {
+ routingEntry.onInstancesChange(routableServersSnapshot,
changedServers);
+ }
} catch (Exception e) {
LOGGER.error("Caught unexpected exception while updating routing entry
on instances change for table: {}",
routingEntry.getTableNameWithType(), e);
}
- }
+ });
long updateRoutingEntriesEndTimeMs = System.currentTimeMillis();
// Remove new disabled servers from _enabledServerInstanceMap after
updating all routing entries to ensure it
@@ -361,7 +378,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
/**
* Excludes a server from the routing.
*/
- public synchronized void excludeServerFromRouting(String instanceId) {
+ public void excludeServerFromRouting(String instanceId) {
LOGGER.warn("Excluding server: {} from routing", instanceId);
if (!_excludedServers.add(instanceId)) {
LOGGER.info("Server: {} is already excluded from routing, skipping
updating the routing", instanceId);
@@ -378,14 +395,19 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
routableServers.remove(instanceId);
_routableServers = routableServers;
List<String> changedServers = Collections.singletonList(instanceId);
- for (RoutingEntry routingEntry : _routingEntryMap.values()) {
+ Set<String> routableServersSnapshot = _routableServers;
+ _routingEntryMap.values().parallelStream().forEach(routingEntry -> {
try {
- routingEntry.onInstancesChange(_routableServers, changedServers);
+ Object tableLock = getTableLock(routingEntry.getTableNameWithType());
+ synchronized (tableLock) {
+ routingEntry.onInstancesChange(routableServersSnapshot,
changedServers);
+ }
} catch (Exception e) {
- LOGGER.error("Caught unexpected exception while updating routing entry
when excluding server: {} for table: {}",
+ LOGGER.error(
+ "Caught unexpected exception while updating routing entry when
excluding server: {} for table: {}",
instanceId, routingEntry.getTableNameWithType(), e);
}
- }
+ });
LOGGER.info("Excluded server: {} from routing in {}ms (updated {} routing
entries)", instanceId,
System.currentTimeMillis() - startTimeMs, _routingEntryMap.size());
}
@@ -393,7 +415,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
/**
* Includes a previous excluded server to the routing.
*/
- public synchronized void includeServerToRouting(String instanceId) {
+ public void includeServerToRouting(String instanceId) {
LOGGER.info("Including server: {} to routing", instanceId);
if (!_excludedServers.remove(instanceId)) {
LOGGER.info("Server: {} is not previously excluded, skipping updating
the routing", instanceId);
@@ -410,14 +432,19 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
routableServers.add(instanceId);
_routableServers = routableServers;
List<String> changedServers = Collections.singletonList(instanceId);
- for (RoutingEntry routingEntry : _routingEntryMap.values()) {
+ Set<String> routableServersSnapshot = _routableServers;
+ _routingEntryMap.values().parallelStream().forEach(routingEntry -> {
try {
- routingEntry.onInstancesChange(_routableServers, changedServers);
+ Object tableLock = getTableLock(routingEntry.getTableNameWithType());
+ synchronized (tableLock) {
+ routingEntry.onInstancesChange(routableServersSnapshot,
changedServers);
+ }
} catch (Exception e) {
- LOGGER.error("Caught unexpected exception while updating routing entry
when including server: {} for table: {}",
+ LOGGER.error(
+ "Caught unexpected exception while updating routing entry when
including server: {} for table: {}",
instanceId, routingEntry.getTableNameWithType(), e);
}
- }
+ });
LOGGER.info("Included server: {} to routing in {}ms (updated {} routing
entries)", instanceId,
System.currentTimeMillis() - startTimeMs, _routingEntryMap.size());
}
@@ -477,7 +504,10 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
TimeBoundaryManager timeBoundaryManager = new
TimeBoundaryManager(tableConfig, _propertyStore, _brokerMetrics);
timeBoundaryManager.init(idealState, externalView,
preSelectedOnlineSegments);
-
_routingEntryMap.get(tableNameWithType).setTimeBoundaryManager(timeBoundaryManager);
+ Object tableLock = getTableLock(tableNameWithType);
+ synchronized (tableLock) {
+
_routingEntryMap.get(tableNameWithType).setTimeBoundaryManager(timeBoundaryManager);
+ }
} catch (Exception e) {
LOGGER.error("Caught unexpected exception while setting time boundary
manager for table: {}", tableNameWithType,
e);
@@ -489,132 +519,152 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
* Builds the routing for a table.
* @param tableNameWithType the name of the table
*/
- public synchronized void buildRouting(String tableNameWithType) {
- LOGGER.info("Building routing for table: {}", tableNameWithType);
-
- TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
- Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", tableNameWithType);
-
- String idealStatePath = getIdealStatePath(tableNameWithType);
- IdealState idealState = getIdealState(idealStatePath);
- Preconditions.checkState(idealState != null, "Failed to find ideal state
for table: %s", tableNameWithType);
- int idealStateVersion = idealState.getRecord().getVersion();
-
- String externalViewPath = getExternalViewPath(tableNameWithType);
- ExternalView externalView = getExternalView(externalViewPath);
- int externalViewVersion;
- // NOTE: External view might be null for new created tables. In such case,
create an empty one and set the version
- // to -1 to ensure the version does not match the next external view
- if (externalView == null) {
- externalView = new ExternalView(tableNameWithType);
- externalViewVersion = -1;
- } else {
- externalViewVersion = externalView.getRecord().getVersion();
- }
-
- Set<String> onlineSegments = getOnlineSegments(idealState);
-
- SegmentPreSelector segmentPreSelector =
- SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig,
_propertyStore);
- Set<String> preSelectedOnlineSegments =
segmentPreSelector.preSelect(onlineSegments);
- SegmentSelector segmentSelector =
SegmentSelectorFactory.getSegmentSelector(tableConfig);
- segmentSelector.init(idealState, externalView, preSelectedOnlineSegments);
+ public void buildRouting(String tableNameWithType) {
+ long buildStartTimeMs = System.currentTimeMillis();
+ Object tableLock =
_routingTableBuildLocks.computeIfAbsent(tableNameWithType, k -> new Object());
+ synchronized (tableLock) {
+ Long lastStartObj = _routingTableBuildStartTimeMs.get(tableNameWithType);
+ long lastRoutingBuildStartTimeMs = lastStartObj != null ? lastStartObj :
Long.MIN_VALUE;
+ if (buildStartTimeMs <= lastRoutingBuildStartTimeMs) {
+ LOGGER.info("Skipping routing build for table: {} because the build
routing request timestamp {} "
+ + "is earlier than the last build start time: {}",
+ tableNameWithType, buildStartTimeMs, lastRoutingBuildStartTimeMs);
+ return;
+ }
- // Register segment pruners and initialize segment zk metadata fetcher.
- List<SegmentPruner> segmentPruners =
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ // Record build start time to gate older requests
+ _routingTableBuildStartTimeMs.put(tableNameWithType,
System.currentTimeMillis());
+ LOGGER.info("Building routing for table: {}", tableNameWithType);
+
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+ Preconditions.checkState(tableConfig != null, "Failed to find table
config for table: %s", tableNameWithType);
+
+ String idealStatePath = getIdealStatePath(tableNameWithType);
+ IdealState idealState = getIdealState(idealStatePath);
+ Preconditions.checkState(idealState != null, "Failed to find ideal state
for table: %s", tableNameWithType);
+ int idealStateVersion = idealState.getRecord().getVersion();
+
+ String externalViewPath = getExternalViewPath(tableNameWithType);
+ ExternalView externalView = getExternalView(externalViewPath);
+ int externalViewVersion;
+ // NOTE: External view might be null for new created tables. In such
case, create an empty one and set the
+ // version to -1 to ensure the version does not match the next external
view
+ if (externalView == null) {
+ externalView = new ExternalView(tableNameWithType);
+ externalViewVersion = -1;
+ } else {
+ externalViewVersion = externalView.getRecord().getVersion();
+ }
- AdaptiveServerSelector adaptiveServerSelector =
-
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager,
_pinotConfig);
- InstanceSelector instanceSelector =
- InstanceSelectorFactory.getInstanceSelector(tableConfig,
_propertyStore, _brokerMetrics,
- adaptiveServerSelector, _pinotConfig);
- instanceSelector.init(_routableServers, _enabledServerInstanceMap,
idealState, externalView,
- preSelectedOnlineSegments);
+ Set<String> onlineSegments = getOnlineSegments(idealState);
- // Add time boundary manager if both offline and real-time part exist for
a hybrid table
- TimeBoundaryManager timeBoundaryManager = null;
- String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
- // Current table is offline
- String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
- if (_routingEntryMap.containsKey(realtimeTableName)) {
- LOGGER.info("Adding time boundary manager for table: {}",
tableNameWithType);
- timeBoundaryManager = new TimeBoundaryManager(tableConfig,
_propertyStore, _brokerMetrics);
- timeBoundaryManager.init(idealState, externalView,
preSelectedOnlineSegments);
- }
- } else {
- // Current table is real-time
- String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
- RoutingEntry offlineTableRoutingEntry =
_routingEntryMap.get(offlineTableName);
- if (offlineTableRoutingEntry != null &&
offlineTableRoutingEntry.getTimeBoundaryManager() == null) {
- LOGGER.info("Adding time boundary manager for table: {}",
offlineTableName);
-
- // NOTE: Add time boundary manager to the offline part before adding
the routing for the real-time part to
- // ensure no overlapping data getting queried
- TableConfig offlineTableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName);
- Preconditions.checkState(offlineTableConfig != null, "Failed to find
table config for table: %s",
- offlineTableName);
- IdealState offlineTableIdealState =
getIdealState(getIdealStatePath(offlineTableName));
- Preconditions.checkState(offlineTableIdealState != null, "Failed to
find ideal state for table: %s",
- offlineTableName);
- // NOTE: External view might be null for new created tables. In such
case, create an empty one.
- ExternalView offlineTableExternalView =
getExternalView(getExternalViewPath(offlineTableName));
- if (offlineTableExternalView == null) {
- offlineTableExternalView = new ExternalView(offlineTableName);
+ SegmentPreSelector segmentPreSelector =
+ SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig,
_propertyStore);
+ Set<String> preSelectedOnlineSegments =
segmentPreSelector.preSelect(onlineSegments);
+ SegmentSelector segmentSelector =
SegmentSelectorFactory.getSegmentSelector(tableConfig);
+ segmentSelector.init(idealState, externalView,
preSelectedOnlineSegments);
+
+ // Register segment pruners and initialize segment zk metadata fetcher.
+ List<SegmentPruner> segmentPruners =
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+
+ AdaptiveServerSelector adaptiveServerSelector =
+
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager,
_pinotConfig);
+ InstanceSelector instanceSelector =
+ InstanceSelectorFactory.getInstanceSelector(tableConfig,
_propertyStore, _brokerMetrics,
+ adaptiveServerSelector, _pinotConfig);
+ instanceSelector.init(_routableServers, _enabledServerInstanceMap,
idealState, externalView,
+ preSelectedOnlineSegments);
+
+ // Add time boundary manager if both offline and real-time part exist
for a hybrid table
+ TimeBoundaryManager timeBoundaryManager = null;
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
+ // Current table is offline
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+ if (_routingEntryMap.containsKey(realtimeTableName)) {
+ LOGGER.info("Adding time boundary manager for table: {}",
tableNameWithType);
+ timeBoundaryManager = new TimeBoundaryManager(tableConfig,
_propertyStore, _brokerMetrics);
+ timeBoundaryManager.init(idealState, externalView,
preSelectedOnlineSegments);
+ }
+ } else {
+ // Current table is real-time
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+ RoutingEntry offlineTableRoutingEntry =
_routingEntryMap.get(offlineTableName);
+ if (offlineTableRoutingEntry != null &&
offlineTableRoutingEntry.getTimeBoundaryManager() == null) {
+ LOGGER.info("Adding time boundary manager for table: {}",
offlineTableName);
+
+ // NOTE: Add time boundary manager to the offline part before adding
the routing for the real-time part to
+ // ensure no overlapping data getting queried
+ TableConfig offlineTableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, offlineTableName);
+ Preconditions.checkState(offlineTableConfig != null, "Failed to find
table config for table: %s",
+ offlineTableName);
+ IdealState offlineTableIdealState =
getIdealState(getIdealStatePath(offlineTableName));
+ Preconditions.checkState(offlineTableIdealState != null, "Failed to
find ideal state for table: %s",
+ offlineTableName);
+ // NOTE: External view might be null for new created tables. In such
case, create an empty one.
+ ExternalView offlineTableExternalView =
getExternalView(getExternalViewPath(offlineTableName));
+ if (offlineTableExternalView == null) {
+ offlineTableExternalView = new ExternalView(offlineTableName);
+ }
+ Set<String> offlineTableOnlineSegments =
getOnlineSegments(offlineTableIdealState);
+ SegmentPreSelector offlineTableSegmentPreSelector =
+
SegmentPreSelectorFactory.getSegmentPreSelector(offlineTableConfig,
_propertyStore);
+ Set<String> offlineTablePreSelectedOnlineSegments =
+
offlineTableSegmentPreSelector.preSelect(offlineTableOnlineSegments);
+ TimeBoundaryManager offlineTableTimeBoundaryManager =
+ new TimeBoundaryManager(offlineTableConfig, _propertyStore,
_brokerMetrics);
+ offlineTableTimeBoundaryManager.init(offlineTableIdealState,
offlineTableExternalView,
+ offlineTablePreSelectedOnlineSegments);
+ Object offlineTableLock = getTableLock(offlineTableName);
+ synchronized (offlineTableLock) {
+
offlineTableRoutingEntry.setTimeBoundaryManager(offlineTableTimeBoundaryManager);
+ }
}
- Set<String> offlineTableOnlineSegments =
getOnlineSegments(offlineTableIdealState);
- SegmentPreSelector offlineTableSegmentPreSelector =
-
SegmentPreSelectorFactory.getSegmentPreSelector(offlineTableConfig,
_propertyStore);
- Set<String> offlineTablePreSelectedOnlineSegments =
-
offlineTableSegmentPreSelector.preSelect(offlineTableOnlineSegments);
- TimeBoundaryManager offlineTableTimeBoundaryManager =
- new TimeBoundaryManager(offlineTableConfig, _propertyStore,
_brokerMetrics);
- offlineTableTimeBoundaryManager.init(offlineTableIdealState,
offlineTableExternalView,
- offlineTablePreSelectedOnlineSegments);
-
offlineTableRoutingEntry.setTimeBoundaryManager(offlineTableTimeBoundaryManager);
}
- }
- SegmentPartitionMetadataManager partitionMetadataManager = null;
- // TODO: Support multiple partition columns
- // TODO: Make partition pruner on top of the partition metadata manager to
avoid keeping 2 copies of the metadata
- if (_enablePartitionMetadataManager) {
- SegmentPartitionConfig segmentPartitionConfig =
tableConfig.getIndexingConfig().getSegmentPartitionConfig();
- if (segmentPartitionConfig != null) {
- Map<String, ColumnPartitionConfig> columnPartitionMap =
segmentPartitionConfig.getColumnPartitionMap();
- if (columnPartitionMap.size() == 1) {
- Map.Entry<String, ColumnPartitionConfig> partitionConfig =
columnPartitionMap.entrySet().iterator().next();
- LOGGER.info("Enabling SegmentPartitionMetadataManager for table: {}
on partition column: {}",
- tableNameWithType, partitionConfig.getKey());
- partitionMetadataManager = new
SegmentPartitionMetadataManager(tableNameWithType, partitionConfig.getKey(),
- partitionConfig.getValue().getFunctionName(),
partitionConfig.getValue().getNumPartitions());
- } else {
- LOGGER.warn("Cannot enable SegmentPartitionMetadataManager for
table: {} with multiple partition columns: {}",
- tableNameWithType, columnPartitionMap.keySet());
+ SegmentPartitionMetadataManager partitionMetadataManager = null;
+ // TODO: Support multiple partition columns
+ // TODO: Make partition pruner on top of the partition metadata manager
to avoid keeping 2 copies of the metadata
+ if (_enablePartitionMetadataManager) {
+ SegmentPartitionConfig segmentPartitionConfig =
tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+ if (segmentPartitionConfig != null) {
+ Map<String, ColumnPartitionConfig> columnPartitionMap =
segmentPartitionConfig.getColumnPartitionMap();
+ if (columnPartitionMap.size() == 1) {
+ Map.Entry<String, ColumnPartitionConfig> partitionConfig =
columnPartitionMap.entrySet().iterator().next();
+ LOGGER.info("Enabling SegmentPartitionMetadataManager for table:
{} on partition column: {}",
+ tableNameWithType, partitionConfig.getKey());
+ partitionMetadataManager = new
SegmentPartitionMetadataManager(tableNameWithType, partitionConfig.getKey(),
+ partitionConfig.getValue().getFunctionName(),
partitionConfig.getValue().getNumPartitions());
+ } else {
+ LOGGER.warn(
+ "Cannot enable SegmentPartitionMetadataManager for table: {}
with multiple partition columns: {}",
+ tableNameWithType, columnPartitionMap.keySet());
+ }
}
}
- }
- QueryConfig queryConfig = tableConfig.getQueryConfig();
- Long queryTimeoutMs = queryConfig != null ? queryConfig.getTimeoutMs() :
null;
+ QueryConfig queryConfig = tableConfig.getQueryConfig();
+ Long queryTimeoutMs = queryConfig != null ? queryConfig.getTimeoutMs() :
null;
- SegmentZkMetadataFetcher segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(tableNameWithType, _propertyStore);
- for (SegmentZkMetadataFetchListener listener : segmentPruners) {
- segmentZkMetadataFetcher.register(listener);
- }
- if (partitionMetadataManager != null) {
- segmentZkMetadataFetcher.register(partitionMetadataManager);
- }
- segmentZkMetadataFetcher.init(idealState, externalView,
preSelectedOnlineSegments);
-
- RoutingEntry routingEntry =
- new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath,
segmentPreSelector, segmentSelector,
- segmentPruners, instanceSelector, idealStateVersion,
externalViewVersion, segmentZkMetadataFetcher,
- timeBoundaryManager, partitionMetadataManager, queryTimeoutMs,
!idealState.isEnabled());
- if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
- LOGGER.info("Built routing for table: {}", tableNameWithType);
- } else {
- LOGGER.info("Rebuilt routing for table: {}", tableNameWithType);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+ new SegmentZkMetadataFetcher(tableNameWithType, _propertyStore);
+ for (SegmentZkMetadataFetchListener listener : segmentPruners) {
+ segmentZkMetadataFetcher.register(listener);
+ }
+ if (partitionMetadataManager != null) {
+ segmentZkMetadataFetcher.register(partitionMetadataManager);
+ }
+ segmentZkMetadataFetcher.init(idealState, externalView,
preSelectedOnlineSegments);
+
+ RoutingEntry routingEntry =
+ new RoutingEntry(tableNameWithType, idealStatePath,
externalViewPath, segmentPreSelector, segmentSelector,
+ segmentPruners, instanceSelector, idealStateVersion,
externalViewVersion, segmentZkMetadataFetcher,
+ timeBoundaryManager, partitionMetadataManager, queryTimeoutMs,
!idealState.isEnabled());
+ if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
+ LOGGER.info("Built routing for table: {}", tableNameWithType);
+ } else {
+ LOGGER.info("Rebuilt routing for table: {}", tableNameWithType);
+ }
}
}
@@ -637,7 +687,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
/**
* Removes the routing for the given table.
*/
- public synchronized void removeRouting(String tableNameWithType) {
+ public void removeRouting(String tableNameWithType) {
LOGGER.info("Removing routing for table: {}", tableNameWithType);
if (_routingEntryMap.remove(tableNameWithType) != null) {
LOGGER.info("Removed routing for table: {}", tableNameWithType);
@@ -649,10 +699,16 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
TableNameBuilder.OFFLINE.tableNameWithType(TableNameBuilder.extractRawTableName(tableNameWithType));
RoutingEntry routingEntry = _routingEntryMap.get(offlineTableName);
if (routingEntry != null) {
- routingEntry.setTimeBoundaryManager(null);
+ Object offlineTableLock = getTableLock(offlineTableName);
+ synchronized (offlineTableLock) {
+ routingEntry.setTimeBoundaryManager(null);
+ }
LOGGER.info("Removed time boundary manager for table: {}",
offlineTableName);
}
}
+ // Clean up any per-table build lock as the routing for this table has
been removed
+ _routingTableBuildLocks.remove(tableNameWithType);
+ _routingTableBuildStartTimeMs.remove(tableNameWithType);
} else {
LOGGER.warn("Routing does not exist for table: {}, skipping removing
routing", tableNameWithType);
}
@@ -688,7 +744,10 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
if (routingEntry != null) {
- routingEntry.setTimeBoundaryManager(null);
+ Object tableLock = getTableLock(tableNameWithType);
+ synchronized (tableLock) {
+ routingEntry.setTimeBoundaryManager(null);
+ }
LOGGER.info("Removed time boundary manager for table: {}",
tableNameWithType);
} else {
LOGGER.warn("Routing does not exist for table: {}, skipping",
tableNameWithType);
@@ -699,14 +758,17 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
/**
* Refreshes the metadata for the given segment (called when segment is
getting refreshed).
*/
- public synchronized void refreshSegment(String tableNameWithType, String
segment) {
+ public void refreshSegment(String tableNameWithType, String segment) {
LOGGER.info("Refreshing segment: {} for table: {}", segment,
tableNameWithType);
- RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
- if (routingEntry != null) {
- routingEntry.refreshSegment(segment);
- LOGGER.info("Refreshed segment: {} for table: {}", segment,
tableNameWithType);
- } else {
- LOGGER.warn("Routing does not exist for table: {}, skipping refreshing
segment", tableNameWithType);
+ Object tableLock = getTableLock(tableNameWithType);
+ synchronized (tableLock) {
+ RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
+ if (routingEntry != null) {
+ routingEntry.refreshSegment(segment);
+ LOGGER.info("Refreshed segment: {} for table: {}", segment,
tableNameWithType);
+ } else {
+ LOGGER.warn("Routing does not exist for table: {}, skipping refreshing
segment", tableNameWithType);
+ }
}
}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/BuildRoutingDelayTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/BuildRoutingDelayTest.java
new file mode 100644
index 00000000000..f0fb052aab0
--- /dev/null
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/BuildRoutingDelayTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class BuildRoutingDelayTest {
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testBuildRoutingSkipsWhenRequestIsOlderThanLastStart()
+ throws Exception {
+ // Construct with nulls as the build should return early before using
these fields
+ BrokerRoutingManager manager = new BrokerRoutingManager(null, null, new
PinotConfiguration());
+
+ String tableNameWithType = "testTable_OFFLINE";
+
+ // Set a future last build start time to force skipping the current build
call
+ long futureStart = System.currentTimeMillis() + 10_000L;
+
+ Field startTimesField =
BrokerRoutingManager.class.getDeclaredField("_routingTableBuildStartTimeMs");
+ startTimesField.setAccessible(true);
+ Map<String, Long> startTimes = (Map<String, Long>)
startTimesField.get(manager);
+ if (startTimes == null) {
+ startTimes = new ConcurrentHashMap<>();
+ startTimesField.set(manager, startTimes);
+ }
+ startTimes.put(tableNameWithType, futureStart);
+
+ // Should return without throwing and without attempting to build routing
+ manager.buildRouting(tableNameWithType);
+
+ // Ensure routing was not created and the last start time was not
overwritten
+ Assert.assertFalse(manager.routingExists(tableNameWithType));
+ Assert.assertEquals(startTimes.get(tableNameWithType).longValue(),
futureStart);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]