yashmayya commented on code in PR #16886:
URL: https://github.com/apache/pinot/pull/16886#discussion_r2395468117
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java:
##########
@@ -1451,6 +1458,142 @@ public void testZkBasedTenantRebalanceObserverLifecycle()
}
}
+ @Test
+ public void testZkBasedTenantRebalanceObserverConcurrentPoll()
+ throws Exception {
+
+ int numServers = 3;
+ for (int i = 0; i < numServers; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + i, true);
+ }
+
+ // tag all servers and brokers to test tenant
+ addTenantTagToInstances(TENANT_NAME);
+
+ // create multiple schemas and tables for concurrent testing
+ String[] tableNames = {"table1", "table2", "table3", "table4", "table5"};
+ for (String tableName : tableNames) {
+ addDummySchema(tableName);
+ createTableWithSegments(tableName, TENANT_NAME);
+ }
+
+ // Add more servers
+ int numServersToAdd = 3;
+ for (int i = 0; i < numServersToAdd; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + (numServers + i), true);
+ }
+ addTenantTagToInstances(TENANT_NAME);
+
+ for (int i = 0; i < 3; i++) {
+ runZkBasedTenantRebalanceObserverConcurrentPoll();
+ }
+
+ // Clean up tables
+ for (String tableName : tableNames) {
+ _helixResourceManager.deleteOfflineTable(tableName);
+ }
+
+ for (int i = 0; i < numServers + numServersToAdd; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ }
+ }
+
+ private void runZkBasedTenantRebalanceObserverConcurrentPoll()
+ throws Exception {
+ TenantRebalancer tenantRebalancer =
+ new TenantRebalancer(_tableRebalanceManager, _helixResourceManager, _executorService);
+
+ String jobId = "test-concurrent-poll-job-303";
+
+ // Create tenant rebalance context with multiple tables
+ TenantRebalanceConfig config = new TenantRebalanceConfig();
+ config.setTenantName(TENANT_NAME);
+ config.setVerboseResult(true);
+ config.setDryRun(true);
+
+ TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
+ Set<String> offlineTableNames = dryRunResult.getRebalanceTableResults().keySet();
+
+ TenantRebalanceContext context = new TenantRebalanceContext(
+ jobId, config, 1,
+ offlineTableNames.stream()
+ .map(tableName -> new TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job", false))
+ .collect(Collectors.toCollection(ConcurrentLinkedDeque::new)),
+ new LinkedList<>(),
+ new ConcurrentLinkedQueue<>()
+ );
+
+ TenantRebalanceProgressStats progressStats = new TenantRebalanceProgressStats(offlineTableNames);
+
+ // Create observer
+ ZkBasedTenantRebalanceObserver observer =
+ new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME, progressStats, context, _helixResourceManager);
+
+ // Test concurrent polling with multiple threads
+ int numThreads = offlineTableNames.size();
+ ExecutorService concurrentExecutor = Executors.newFixedThreadPool(numThreads);
+ List<Future<TenantRebalancer.TenantTableRebalanceJobContext>> futures = new ArrayList<>();
+ Set<String> polledTables = ConcurrentHashMap.newKeySet();
+ AtomicInteger pollCount = new AtomicInteger(0);
+
+ // Submit concurrent polling tasks
+ for (int i = 0; i < numThreads; i++) {
+ futures.add(concurrentExecutor.submit(() -> {
+ TenantRebalancer.TenantTableRebalanceJobContext job = observer.pollParallel();
+ if (job != null) {
+ polledTables.add(job.getTableName());
+ pollCount.incrementAndGet();
+ }
+ return job;
+ }));
+ }
+
+ // Wait for all polling tasks to complete
+ List<TenantRebalancer.TenantTableRebalanceJobContext> polledJobs = new ArrayList<>();
+ for (Future<TenantRebalancer.TenantTableRebalanceJobContext> future : futures) {
+ TenantRebalancer.TenantTableRebalanceJobContext job = future.get(5, TimeUnit.SECONDS);
+ if (job != null) {
+ polledJobs.add(job);
+ }
+ }
+
+ // Verify thread safety: no duplicate jobs should be polled
+ assertEquals(polledJobs.size(), pollCount.get());
+ assertEquals(polledTables.size(), polledJobs.size());
+ assertTrue(polledJobs.size() <= offlineTableNames.size());
+
+ // Verify that all polled jobs are valid table names
+ for (TenantRebalancer.TenantTableRebalanceJobContext job : polledJobs) {
+ assertTrue(offlineTableNames.contains(job.getTableName()));
+ }
+
+ // Verify ZK state consistency after concurrent operations
+ Map<String, String> updatedMetadata =
+ _helixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobTypes.TENANT_REBALANCE);
+ assertNotNull(updatedMetadata);
+ TenantRebalanceContext updatedContext = TenantRebalanceContext.fromTenantRebalanceJobMetadata(updatedMetadata);
+ TenantRebalanceProgressStats updatedStats =
+ TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(updatedMetadata);
+
+ assertNotNull(updatedContext);
+ assertNotNull(updatedStats);
+
+ // Verify that polled jobs are in ongoing queue
+ assertEquals(updatedContext.getOngoingJobsQueue().size(), polledJobs.size());
+ for (TenantRebalancer.TenantTableRebalanceJobContext job : polledJobs) {
+ assertTrue(updatedContext.getOngoingJobsQueue().contains(job));
+ assertEquals(updatedStats.getTableStatusMap().get(job.getTableName()),
+ TenantRebalanceProgressStats.TableStatus.REBALANCING.name());
+ }
+
+ // Verify remaining tables in parallel queue
+ assertEquals(updatedContext.getParallelQueue().size(), offlineTableNames.size() - polledJobs.size());
Review Comment:
Won't the parallel queue always be empty here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]