Jackie-Jiang commented on code in PR #16791:
URL: https://github.com/apache/pinot/pull/16791#discussion_r2364915850
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -461,6 +461,10 @@ public static class Broker {
"pinot.broker.enable.partition.metadata.manager";
public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER =
true;
+ public static final String
CONFIG_OF_ROUTING_PROCESS_SEGMENT_ASSIGNMENT_CHANGE_NUM_THREADS =
+ "pinot.broker.routing.processSegmentAssignmentChangeNumThreads";
+ public static final int
DEFAULT_ROUTING_PROCESS_SEGMENT_ASSIGNMENT_CHANGE_NUM_THREADS = 10;
Review Comment:
Use `Runtime.getRuntime().availableProcessors()`
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -489,132 +637,185 @@ public synchronized void
buildRoutingForLogicalTable(String logicalTableName) {
* Builds the routing for a table.
* @param tableNameWithType the name of the table
*/
- public synchronized void buildRouting(String tableNameWithType) {
- LOGGER.info("Building routing for table: {}", tableNameWithType);
-
- TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
- Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", tableNameWithType);
-
- String idealStatePath = getIdealStatePath(tableNameWithType);
- IdealState idealState = getIdealState(idealStatePath);
- Preconditions.checkState(idealState != null, "Failed to find ideal state
for table: %s", tableNameWithType);
- int idealStateVersion = idealState.getRecord().getVersion();
-
- String externalViewPath = getExternalViewPath(tableNameWithType);
- ExternalView externalView = getExternalView(externalViewPath);
- int externalViewVersion;
- // NOTE: External view might be null for new created tables. In such case,
create an empty one and set the version
- // to -1 to ensure the version does not match the next external view
- if (externalView == null) {
- externalView = new ExternalView(tableNameWithType);
- externalViewVersion = -1;
- } else {
- externalViewVersion = externalView.getRecord().getVersion();
+ public void buildRouting(String tableNameWithType) {
+ _globalLock.readLock().lock();
+ try {
+ buildRoutingInternal(tableNameWithType);
+ } finally {
+ _globalLock.readLock().unlock();
}
+ }
- Set<String> onlineSegments = getOnlineSegments(idealState);
+ private void buildRoutingInternal(String tableNameWithType) {
+ long buildStartTimeMs = System.currentTimeMillis();
+ Object tableLock = getRoutingTableBuildLock(tableNameWithType);
+ synchronized (tableLock) {
+ long lastBuildStartTimeMs =
getLastRoutingTableBuildStartTimeMs(tableNameWithType);
+ if (buildStartTimeMs <= lastBuildStartTimeMs) {
+ LOGGER.info("Skipping routing build for table: {} because the build
routing request timestamp {} "
+ + "is earlier than the last build start time: {}",
+ tableNameWithType, buildStartTimeMs, lastBuildStartTimeMs);
+ return;
+ }
- SegmentPreSelector segmentPreSelector =
- SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig,
_propertyStore);
- Set<String> preSelectedOnlineSegments =
segmentPreSelector.preSelect(onlineSegments);
- SegmentSelector segmentSelector =
SegmentSelectorFactory.getSegmentSelector(tableConfig);
- segmentSelector.init(idealState, externalView, preSelectedOnlineSegments);
+ int entryAddedOrUpdatedIteration = 0;
+ do {
+ // Record build start time to gate older requests and to use to
compare with the timestamp for when
+ // the global processSegmentAssignmentChange() was last called
+ _routingTableBuildStartTimeMs.put(tableNameWithType,
System.currentTimeMillis());
Review Comment:
Do we need to repeat the entire process? To keep the behavior consistent,
should we repeat the part in from line 269-291 (extract a common method)?
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -131,6 +162,10 @@ public BrokerRoutingManager(BrokerMetrics brokerMetrics,
ServerRoutingStatsManag
_enablePartitionMetadataManager =
pinotConfig.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_PARTITION_METADATA_MANAGER,
CommonConstants.Broker.DEFAULT_ENABLE_PARTITION_METADATA_MANAGER);
+ _processSegmentAssignmentChangeNumThreads =
+
pinotConfig.getProperty(CommonConstants.Broker.CONFIG_OF_ROUTING_PROCESS_SEGMENT_ASSIGNMENT_CHANGE_NUM_THREADS,
+
CommonConstants.Broker.DEFAULT_ROUTING_PROCESS_SEGMENT_ASSIGNMENT_CHANGE_NUM_THREADS);
+ _processAssignmentChangeSnapshotTimestampMs = Long.MIN_VALUE;
Review Comment:
(nit) This is probably redundant given timestamp is always positive
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -461,6 +461,10 @@ public static class Broker {
"pinot.broker.enable.partition.metadata.manager";
public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER =
true;
+ public static final String
CONFIG_OF_ROUTING_PROCESS_SEGMENT_ASSIGNMENT_CHANGE_NUM_THREADS =
Review Comment:
Seems we don't use camel case for broker config. Consider renaming to
`pinot.broker.routing.assignment.change.process.parallelism`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]