J-HowHuang commented on code in PR #16886:
URL: https://github.com/apache/pinot/pull/16886#discussion_r2395340924
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java:
##########
@@ -1086,6 +1089,368 @@ private void addTenantTagToInstances(String testTenant)
{
});
}
+ @Test
+ public void testZkBasedTenantRebalanceObserverPoll()
+ throws Exception {
+ int numServers = 3;
+ for (int i = 0; i < numServers; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
i, true);
+ }
+
+ TenantRebalancer tenantRebalancer =
+ new TenantRebalancer(_tableRebalanceManager, _helixResourceManager,
_executorService);
+
+ // tag all servers and brokers to test tenant
+ addTenantTagToInstances(TENANT_NAME);
+
+ // create 2 schemas
+ addDummySchema(RAW_TABLE_NAME_A);
+ addDummySchema(RAW_TABLE_NAME_B);
+
+ // create 2 tables on test tenant
+ createTableWithSegments(RAW_TABLE_NAME_A, TENANT_NAME);
+ createTableWithSegments(RAW_TABLE_NAME_B, TENANT_NAME);
+
+ // Add 3 more servers which will be tagged to default tenant
+ int numServersToAdd = 3;
+ for (int i = 0; i < numServersToAdd; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
(numServers + i), true);
+ }
+ addTenantTagToInstances(TENANT_NAME);
+
+ String jobId = "test-poll-job-123";
+
+ // Create tenant rebalance context with tables in queues
+ TenantRebalanceConfig config = new TenantRebalanceConfig();
+ config.setTenantName(TENANT_NAME);
+ config.setVerboseResult(true);
+ config.setDryRun(true);
+
+ TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
+ TenantRebalanceContext context = new TenantRebalanceContext(
+ jobId, config, 1,
+ dryRunResult.getRebalanceTableResults().keySet().stream()
+ .map(tableName -> new
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job",
false))
+ .collect(Collectors.toCollection(ConcurrentLinkedDeque::new)),
+ new LinkedList<>(),
+ new ConcurrentLinkedQueue<>()
+ );
+
+ TenantRebalanceProgressStats progressStats =
+ new
TenantRebalanceProgressStats(dryRunResult.getRebalanceTableResults().keySet());
+
+ // Test polling from parallel queue
+ ZkBasedTenantRebalanceObserver observer =
+ new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME, progressStats,
context, _helixResourceManager);
+ TenantRebalancer.TenantTableRebalanceJobContext polledJob =
observer.pollParallel();
+ assertNotNull(polledJob);
+
assertTrue(dryRunResult.getRebalanceTableResults().containsKey(polledJob.getTableName()));
+
+ // Test polling from sequential queue (should be empty)
+ TenantRebalancer.TenantTableRebalanceJobContext sequentialJob =
observer.pollSequential();
+ assertNull(sequentialJob);
+
+ // Verify the job was moved to ongoing queue and status was updated
+ Map<String, String> updatedMetadata =
+ _helixResourceManager.getControllerJobZKMetadata(jobId,
ControllerJobTypes.TENANT_REBALANCE);
+ assertNotNull(updatedMetadata);
+ TenantRebalanceContext updatedContext =
TenantRebalanceContext.fromTenantRebalanceJobMetadata(updatedMetadata);
+ TenantRebalanceProgressStats updatedStats =
+
TenantRebalanceProgressStats.fromTenantRebalanceJobMetadata(updatedMetadata);
+
+ assertNotNull(updatedContext);
+ assertNotNull(updatedStats);
+ assertEquals(updatedContext.getOngoingJobsQueue().size(), 1);
+
assertEquals(updatedStats.getTableStatusMap().get(polledJob.getTableName()),
+ TenantRebalanceProgressStats.TableStatus.REBALANCING.name());
+
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_A);
+ _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME_B);
+
+ for (int i = 0; i < numServers + numServersToAdd; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ }
+ }
+
+ @Test
+ public void testZkBasedTenantRebalanceObserverCancelJob()
+ throws Exception {
+ int numServers = 3;
+ for (int i = 0; i < numServers; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
i, true);
+ }
+
+ TenantRebalancer tenantRebalancer =
+ new TenantRebalancer(_tableRebalanceManager, _helixResourceManager,
_executorService);
+
+ // tag all servers and brokers to test tenant
+ addTenantTagToInstances(TENANT_NAME);
+
+ // create 2 schemas
+ addDummySchema(RAW_TABLE_NAME_A);
+ addDummySchema(RAW_TABLE_NAME_B);
+
+ // create 2 tables on test tenant
+ createTableWithSegments(RAW_TABLE_NAME_A, TENANT_NAME);
+ createTableWithSegments(RAW_TABLE_NAME_B, TENANT_NAME);
+
+ // Add 3 more servers which will be tagged to default tenant
+ int numServersToAdd = 3;
+ for (int i = 0; i < numServersToAdd; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
(numServers + i), true);
+ }
+ addTenantTagToInstances(TENANT_NAME);
+
+ // Create observer and test cancellation
+ String jobId = "test-cancel-job-456";
+
+ // Create tenant rebalance context with tables in queues
+ TenantRebalanceConfig config = new TenantRebalanceConfig();
+ config.setTenantName(TENANT_NAME);
+ config.setVerboseResult(true);
+ config.setDryRun(true);
+
+ TenantRebalanceResult dryRunResult = tenantRebalancer.rebalance(config);
+ Set<String> tableNames = dryRunResult.getRebalanceTableResults().keySet();
+
+ TenantRebalanceContext context = new TenantRebalanceContext(
+ jobId, config, 1,
+ tableNames.stream()
+ .map(tableName -> new
TenantRebalancer.TenantTableRebalanceJobContext(tableName, tableName + "_job",
false))
+ .collect(Collectors.toCollection(ConcurrentLinkedDeque::new)),
+ new LinkedList<>(),
+ new ConcurrentLinkedQueue<>()
+ );
+
+ TenantRebalanceProgressStats progressStats = new
TenantRebalanceProgressStats(tableNames);
+
+ // Test cancellation by user
+ ZkBasedTenantRebalanceObserver observer =
+ new ZkBasedTenantRebalanceObserver(jobId, TENANT_NAME, progressStats,
context, _helixResourceManager);
+
+ // move one job to ongoing to test cancellation from that state
+ TenantRebalancer.TenantTableRebalanceJobContext polledJob =
observer.pollParallel();
+ assertNotNull(polledJob);
+
assertTrue(dryRunResult.getRebalanceTableResults().containsKey(polledJob.getTableName()));
+
+ Pair<List<String>, Boolean> cancelResult = observer.cancelJob(true);
+ assertTrue(cancelResult.getRight()); // cancellation was successful
+ assertTrue(cancelResult.getLeft().isEmpty()); // no table rebalance jobs were
cancelled: the job was moved to the ongoing queue but had not yet started an
actual table rebalance
Review Comment:
The comment here was wrong. The job wouldn't be in the cancelled job list
because it had only been added to the ongoing queue and had not yet started a
table rebalance job.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]