This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 0b5dcb7 Unify the minion plug-in package regex path (#6980) 0b5dcb7 is described below commit 0b5dcb7aed1b85ab26345ab6c1076ccb88358f4a Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Wed May 26 21:53:00 2021 -0700 Unify the minion plug-in package regex path (#6980) ## Description Unify minion pluggable class (`PinotTaskGenerator`, `PinotTaskExecutorFactory`, `MinionEventObserverFactory`) package regex path to `org.apache.pinot.*.plugin.minion.tasks.*`. Modify `SimpleMinionClusterIntegrationTest` to use the pluggable classes. ## Release Notes Regex path for pluggable `MinionEventObserverFactory` is changed from `org.apache.pinot.*.event.*` to `org.apache.pinot.*.plugin.minion.tasks.*` --- .../minion/generator/TaskGeneratorRegistry.java | 2 +- .../tests/BasicAuthBatchIntegrationTest.java | 28 ++-- .../tests/BasicAuthRealtimeIntegrationTest.java | 5 +- .../pinot/integration/tests/ClusterTest.java | 24 +-- ...vertToRawIndexMinionClusterIntegrationTest.java | 2 +- ...fflineSegmentsMinionClusterIntegrationTest.java | 2 +- .../tests/SimpleMinionClusterIntegrationTest.java | 161 ++------------------- .../minion/tasks/TestEventObserverFactory.java | 73 ++++++++++ .../minion/tasks/TestTaskExecutorFactory.java | 81 +++++++++++ .../plugin/minion/tasks/TestTaskGenerator.java | 72 +++++++++ .../minion/event/EventObserverFactoryRegistry.java | 9 +- .../executor/TaskExecutorFactoryRegistry.java | 6 +- .../annotations/minion/EventObserverFactory.java | 2 +- .../annotations/minion/TaskExecutorFactory.java | 2 +- .../spi/annotations/minion/TaskGenerator.java | 2 +- 15 files changed, 276 insertions(+), 195 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java index 
95dee4b..083996d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java @@ -40,7 +40,7 @@ public class TaskGeneratorRegistry { private final Map<String, PinotTaskGenerator> _taskGeneratorRegistry = new HashMap<>(); /** - * The package regex pattern for auto-registered {@link TaskGenerator}. + * The package regex pattern for auto-registered {@link PinotTaskGenerator}. */ public static final String TASK_GENERATOR_PACKAGE_REGEX_PATTERN = ".*\\.plugin\\.minion\\.tasks\\..*"; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java index 1bb6825..b5d0bb7 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthBatchIntegrationTest.java @@ -24,7 +24,6 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; -import java.util.Collections; import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; @@ -63,7 +62,7 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest { startController(); startBroker(); startServer(); - startMinion(Collections.emptyList(), Collections.emptyList()); + startMinion(); } @AfterClass(alwaysRun = true) @@ -99,8 +98,8 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest { @Test public void testBrokerNoAuth() throws Exception { - JsonNode response = - JsonUtils.stringToJsonNode(sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT now()\"}")); + JsonNode response = 
JsonUtils.stringToJsonNode( + sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT now()\"}")); Assert.assertFalse(response.has("resultTable"), "must not return result table"); Assert.assertTrue(response.get("exceptions").get(0).get("errorCode").asInt() != 0, "must return error code"); } @@ -109,7 +108,8 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest { public void testBroker() throws Exception { JsonNode response = JsonUtils.stringToJsonNode( - sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT now()\"}", AUTH_HEADER)); + sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT now()\"}", + AUTH_HEADER)); Assert.assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(0).asText(), "LONG", "must return result with LONG value"); Assert.assertTrue(response.get("exceptions").isEmpty(), "must not return exception"); @@ -118,7 +118,8 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest { @Test public void testControllerGetTables() throws Exception { - JsonNode response = JsonUtils.stringToJsonNode(sendGetRequest("http://localhost:" + getControllerPort() + "/tables", AUTH_HEADER)); + JsonNode response = + JsonUtils.stringToJsonNode(sendGetRequest("http://localhost:" + getControllerPort() + "/tables", AUTH_HEADER)); Assert.assertTrue(response.get("tables").isArray(), "must return table array"); } @@ -155,18 +156,17 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest { // patch ingestion job file String jobFileContents = IOUtils.toString(new FileInputStream(jobFile)); - IOUtils.write(jobFileContents.replaceAll("9000", String.valueOf(getControllerPort())), - new FileOutputStream(jobFile)); + IOUtils + .write(jobFileContents.replaceAll("9000", String.valueOf(getControllerPort())), new FileOutputStream(jobFile)); - new BootstrapTableTool("http", "localhost", getControllerPort(), 
baseDir.getAbsolutePath(), AUTH_TOKEN) - .execute(); + new BootstrapTableTool("http", "localhost", getControllerPort(), baseDir.getAbsolutePath(), AUTH_TOKEN).execute(); Thread.sleep(5000); // admin with full access JsonNode response = JsonUtils.stringToJsonNode( - sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT count(*) FROM baseballStats\"}", - AUTH_HEADER)); + sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", + "{\"sql\":\"SELECT count(*) FROM baseballStats\"}", AUTH_HEADER)); Assert.assertEquals(response.get("resultTable").get("dataSchema").get("columnDataTypes").get(0).asText(), "LONG", "must return result with LONG value"); Assert.assertEquals(response.get("resultTable").get("dataSchema").get("columnNames").get(0).asText(), "count(*)", @@ -177,8 +177,8 @@ public class BasicAuthBatchIntegrationTest extends ClusterTest { // user with valid auth but no table access JsonNode responseUser = JsonUtils.stringToJsonNode( - sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", "{\"sql\":\"SELECT count(*) FROM baseballStats\"}", - AUTH_HEADER_USER)); + sendPostRequest("http://localhost:" + getRandomBrokerPort() + "/query/sql", + "{\"sql\":\"SELECT count(*) FROM baseballStats\"}", AUTH_HEADER_USER)); Assert.assertFalse(responseUser.has("resultTable"), "must not return result table"); Assert.assertTrue(responseUser.get("exceptions").get(0).get("errorCode").asInt() != 0, "must return error code"); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java index 05c2264..4a9105e 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java +++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthRealtimeIntegrationTest.java @@ -61,7 +61,7 @@ public class BasicAuthRealtimeIntegrationTest extends BaseClusterIntegrationTest startController(); startBroker(); startServer(); - startMinion(null, null); + startMinion(); // Unpack the Avro files List<File> avroFiles = unpackAvroData(_tempDir); @@ -141,8 +141,7 @@ public class BasicAuthRealtimeIntegrationTest extends BaseClusterIntegrationTest @Override protected Connection getPinotConnection() { if (_pinotConnection == null) { - _pinotConnection = - ConnectionFactory.fromZookeeper(getZkUrl() + "/" + getHelixClusterName(), AUTH_HEADER); + _pinotConnection = ConnectionFactory.fromZookeeper(getZkUrl() + "/" + getHelixClusterName(), AUTH_HEADER); } return _pinotConnection; } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java index fdb1d5c..83cd38b 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java @@ -35,7 +35,6 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import javax.annotation.Nullable; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; @@ -53,8 +52,6 @@ import org.apache.pinot.common.exception.HttpErrorStatusException; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.controller.helix.ControllerTest; import org.apache.pinot.minion.MinionStarter; -import org.apache.pinot.minion.event.MinionEventObserverFactory; -import org.apache.pinot.minion.executor.PinotTaskExecutorFactory; import 
org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor; import org.apache.pinot.plugin.inputformat.avro.AvroUtils; import org.apache.pinot.server.starter.helix.DefaultHelixStarterServerConfig; @@ -161,8 +158,7 @@ public abstract class ClusterTest extends ControllerTest { } protected void startServer(PinotConfiguration configuration) { - startServers(1, configuration, Server.DEFAULT_ADMIN_API_PORT, Helix.DEFAULT_SERVER_NETTY_PORT, - getZkUrl()); + startServers(1, configuration, Server.DEFAULT_ADMIN_API_PORT, Helix.DEFAULT_SERVER_NETTY_PORT, getZkUrl()); } protected void startServers(int numServers) { @@ -206,26 +202,13 @@ public abstract class ClusterTest extends ControllerTest { // NOTE: We don't allow multiple Minion instances in the same JVM because Minion uses singleton class MinionContext // to manage the instance level configs - protected void startMinion(@Nullable List<PinotTaskExecutorFactory> taskExecutorFactories, - @Nullable List<MinionEventObserverFactory> eventObserverFactories) { + protected void startMinion() { FileUtils.deleteQuietly(new File(Minion.DEFAULT_INSTANCE_BASE_DIR)); try { PinotConfiguration minionConf = getDefaultMinionConfiguration(); minionConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName()); minionConf.setProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl()); _minionStarter = new MinionStarter(minionConf); - // Register task executor factories - if (taskExecutorFactories != null) { - for (PinotTaskExecutorFactory taskExecutorFactory : taskExecutorFactories) { - _minionStarter.registerTaskExecutorFactory(taskExecutorFactory); - } - } - // Register event observer factories - if (eventObserverFactories != null) { - for (MinionEventObserverFactory eventObserverFactory : eventObserverFactories) { - _minionStarter.registerEventObserverFactory(eventObserverFactory); - } - } _minionStarter.start(); } catch (Exception e) { throw new RuntimeException(e); @@ -420,7 +403,8 @@ public abstract class ClusterTest extends 
ControllerTest { /** * Queries the broker's pql query endpoint (/query) */ - public static JsonNode postQuery(String query, String brokerBaseApiUrl, boolean enableTrace, String queryType, Map<String, String> headers) + public static JsonNode postQuery(String query, String brokerBaseApiUrl, boolean enableTrace, String queryType, + Map<String, String> headers) throws Exception { ObjectNode payload = JsonUtils.newObjectNode(); payload.put(queryType, query); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java index 1d96c71..83312b2 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java @@ -81,7 +81,7 @@ public class ConvertToRawIndexMinionClusterIntegrationTest extends HybridCluster // The parent setUp() sets up Zookeeper, Kafka, controller, broker and servers super.setUp(); - startMinion(null, null); + startMinion(); _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager(); _taskManager = _controllerStarter.getTaskManager(); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java index 91ca0aa..6a8f654 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java @@ -71,7 +71,7 @@ public class 
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends Realt // Setup realtime table, and blank offline table super.setUp(); addTableConfig(createOfflineTableConfig()); - startMinion(null, null); + startMinion(); _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager(); _taskManager = _controllerStarter.getTaskManager(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java index c753611..17e984c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java @@ -18,34 +18,19 @@ */ package org.apache.pinot.integration.tests; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import javax.annotation.Nullable; import org.apache.helix.task.TaskState; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; -import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator; -import org.apache.pinot.core.minion.PinotTaskConfig; -import org.apache.pinot.minion.event.MinionEventObserver; -import org.apache.pinot.minion.event.MinionEventObserverFactory; -import org.apache.pinot.minion.exception.TaskCancelledException; -import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager; import 
org.apache.pinot.minion.executor.PinotTaskExecutor; -import org.apache.pinot.minion.executor.PinotTaskExecutorFactory; -import org.apache.pinot.plugin.minion.tasks.BaseTaskExecutor; -import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableTaskConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -59,20 +44,21 @@ import static org.testng.Assert.*; * minion functionality. */ public class SimpleMinionClusterIntegrationTest extends ClusterTest { - private static final String TASK_TYPE = "TestTask"; - private static final String TABLE_NAME_1 = "testTable1"; - private static final String TABLE_NAME_2 = "testTable2"; - private static final String TABLE_NAME_3 = "testTable3"; + // Accessed by the plug-in classes + public static final String TASK_TYPE = "TestTask"; + public static final String TABLE_NAME_1 = "testTable1"; + public static final String TABLE_NAME_2 = "testTable2"; + public static final String TABLE_NAME_3 = "testTable3"; + public static final int NUM_TASKS = 2; + public static final int NUM_CONFIGS = 3; + public static final AtomicBoolean HOLD = new AtomicBoolean(); + public static final AtomicBoolean TASK_START_NOTIFIED = new AtomicBoolean(); + public static final AtomicBoolean TASK_SUCCESS_NOTIFIED = new AtomicBoolean(); + public static final AtomicBoolean TASK_CANCELLED_NOTIFIED = new AtomicBoolean(); + public static final AtomicBoolean TASK_ERROR_NOTIFIED = new AtomicBoolean(); + private static final long STATE_TRANSITION_TIMEOUT_MS = 60_000L; // 1 minute private static final long ZK_CALLBACK_TIMEOUT_MS = 30_000L; // 30 seconds - private static final int NUM_TASKS = 2; - private static final int NUM_CONFIGS = 3; - - private static final AtomicBoolean HOLD = 
new AtomicBoolean(); - private static final AtomicBoolean TASK_START_NOTIFIED = new AtomicBoolean(); - private static final AtomicBoolean TASK_SUCCESS_NOTIFIED = new AtomicBoolean(); - private static final AtomicBoolean TASK_CANCELLED_NOTIFIED = new AtomicBoolean(); - private static final AtomicBoolean TASK_ERROR_NOTIFIED = new AtomicBoolean(); private PinotHelixTaskResourceManager _helixTaskResourceManager; private PinotTaskManager _taskManager; @@ -96,13 +82,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager(); _taskManager = _controllerStarter.getTaskManager(); - // Register the test task generator into task manager - PinotTaskGenerator taskGenerator = new TestTaskGenerator(); - taskGenerator.init(_taskManager.getClusterInfoAccessor()); - _taskManager.registerTaskGenerator(taskGenerator); - - startMinion(Collections.singletonList(new TestTaskExecutorFactory()), - Collections.singletonList(new TestEventObserverFactory())); + startMinion(); } @Test @@ -229,117 +209,4 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { stopController(); stopZk(); } - - private static class TestTaskGenerator implements PinotTaskGenerator { - - private ClusterInfoAccessor _clusterInfoAccessor; - - @Override - public void init(ClusterInfoAccessor clusterInfoAccessor) { - _clusterInfoAccessor = clusterInfoAccessor; - } - - @Override - public String getTaskType() { - return TASK_TYPE; - } - - @Override - public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { - assertEquals(tableConfigs.size(), NUM_TASKS); - - // Generate at most 2 tasks - if (_clusterInfoAccessor.getTaskStates(TASK_TYPE).size() >= NUM_TASKS) { - return Collections.emptyList(); - } - - List<PinotTaskConfig> taskConfigs = new ArrayList<>(); - for (TableConfig tableConfig : tableConfigs) { - Map<String, String> configs = new HashMap<>(); - configs.put("tableName", 
tableConfig.getTableName()); - configs.put("tableType", tableConfig.getTableType().toString()); - taskConfigs.add(new PinotTaskConfig(TASK_TYPE, configs)); - } - return taskConfigs; - } - } - - public static class TestTaskExecutorFactory implements PinotTaskExecutorFactory { - - @Override - public void init(MinionTaskZkMetadataManager zkMetadataManager) { - } - - @Override - public String getTaskType() { - return TASK_TYPE; - } - - @Override - public PinotTaskExecutor create() { - return new BaseTaskExecutor() { - @Override - public Boolean executeTask(PinotTaskConfig pinotTaskConfig) { - assertTrue(MINION_CONTEXT.getDataDir().exists()); - assertNotNull(MINION_CONTEXT.getMinionMetrics()); - assertNotNull(MINION_CONTEXT.getHelixPropertyStore()); - - assertEquals(pinotTaskConfig.getTaskType(), TASK_TYPE); - Map<String, String> configs = pinotTaskConfig.getConfigs(); - assertEquals(configs.size(), NUM_CONFIGS); - String offlineTableName = configs.get("tableName"); - assertEquals(TableNameBuilder.getTableTypeFromTableName(offlineTableName), TableType.OFFLINE); - String rawTableName = TableNameBuilder.extractRawTableName(offlineTableName); - assertTrue(rawTableName.equals(TABLE_NAME_1) || rawTableName.equals(TABLE_NAME_2)); - assertEquals(configs.get("tableType"), TableType.OFFLINE.toString()); - - do { - if (_cancelled) { - throw new TaskCancelledException("Task has been cancelled"); - } - } while (HOLD.get()); - return true; - } - }; - } - } - - public static class TestEventObserverFactory implements MinionEventObserverFactory { - - @Override - public void init(MinionTaskZkMetadataManager zkMetadataManager) { - } - - @Override - public String getTaskType() { - return TASK_TYPE; - } - - @Override - public MinionEventObserver create() { - return new MinionEventObserver() { - @Override - public void notifyTaskStart(PinotTaskConfig pinotTaskConfig) { - TASK_START_NOTIFIED.set(true); - } - - @Override - public void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig, 
@Nullable Object executionResult) { - assertTrue(executionResult instanceof Boolean); - assertTrue((Boolean) executionResult); - TASK_SUCCESS_NOTIFIED.set(true); - } - - @Override - public void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig) { - TASK_CANCELLED_NOTIFIED.set(true); - } - - @Override - public void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception exception) { - TASK_ERROR_NOTIFIED.set(true); - } - }; - } - } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestEventObserverFactory.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestEventObserverFactory.java new file mode 100644 index 0000000..91ff5f1 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestEventObserverFactory.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests.plugin.minion.tasks; + +import javax.annotation.Nullable; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.integration.tests.SimpleMinionClusterIntegrationTest; +import org.apache.pinot.minion.event.MinionEventObserver; +import org.apache.pinot.minion.event.MinionEventObserverFactory; +import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager; +import org.apache.pinot.spi.annotations.minion.EventObserverFactory; + +import static org.testng.Assert.assertTrue; + + +/** + * Event observer factory for {@link SimpleMinionClusterIntegrationTest}. + */ +@EventObserverFactory +public class TestEventObserverFactory implements MinionEventObserverFactory { + + @Override + public void init(MinionTaskZkMetadataManager zkMetadataManager) { + } + + @Override + public String getTaskType() { + return SimpleMinionClusterIntegrationTest.TASK_TYPE; + } + + @Override + public MinionEventObserver create() { + return new MinionEventObserver() { + @Override + public void notifyTaskStart(PinotTaskConfig pinotTaskConfig) { + SimpleMinionClusterIntegrationTest.TASK_START_NOTIFIED.set(true); + } + + @Override + public void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig, @Nullable Object executionResult) { + assertTrue(executionResult instanceof Boolean); + assertTrue((Boolean) executionResult); + SimpleMinionClusterIntegrationTest.TASK_SUCCESS_NOTIFIED.set(true); + } + + @Override + public void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig) { + SimpleMinionClusterIntegrationTest.TASK_CANCELLED_NOTIFIED.set(true); + } + + @Override + public void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception exception) { + SimpleMinionClusterIntegrationTest.TASK_ERROR_NOTIFIED.set(true); + } + }; + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java new file mode 100644 index 0000000..d14509a --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskExecutorFactory.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests.plugin.minion.tasks; + +import java.util.Map; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.integration.tests.SimpleMinionClusterIntegrationTest; +import org.apache.pinot.minion.exception.TaskCancelledException; +import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager; +import org.apache.pinot.minion.executor.PinotTaskExecutor; +import org.apache.pinot.minion.executor.PinotTaskExecutorFactory; +import org.apache.pinot.plugin.minion.tasks.BaseTaskExecutor; +import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + + +/** + * Task executor factory for {@link SimpleMinionClusterIntegrationTest}. + */ +@TaskExecutorFactory +public class TestTaskExecutorFactory implements PinotTaskExecutorFactory { + + @Override + public void init(MinionTaskZkMetadataManager zkMetadataManager) { + } + + @Override + public String getTaskType() { + return SimpleMinionClusterIntegrationTest.TASK_TYPE; + } + + @Override + public PinotTaskExecutor create() { + return new BaseTaskExecutor() { + @Override + public Boolean executeTask(PinotTaskConfig pinotTaskConfig) { + assertTrue(MINION_CONTEXT.getDataDir().exists()); + assertNotNull(MINION_CONTEXT.getMinionMetrics()); + assertNotNull(MINION_CONTEXT.getHelixPropertyStore()); + + assertEquals(pinotTaskConfig.getTaskType(), SimpleMinionClusterIntegrationTest.TASK_TYPE); + Map<String, String> configs = pinotTaskConfig.getConfigs(); + assertEquals(configs.size(), SimpleMinionClusterIntegrationTest.NUM_CONFIGS); + String offlineTableName = configs.get("tableName"); + assertEquals(TableNameBuilder.getTableTypeFromTableName(offlineTableName), TableType.OFFLINE); + String rawTableName = 
TableNameBuilder.extractRawTableName(offlineTableName); + assertTrue(rawTableName.equals(SimpleMinionClusterIntegrationTest.TABLE_NAME_1) || rawTableName + .equals(SimpleMinionClusterIntegrationTest.TABLE_NAME_2)); + assertEquals(configs.get("tableType"), TableType.OFFLINE.toString()); + + do { + if (_cancelled) { + throw new TaskCancelledException("Task has been cancelled"); + } + } while (SimpleMinionClusterIntegrationTest.HOLD.get()); + return true; + } + }; + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java new file mode 100644 index 0000000..740d3e5 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/plugin/minion/tasks/TestTaskGenerator.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests.plugin.minion.tasks; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; +import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.integration.tests.SimpleMinionClusterIntegrationTest; +import org.apache.pinot.spi.annotations.minion.TaskGenerator; +import org.apache.pinot.spi.config.table.TableConfig; + +import static org.testng.Assert.assertEquals; + + +/** + * Task generator for {@link SimpleMinionClusterIntegrationTest}. + */ +@TaskGenerator +public class TestTaskGenerator implements PinotTaskGenerator { + private ClusterInfoAccessor _clusterInfoAccessor; + + @Override + public void init(ClusterInfoAccessor clusterInfoAccessor) { + _clusterInfoAccessor = clusterInfoAccessor; + } + + @Override + public String getTaskType() { + return SimpleMinionClusterIntegrationTest.TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + assertEquals(tableConfigs.size(), SimpleMinionClusterIntegrationTest.NUM_TASKS); + + // Generate at most 2 tasks + if (_clusterInfoAccessor.getTaskStates(SimpleMinionClusterIntegrationTest.TASK_TYPE).size() + >= SimpleMinionClusterIntegrationTest.NUM_TASKS) { + return Collections.emptyList(); + } + + List<PinotTaskConfig> taskConfigs = new ArrayList<>(); + for (TableConfig tableConfig : tableConfigs) { + Map<String, String> configs = new HashMap<>(); + configs.put("tableName", tableConfig.getTableName()); + configs.put("tableType", tableConfig.getTableType().toString()); + taskConfigs.add(new PinotTaskConfig(SimpleMinionClusterIntegrationTest.TASK_TYPE, configs)); + } + return taskConfigs; + } +} diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java index 716bdd8..d220f8d 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java @@ -34,6 +34,11 @@ import org.slf4j.LoggerFactory; public class EventObserverFactoryRegistry { private static final Logger LOGGER = LoggerFactory.getLogger(EventObserverFactoryRegistry.class); + /** + * The package regex pattern for auto-registered {@link MinionEventObserverFactory}. + */ + private static final String EVENT_OBSERVER_FACTORY_PACKAGE_REGEX_PATTERN = ".*\\.plugin\\.minion\\.tasks\\..*"; + private final Map<String, MinionEventObserverFactory> _eventObserverFactoryRegistry = new HashMap<>(); /** @@ -43,8 +48,8 @@ public class EventObserverFactoryRegistry { */ public EventObserverFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManager) { long startTimeMs = System.currentTimeMillis(); - Set<Class<?>> classes = - PinotReflectionUtils.getClassesThroughReflection(".*\\.event\\..*", EventObserverFactory.class); + Set<Class<?>> classes = PinotReflectionUtils + .getClassesThroughReflection(EVENT_OBSERVER_FACTORY_PACKAGE_REGEX_PATTERN, EventObserverFactory.class); for (Class<?> clazz : classes) { EventObserverFactory annotation = clazz.getAnnotation(EventObserverFactory.class); if (annotation.enabled()) { diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java index 6b5f2c2..3ba1c40 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java @@ -34,9 +34,9 @@ public 
class TaskExecutorFactoryRegistry { private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutorFactoryRegistry.class); /** - * The package regex pattern for auto-registered {@link TaskExecutorFactory}. + * The package regex pattern for auto-registered {@link PinotTaskExecutorFactory}. */ - private static final String TASK_EXECUTOR_PACKAGE_REGEX_PATTERN = ".*\\.plugin\\.minion\\.tasks\\..*"; + private static final String TASK_EXECUTOR_FACTORY_PACKAGE_REGEX_PATTERN = ".*\\.plugin\\.minion\\.tasks\\..*"; private final Map<String, PinotTaskExecutorFactory> _taskExecutorFactoryRegistry = new HashMap<>(); @@ -68,7 +68,7 @@ public class TaskExecutorFactoryRegistry { public static Set<Class<?>> getTaskExecutorFactoryClasses() { return PinotReflectionUtils - .getClassesThroughReflection(TASK_EXECUTOR_PACKAGE_REGEX_PATTERN, TaskExecutorFactory.class); + .getClassesThroughReflection(TASK_EXECUTOR_FACTORY_PACKAGE_REGEX_PATTERN, TaskExecutorFactory.class); } /** diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java index 2aa8d68..df85a32 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/EventObserverFactory.java @@ -29,7 +29,7 @@ import java.lang.annotation.Target; * * NOTE: * - The annotated class must implement the MinionEventObserverFactory interface - * - The annotated class must be under the package of name 'org.apache.pinot.*.event.*' to be auto-registered. + * - The annotated class must be under the package of name 'org.apache.pinot.*.plugin.minion.tasks.*' to be auto-registered. 
*/ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java index 38166b0..a4c6c4d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskExecutorFactory.java @@ -29,7 +29,7 @@ import java.lang.annotation.Target; * * NOTE: * - The annotated class must implement the PinotTaskExecutorFactory interface - * - The annotated class must be under the package of name 'org.apache.pinot.*.executor.*' to be auto-registered. + * - The annotated class must be under the package of name 'org.apache.pinot.*.plugin.minion.tasks.*' to be auto-registered. */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java index 33c7511..c615708 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/minion/TaskGenerator.java @@ -29,7 +29,7 @@ import java.lang.annotation.Target; * * NOTE: * - The annotated class must implement the PinotTaskGenerator interface - * - The annotated class must be under the package of name 'org.apache.pinot.plugin.minion.tasks.*' to be auto-registered. + * - The annotated class must be under the package of name 'org.apache.pinot.*.plugin.minion.tasks.*' to be auto-registered. */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org