This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 4d72eb5a58  Segment compaction for upsert real-time tables (#10463)
4d72eb5a58 is described below

commit 4d72eb5a58ed7c9874076d9d488afb1a81eeb54f
Author: Robert Zych <robert.z...@gmail.com>
AuthorDate: Wed Jul 26 10:11:45 2023 -0700

    Segment compaction for upsert real-time tables (#10463)
---
 .../pinot/controller/BaseControllerStarter.java    |  31 +--
 .../helix/core/minion/ClusterInfoAccessor.java      |  36 ++-
 .../helix/core/minion/PinotTaskManager.java         |  11 +-
 .../apache/pinot/core/common/MinionConstants.java   |  19 ++
 ...sertCompactionMinionClusterIntegrationTest.java  | 228 +++++++++++++++++
 .../test/resources/upsert_compaction_test.tar.gz    | Bin 0 -> 9405 bytes
 .../org/apache/pinot/minion/BaseMinionStarter.java  |  17 +-
 .../org/apache/pinot/minion/MinionContext.java      |  10 +
 .../tasks/BaseSingleSegmentConversionExecutor.java  |   9 +-
 .../UpsertCompactionTaskExecutor.java               | 248 ++++++++++++++++++
 .../UpsertCompactionTaskExecutorFactory.java        |  49 ++++
 .../UpsertCompactionTaskGenerator.java              | 276 +++++++++++++++++++++
 ...psertCompactionTaskProgressObserverFactory.java  |  33 +++
 .../UpsertCompactionTaskExecutorTest.java           |  63 +++++
 .../UpsertCompactionTaskGeneratorTest.java          | 273 ++++++++++++++++++++
 .../segment/local/utils/TableConfigUtils.java       |  69 ++++--
 .../segment/local/utils/TableConfigUtilsTest.java   | 136 ++++++----
 .../pinot/server/api/resources/TablesResource.java  |  96 +++++--
 .../pinot/server/api/TablesResourceTest.java        |  81 +++---
 19 files changed, 1531 insertions(+), 154 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index cc294a5341..e32a419ecf 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -245,19 +245,19 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     // NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect
     // from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the
     // non-positive value, so set the default value as 1.
- System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _config - .getProperty(CommonConstants.Helix.CONFIG_OF_CONTROLLER_FLAPPING_TIME_WINDOW_MS, + System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, + _config.getProperty(CommonConstants.Helix.CONFIG_OF_CONTROLLER_FLAPPING_TIME_WINDOW_MS, CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS)); } private void setupHelixClusterConstraints() { - String maxStateTransitions = _config - .getProperty(CommonConstants.Helix.CONFIG_OF_HELIX_INSTANCE_MAX_STATE_TRANSITIONS, + String maxStateTransitions = + _config.getProperty(CommonConstants.Helix.CONFIG_OF_HELIX_INSTANCE_MAX_STATE_TRANSITIONS, CommonConstants.Helix.DEFAULT_HELIX_INSTANCE_MAX_STATE_TRANSITIONS); Map<ClusterConstraints.ConstraintAttribute, String> constraintAttributes = new HashMap<>(); constraintAttributes.put(ClusterConstraints.ConstraintAttribute.INSTANCE, ".*"); - constraintAttributes - .put(ClusterConstraints.ConstraintAttribute.MESSAGE_TYPE, Message.MessageType.STATE_TRANSITION.name()); + constraintAttributes.put(ClusterConstraints.ConstraintAttribute.MESSAGE_TYPE, + Message.MessageType.STATE_TRANSITION.name()); ConstraintItem constraintItem = new ConstraintItem(constraintAttributes, maxStateTransitions); _helixControllerManager.getClusterManagmentTool() @@ -371,8 +371,8 @@ public abstract class BaseControllerStarter implements ServiceStartable { private void setUpPinotController() { // install default SSL context if necessary (even if not force-enabled everywhere) TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_config, ControllerConf.CONTROLLER_TLS_PREFIX); - if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils - .isNotBlank(tlsDefaults.getTrustStorePath())) { + if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils.isNotBlank( + tlsDefaults.getTrustStorePath())) { LOGGER.info("Installing default SSL context for any client requests"); TlsUtils.installDefaultSSLSocketFactory(tlsDefaults); } @@ -392,8 +392,9 @@ public abstract class BaseControllerStarter implements ServiceStartable { _config.getProperty(CommonConstants.Controller.CONFIG_OF_CONTROLLER_QUERY_REWRITER_CLASS_NAMES)); LOGGER.info("Initializing Helix participant manager"); - _helixParticipantManager = HelixManagerFactory - .getZKHelixManager(_helixClusterName, _helixParticipantInstanceId, InstanceType.PARTICIPANT, _helixZkURL); + _helixParticipantManager = + HelixManagerFactory.getZKHelixManager(_helixClusterName, _helixParticipantInstanceId, InstanceType.PARTICIPANT, + _helixZkURL); // LeadControllerManager needs to be initialized before registering as Helix participant. LOGGER.info("Initializing lead controller manager"); @@ -502,8 +503,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs)); _adminApp.start(_listenerConfigs); - _controllerMetrics.addCallbackGauge("dataDir.exists", - () -> new File(_config.getDataDir()).exists() ? 1L : 0L); + _controllerMetrics.addCallbackGauge("dataDir.exists", () -> new File(_config.getDataDir()).exists() ? 
1L : 0L); _controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> { File dataDir = new File(_config.getDataDir()); if (dataDir.exists()) { @@ -673,7 +673,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { _taskManagerStatusCache = getTaskManagerStatusCache(); _taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _leadControllerManager, _config, - _controllerMetrics, _taskManagerStatusCache); + _controllerMetrics, _taskManagerStatusCache, _executorService, _connectionManager); periodicTasks.add(_taskManager); _retentionManager = new RetentionManager(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics); @@ -693,8 +693,9 @@ public abstract class BaseControllerStarter implements ServiceStartable { new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, _executorService); periodicTasks.add(_segmentStatusChecker); - _realtimeConsumerMonitor = new RealtimeConsumerMonitor(_config, _helixResourceManager, _leadControllerManager, - _controllerMetrics, _executorService); + _realtimeConsumerMonitor = + new RealtimeConsumerMonitor(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics, + _executorService); periodicTasks.add(_realtimeConsumerMonitor); _segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, _executorService, _connectionManager); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java index bcb1491988..5551c04d7c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java @@ -21,7 +21,9 @@ package org.apache.pinot.controller.helix.core.minion; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import javax.annotation.Nullable; +import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.task.TaskState; @@ -51,15 +53,20 @@ public class ClusterInfoAccessor { private final ControllerConf _controllerConf; private final ControllerMetrics _controllerMetrics; private final LeadControllerManager _leadControllerManager; + private final Executor _executor; + private final MultiThreadedHttpConnectionManager _connectionManager; public ClusterInfoAccessor(PinotHelixResourceManager pinotHelixResourceManager, PinotHelixTaskResourceManager pinotHelixTaskResourceManager, ControllerConf controllerConf, - ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager) { + ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager, Executor executor, + MultiThreadedHttpConnectionManager connectionManager) { _pinotHelixResourceManager = pinotHelixResourceManager; _pinotHelixTaskResourceManager = pinotHelixTaskResourceManager; _controllerConf = controllerConf; _controllerMetrics = controllerMetrics; _leadControllerManager = leadControllerManager; + _executor = executor; + _connectionManager = connectionManager; } /** @@ -94,6 +101,20 @@ public class ClusterInfoAccessor { return 
ZKMetadataProvider.getSegmentsZKMetadata(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType); } + /** + * Get shared executor + */ + public Executor getExecutor() { + return _executor; + } + + /** + * Get shared connection manager + */ + public MultiThreadedHttpConnectionManager getConnectionManager() { + return _connectionManager; + } + /** * Fetches the ZNRecord under MINION_TASK_METADATA/${tableNameWithType}/${taskType} for the given * taskType and tableNameWithType @@ -114,8 +135,8 @@ public class ClusterInfoAccessor { */ @Nullable public SegmentLineage getSegmentLineage(String tableNameWithType) { - return SegmentLineageAccessHelper - .getSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType); + return SegmentLineageAccessHelper.getSegmentLineage(_pinotHelixResourceManager.getPropertyStore(), + tableNameWithType); } /** @@ -127,8 +148,8 @@ public class ClusterInfoAccessor { * @param expectedVersion The expected version of data to be overwritten. Set to -1 to override version check. */ public void setMinionTaskMetadata(BaseTaskMetadata taskMetadata, String taskType, int expectedVersion) { - MinionTaskMetadataUtils - .persistTaskMetadata(_pinotHelixResourceManager.getPropertyStore(), taskType, taskMetadata, expectedVersion); + MinionTaskMetadataUtils.persistTaskMetadata(_pinotHelixResourceManager.getPropertyStore(), taskType, taskMetadata, + expectedVersion); } /** @@ -175,8 +196,9 @@ public class ClusterInfoAccessor { * @return cluster config */ public String getClusterConfig(String configName) { - HelixConfigScope helixConfigScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER) - .forCluster(_pinotHelixResourceManager.getHelixClusterName()).build(); + HelixConfigScope helixConfigScope = + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster( + _pinotHelixResourceManager.getHelixClusterName()).build(); Map<String, String> configMap = _pinotHelixResourceManager.getHelixAdmin().getConfig(helixConfigScope, Collections.singletonList(configName)); return configMap != null ? 
configMap.get(configName) : null; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index e7a7584b76..4f10b6a5c5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -32,8 +32,10 @@ import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import javax.annotation.Nullable; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; import org.apache.helix.AccessOption; import org.apache.helix.task.TaskState; import org.apache.helix.zookeeper.zkclient.IZkChildListener; @@ -114,7 +116,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager, PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics, - TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> taskManagerStatusCache) { + TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> taskManagerStatusCache, Executor executor, + MultiThreadedHttpConnectionManager connectionManager) { super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(), controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, leadControllerManager, controllerMetrics); @@ -122,7 +125,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { _taskManagerStatusCache = taskManagerStatusCache; _clusterInfoAccessor = new ClusterInfoAccessor(helixResourceManager, helixTaskResourceManager, controllerConf, controllerMetrics, - leadControllerManager); + leadControllerManager, executor, connectionManager); _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor); _skipLateCronSchedule = controllerConf.isSkipLateCronSchedule(); _maxCronScheduleDelayInSeconds = controllerConf.getMaxCronScheduleDelayInSeconds(); @@ -561,8 +564,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { long successRunTimestamp = System.currentTimeMillis(); for (TableConfig tableConfig : enabledTableConfigs) { _taskManagerStatusCache.saveTaskGeneratorInfo(tableConfig.getTableName(), taskGenerator.getTaskType(), - taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage( - successRunTimestamp, errors.toString())); + taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage(successRunTimestamp, + errors.toString())); // before the first task schedule, the follow gauge metric will be empty // TODO: find a better way to report task generation information _controllerMetrics.setOrUpdateTableGauge(tableConfig.getTableName(), taskGenerator.getTaskType(), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 8e4c5cfc1c..25c5427cf6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -139,4 +139,23 @@ public class MinionConstants { public static final 
String CONFIG_NUMBER_CONCURRENT_TASKS_PER_INSTANCE = "SegmentGenerationAndPushTask.numConcurrentTasksPerInstance"; } + + public static class UpsertCompactionTask { + public static final String TASK_TYPE = "UpsertCompactionTask"; + /** + * The time period to wait before picking segments for this task + * e.g. if set to "2d", no task will be scheduled for a time window younger than 2 days + */ + public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod"; + /** + * The maximum percent of old records allowed for a completed segment. + * e.g. if the percent surpasses 30, then the segment may be compacted + */ + public static final String INVALID_RECORDS_THRESHOLD_PERCENT = "invalidRecordsThresholdPercent"; + /** + * The maximum count of old records for a completed segment + * e.g. if the count surpasses 100k, then the segment may be compacted + */ + public static final String INVALID_RECORDS_THRESHOLD_COUNT = "invalidRecordsThresholdCount"; + } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java new file mode 100644 index 0000000000..de645ca749 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertCompactionMinionClusterIntegrationTest.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.helix.task.TaskState; +import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; +import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; + + +public class UpsertCompactionMinionClusterIntegrationTest extends BaseClusterIntegrationTest { + protected PinotHelixTaskResourceManager _helixTaskResourceManager; + protected PinotTaskManager _taskManager; + private static final String PRIMARY_KEY_COL = "clientId"; + private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME); + private static List<File> _avroFiles; + private TableConfig _tableConfig; + private Schema _schema; + + @Override + protected String getSchemaFileName() { + return "upsert_upload_segment_test.schema"; + } + + @Override + protected String getSchemaName() { + return "upsertSchema"; + } + + @Override + protected String getAvroTarFileName() { + return "upsert_compaction_test.tar.gz"; + } + + @Override + protected String getPartitionColumn() { + return PRIMARY_KEY_COL; + } + + private TableTaskConfig getCompactionTaskConfig() { + Map<String, String> tableTaskConfigs = new HashMap<>(); + tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, "0d"); + tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT, "1"); + tableTaskConfigs.put(MinionConstants.UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT, "10"); + return new TableTaskConfig( + Collections.singletonMap(MinionConstants.UpsertCompactionTask.TASK_TYPE, tableTaskConfigs)); + } + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServers(1); + + // Unpack the Avro files + _avroFiles = unpackAvroData(_tempDir); + + startKafka(); + + // Create and upload the schema and table config + _schema = createSchema(); + addSchema(_schema); + _tableConfig = createUpsertTableConfig(_avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions()); + _tableConfig.setTaskConfig(getCompactionTaskConfig()); + addTableConfig(_tableConfig); + + ClusterIntegrationTestUtils.buildSegmentsFromAvro(_avroFiles, _tableConfig, _schema, 0, _segmentDir, _tarDir); + + startMinion(); + _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager(); + _taskManager = _controllerStarter.getTaskManager(); + } + + @BeforeMethod + public void 
beforeMethod() + throws Exception { + // Create and upload segments + uploadSegments(getTableName(), TableType.REALTIME, _tarDir); + } + + protected void waitForAllDocsLoaded(long timeoutMs, long expectedCount) + throws Exception { + TestUtils.waitForCondition(aVoid -> { + try { + return getCurrentCountStarResultWithoutUpsert() == expectedCount; + } catch (Exception e) { + return null; + } + }, 100L, timeoutMs, "Failed to load all documents"); + assertEquals(getCurrentCountStarResult(), getCountStarResult()); + } + + private long getCurrentCountStarResultWithoutUpsert() { + return getPinotConnection().execute("SELECT COUNT(*) FROM " + getTableName() + " OPTION(skipUpsert=true)") + .getResultSet(0).getLong(0); + } + + private long getSalary() { + return getPinotConnection().execute("SELECT salary FROM " + getTableName() + " WHERE clientId=100001") + .getResultSet(0).getLong(0); + } + + @Override + protected long getCountStarResult() { + return 3; + } + + @AfterMethod + public void afterMethod() + throws Exception { + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName()); + + // Test dropping all segments one by one + List<String> segments = listSegments(realtimeTableName); + assertFalse(segments.isEmpty()); + for (String segment : segments) { + dropSegment(realtimeTableName, segment); + } + // NOTE: There is a delay to remove the segment from property store + TestUtils.waitForCondition((aVoid) -> { + try { + return listSegments(realtimeTableName).isEmpty(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, 60_000L, "Failed to drop the segments"); + + stopServer(); + startServers(1); + } + + @AfterClass + public void tearDown() + throws IOException { + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName()); + dropRealtimeTable(realtimeTableName); + stopMinion(); + stopServer(); + stopBroker(); + stopController(); + stopKafka(); + stopZk(); + FileUtils.deleteDirectory(_tempDir); + } + + @Test + public void testCompaction() + throws Exception { + waitForAllDocsLoaded(600_000L, 283); + assertEquals(getSalary(), 9747108); + + assertNotNull(_taskManager.scheduleTasks(REALTIME_TABLE_NAME).get(MinionConstants.UpsertCompactionTask.TASK_TYPE)); + waitForTaskToComplete(); + waitForAllDocsLoaded(600_000L, 3); + assertEquals(getSalary(), 9747108); + } + + @Test + public void testCompactionDeletesSegments() + throws Exception { + pushAvroIntoKafka(_avroFiles); + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L, 566); + assertEquals(getSalary(), 9747108); + + assertNull(_taskManager.scheduleTasks(REALTIME_TABLE_NAME).get(MinionConstants.UpsertCompactionTask.TASK_TYPE)); + waitForTaskToComplete(); + waitForAllDocsLoaded(600_000L, 283); + assertEquals(getSalary(), 9747108); + } + + protected void waitForTaskToComplete() { + TestUtils.waitForCondition(input -> { + // Check task state + for (TaskState taskState : _helixTaskResourceManager.getTaskStates(MinionConstants.UpsertCompactionTask.TASK_TYPE) + .values()) { + if (taskState != TaskState.COMPLETED) { + return false; + } + } + return true; + }, 600_000L, "Failed to complete task"); + } +} diff --git a/pinot-integration-tests/src/test/resources/upsert_compaction_test.tar.gz b/pinot-integration-tests/src/test/resources/upsert_compaction_test.tar.gz new file mode 100644 index 0000000000..76b0a50126 Binary files /dev/null and b/pinot-integration-tests/src/test/resources/upsert_compaction_test.tar.gz differ diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java index 845ec95e66..3d3bfcbe0b 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java @@ -124,8 +124,8 @@ public abstract class BaseMinionStarter implements ServiceStartable { // NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect // from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the // non-positive value, so set the default value as 1. - System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _config - .getProperty(CommonConstants.Helix.CONFIG_OF_MINION_FLAPPING_TIME_WINDOW_MS, + System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, + _config.getProperty(CommonConstants.Helix.CONFIG_OF_MINION_FLAPPING_TIME_WINDOW_MS, CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS)); } @@ -174,8 +174,8 @@ public abstract class BaseMinionStarter implements ServiceStartable { // Initialize data directory LOGGER.info("Initializing data directory"); - File dataDir = new File(_config - .getProperty(CommonConstants.Helix.Instance.DATA_DIR_KEY, CommonConstants.Minion.DEFAULT_INSTANCE_DATA_DIR)); + File dataDir = new File(_config.getProperty(CommonConstants.Helix.Instance.DATA_DIR_KEY, + CommonConstants.Minion.DEFAULT_INSTANCE_DATA_DIR)); if (dataDir.exists()) { FileUtils.cleanDirectory(dataDir); } else { @@ -195,8 +195,8 @@ public abstract class BaseMinionStarter implements ServiceStartable { // Install default SSL context if necessary (even if not force-enabled everywhere) TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_config, CommonConstants.Minion.MINION_TLS_PREFIX); - if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils - .isNotBlank(tlsDefaults.getTrustStorePath())) { + if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils.isNotBlank( + tlsDefaults.getTrustStorePath())) { LOGGER.info("Installing default SSL context for any client requests"); TlsUtils.installDefaultSSLSocketFactory(tlsDefaults); } @@ -237,8 +237,7 @@ public abstract class BaseMinionStarter implements ServiceStartable { PinotConfiguration segmentUploaderConfig = _config.subset(CommonConstants.Minion.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER); if (segmentUploaderConfig.isEmpty()) { - segmentUploaderConfig = - _config.subset(CommonConstants.Minion.DEPRECATED_PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER); + segmentUploaderConfig = _config.subset(CommonConstants.Minion.DEPRECATED_PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER); } PinotConfiguration httpsConfig = segmentUploaderConfig.subset(CommonConstants.HTTPS_PROTOCOL); if (httpsConfig.getProperty(HTTPS_ENABLED, false)) { @@ -254,7 +253,7 @@ public abstract class BaseMinionStarter implements ServiceStartable { _helixManager.connect(); updateInstanceConfigIfNeeded(); minionContext.setHelixPropertyStore(_helixManager.getHelixPropertyStore()); - + minionContext.setHelixManager(_helixManager); LOGGER.info("Starting minion admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs)); _minionAdminApplication = new MinionAdminApiApplication(_instanceId, _config); _minionAdminApplication.start(_listenerConfigs); diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java index 
6ccd364677..8b6e0dc4c9 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionContext.java @@ -20,6 +20,7 @@ package org.apache.pinot.minion; import java.io.File; import javax.net.ssl.SSLContext; +import org.apache.helix.HelixManager; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metrics.MinionMetrics; @@ -43,6 +44,7 @@ public class MinionContext { private File _dataDir; private MinionMetrics _minionMetrics; private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore; + private HelixManager _helixManager; // For segment upload private SSLContext _sslContext; @@ -107,4 +109,12 @@ public class MinionContext { public void setTaskAuthProvider(AuthProvider taskAuthProvider) { _taskAuthProvider = taskAuthProvider; } + + public void setHelixManager(HelixManager helixManager) { + _helixManager = helixManager; + } + + public HelixManager getHelixManager() { + return _helixManager; + } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java index 51c7f98543..001ce26d46 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java @@ -131,9 +131,12 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut "Converted segment name: %s does not match original segment name: %s", segmentConversionResult.getSegmentName(), segmentName); + File convertedSegmentDir = segmentConversionResult.getFile(); + if (convertedSegmentDir == null) { + return segmentConversionResult; + } // Tar the converted segment _eventObserver.notifyProgress(_pinotTaskConfig, "Compressing segment: " + segmentName); - File convertedSegmentDir = segmentConversionResult.getFile(); File convertedTarredSegmentFile = new File(tempDataDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); TarGzCompressionUtils.createTarGzFile(convertedSegmentDir, convertedTarredSegmentFile); @@ -184,8 +187,8 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut TableNameBuilder.extractRawTableName(tableNameWithType)); NameValuePair tableTypeParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, TableNameBuilder.getTableTypeFromTableName(tableNameWithType).toString()); - List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter, - tableTypeParameter); + List<NameValuePair> parameters = + Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter, tableTypeParameter); // Upload the tarred segment _eventObserver.notifyProgress(_pinotTaskConfig, "Uploading segment: " + segmentName); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java new file mode 100644 index 0000000000..aa37ac871a --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks.upsertcompaction; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.core.Response; +import org.apache.commons.io.FileUtils; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixManager; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.InstanceConfig; +import org.apache.http.client.utils.URIBuilder; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; +import org.apache.pinot.common.utils.config.InstanceUtils; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor; +import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.data.readers.RecordReaderConfig; +import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; +import org.roaringbitmap.PeekableIntIterator; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class UpsertCompactionTaskExecutor extends BaseSingleSegmentConversionExecutor { + private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskExecutor.class); + private static HelixManager _helixManager = MINION_CONTEXT.getHelixManager(); + private static HelixAdmin _clusterManagementTool = _helixManager.getClusterManagmentTool(); + private static 
String _clusterName = _helixManager.getClusterName(); + + private class CompactedRecordReader implements RecordReader { + private final PinotSegmentRecordReader _pinotSegmentRecordReader; + private final PeekableIntIterator _validDocIdsIterator; + // Reusable generic row to store the next row to return + GenericRow _nextRow = new GenericRow(); + // Flag to mark whether we need to fetch another row + boolean _nextRowReturned = true; + + CompactedRecordReader(File indexDir, ImmutableRoaringBitmap validDocIds) { + _pinotSegmentRecordReader = new PinotSegmentRecordReader(); + _pinotSegmentRecordReader.init(indexDir, null, null); + _validDocIdsIterator = validDocIds.getIntIterator(); + } + + @Override + public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig) { + } + + @Override + public boolean hasNext() { + if (!_validDocIdsIterator.hasNext() && _nextRowReturned) { + return false; + } + + // If next row has not been returned, return true + if (!_nextRowReturned) { + return true; + } + + // Try to get the next row to return + if (_validDocIdsIterator.hasNext()) { + int docId = _validDocIdsIterator.next(); + _nextRow.clear(); + _pinotSegmentRecordReader.getRecord(docId, _nextRow); + _nextRowReturned = false; + return true; + } + + // Cannot find next row to return, return false + return false; + } + + @Override + public GenericRow next() { + return next(new GenericRow()); + } + + @Override + public GenericRow next(GenericRow reuse) { + Preconditions.checkState(!_nextRowReturned); + reuse.init(_nextRow); + _nextRowReturned = true; + return reuse; + } + + @Override + public void rewind() { + _pinotSegmentRecordReader.rewind(); + _nextRowReturned = true; + } + + @Override + public void close() + throws IOException { + _pinotSegmentRecordReader.close(); + } + } + + @Override + protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File indexDir, File workingDir) + throws Exception { + _eventObserver.notifyProgress(pinotTaskConfig, "Compacting segment: " + indexDir); + Map<String, String> configs = pinotTaskConfig.getConfigs(); + String segmentName = configs.get(MinionConstants.SEGMENT_NAME_KEY); + String taskType = pinotTaskConfig.getTaskType(); + LOGGER.info("Starting task: {} with configs: {}", taskType, configs); + long startMillis = System.currentTimeMillis(); + + String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY); + TableConfig tableConfig = getTableConfig(tableNameWithType); + ImmutableRoaringBitmap validDocIds = getValidDocIds(tableNameWithType, configs); + + if (validDocIds.isEmpty()) { + // prevents empty segment generation + LOGGER.info("validDocIds is empty, skip the task. 
Table: {}, segment: {}", tableNameWithType, segmentName); + if (indexDir.exists() && !FileUtils.deleteQuietly(indexDir)) { + LOGGER.warn("Failed to delete input segment: {}", indexDir.getAbsolutePath()); + } + if (!FileUtils.deleteQuietly(workingDir)) { + LOGGER.warn("Failed to delete working directory: {}", workingDir.getAbsolutePath()); + } + return new SegmentConversionResult.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName) + .build(); + } + + SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir); + try (CompactedRecordReader compactedRecordReader = new CompactedRecordReader(indexDir, validDocIds)) { + SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir, tableConfig, segmentMetadata, segmentName); + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(config, compactedRecordReader); + driver.build(); + } + + File compactedSegmentFile = new File(workingDir, segmentName); + SegmentConversionResult result = + new SegmentConversionResult.Builder().setFile(compactedSegmentFile).setTableNameWithType(tableNameWithType) + .setSegmentName(segmentName).build(); + + long endMillis = System.currentTimeMillis(); + LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis)); + + return result; + } + + private static SegmentGeneratorConfig getSegmentGeneratorConfig(File workingDir, TableConfig tableConfig, + SegmentMetadataImpl segmentMetadata, String segmentName) { + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, segmentMetadata.getSchema()); + config.setOutDir(workingDir.getPath()); + config.setSegmentName(segmentName); + // Keep index creation time the same as original segment because both segments use the same raw data. + // This way, for REFRESH case, when new segment gets pushed to controller, we can use index creation time to + // identify if the new pushed segment has newer data than the existing one. + config.setCreationTime(String.valueOf(segmentMetadata.getIndexCreationTime())); + + // The time column type info is not stored in the segment metadata. + // Keep segment start/end time to properly handle time column type other than EPOCH (e.g.SIMPLE_FORMAT). 
+ if (segmentMetadata.getTimeInterval() != null) { + config.setTimeColumnName(tableConfig.getValidationConfig().getTimeColumnName()); + config.setStartTime(Long.toString(segmentMetadata.getStartTime())); + config.setEndTime(Long.toString(segmentMetadata.getEndTime())); + config.setSegmentTimeUnit(segmentMetadata.getTimeUnit()); + } + return config; + } + + // TODO: Consider moving this method to a more appropriate class (eg ServerSegmentMetadataReader) + private static ImmutableRoaringBitmap getValidDocIds(String tableNameWithType, Map<String, String> configs) + throws URISyntaxException { + String segmentName = configs.get(MinionConstants.SEGMENT_NAME_KEY); + String server = getServer(segmentName, tableNameWithType); + + // get the url for the validDocIds for the server + InstanceConfig instanceConfig = _clusterManagementTool.getInstanceConfig(_clusterName, server); + String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig); + String url = + new URIBuilder(endpoint).setPath(String.format("/segments/%s/%s/validDocIds", tableNameWithType, segmentName)) + .toString(); + + // get the validDocIds from that server + Response response = ClientBuilder.newClient().target(url).request().get(Response.class); + Preconditions.checkState(response.getStatus() == Response.Status.OK.getStatusCode(), + "Unable to retrieve validDocIds from %s", url); + byte[] snapshot = response.readEntity(byte[].class); + ImmutableRoaringBitmap validDocIds = new ImmutableRoaringBitmap(ByteBuffer.wrap(snapshot)); + return validDocIds; + } + + @VisibleForTesting + public static String getServer(String segmentName, String tableNameWithType) { + ExternalView externalView = _clusterManagementTool.getResourceExternalView(_clusterName, tableNameWithType); + if (externalView == null) { + throw new IllegalStateException("External view does not exist for table: " + tableNameWithType); + } + Map<String, String> instanceStateMap = externalView.getStateMap(segmentName); + if (instanceStateMap == null) { + throw new IllegalStateException("Failed to find segment: " + segmentName); + } + for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { + if (entry.getValue().equals(SegmentStateModel.ONLINE)) { + return entry.getKey(); + } + } + throw new IllegalStateException("Failed to find ONLINE server for segment: " + segmentName); + } + + @Override + protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig, + SegmentConversionResult segmentConversionResult) { + return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, + Collections.singletonMap(MinionConstants.UpsertCompactionTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX, + String.valueOf(System.currentTimeMillis()))); + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorFactory.java new file mode 100644 index 0000000000..8989892f77 --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorFactory.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks.upsertcompaction; + +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.minion.MinionConf; +import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager; +import org.apache.pinot.minion.executor.PinotTaskExecutor; +import org.apache.pinot.minion.executor.PinotTaskExecutorFactory; +import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory; + + +@TaskExecutorFactory +public class UpsertCompactionTaskExecutorFactory implements PinotTaskExecutorFactory { + + @Override + public void init(MinionTaskZkMetadataManager zkMetadataManager) { + } + + @Override + public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) { + } + + @Override + public String getTaskType() { + return MinionConstants.UpsertCompactionTask.TASK_TYPE; + } + + @Override + public PinotTaskExecutor create() { + return new UpsertCompactionTaskExecutor(); + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java new file mode 100644 index 0000000000..aa84ffefa5 --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.plugin.minion.tasks.upsertcompaction; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.BiMap; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.http.client.utils.URIBuilder; +import org.apache.pinot.common.exception.InvalidConfigException; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator; +import org.apache.pinot.controller.util.CompletionServiceHelper; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.annotations.minion.TaskGenerator; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.TimeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +@TaskGenerator +public class UpsertCompactionTaskGenerator extends BaseTaskGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(UpsertCompactionTaskGenerator.class); + private static final String DEFAULT_BUFFER_PERIOD = "7d"; + private static final double DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT = 0.0; + private static final long DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT = 0; + + public static class SegmentSelectionResult { + + private List<SegmentZKMetadata> _segmentsForCompaction; + + private List<String> _segmentsForDeletion; + + SegmentSelectionResult(List<SegmentZKMetadata> segmentsForCompaction, List<String> segmentsForDeletion) { + _segmentsForCompaction = segmentsForCompaction; + _segmentsForDeletion = segmentsForDeletion; + } + + public List<SegmentZKMetadata> getSegmentsForCompaction() { + return _segmentsForCompaction; + } + + public List<String> getSegmentsForDeletion() { + return _segmentsForDeletion; + } + } + + @Override + public String getTaskType() { + return MinionConstants.UpsertCompactionTask.TASK_TYPE; + } + + @Override + public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { + String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE; + List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); + for (TableConfig tableConfig : tableConfigs) { + if (!validate(tableConfig)) { + continue; + } + + String tableNameWithType = tableConfig.getTableName(); + LOGGER.info("Start generating task configs for table: {}", tableNameWithType); + + Map<String, String> taskConfigs = tableConfig.getTaskConfig().getConfigsForTaskType(taskType); + List<SegmentZKMetadata> completedSegments = getCompletedSegments(tableNameWithType, taskConfigs); + + if (completedSegments.isEmpty()) { + LOGGER.info("No completed segments were eligible for compaction for table: {}", tableNameWithType); + continue; + } + + // get server to segment mappings + PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager(); + Map<String, 
List<String>> serverToSegments = pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType); + BiMap<String, String> serverToEndpoints; + try { + serverToEndpoints = pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet()); + } catch (InvalidConfigException e) { + throw new RuntimeException(e); + } + + Map<String, SegmentZKMetadata> completedSegmentsMap = + completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName, Function.identity())); + + List<String> validDocIdUrls; + try { + validDocIdUrls = getValidDocIdMetadataUrls(serverToSegments, serverToEndpoints, tableNameWithType, + completedSegmentsMap.keySet()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + // request the urls from the servers + CompletionServiceHelper completionServiceHelper = + new CompletionServiceHelper(_clusterInfoAccessor.getExecutor(), _clusterInfoAccessor.getConnectionManager(), + serverToEndpoints.inverse()); + + CompletionServiceHelper.CompletionServiceResponse serviceResponse = + completionServiceHelper.doMultiGetRequest(validDocIdUrls, tableNameWithType, false, 3000); + + SegmentSelectionResult segmentSelectionResult = + processValidDocIdMetadata(taskConfigs, completedSegmentsMap, serviceResponse._httpResponses.entrySet()); + + if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) { + pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentSelectionResult.getSegmentsForDeletion(), + "0d"); + LOGGER.info("Deleted segments containing only invalid records for table: {}", tableNameWithType); + } + + int numTasks = 0; + int maxTasks = getMaxTasks(taskType, tableNameWithType, taskConfigs); + for (SegmentZKMetadata segment : segmentSelectionResult.getSegmentsForCompaction()) { + if (numTasks == maxTasks) { + break; + } + Map<String, String> configs = new HashMap<>(); + configs.put(MinionConstants.TABLE_NAME_KEY, tableNameWithType); + configs.put(MinionConstants.SEGMENT_NAME_KEY, segment.getSegmentName()); + configs.put(MinionConstants.DOWNLOAD_URL_KEY, segment.getDownloadUrl()); + configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments"); + configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segment.getCrc())); + pinotTaskConfigs.add(new PinotTaskConfig(UpsertCompactionTask.TASK_TYPE, configs)); + numTasks++; + } + LOGGER.info("Finished generating {} tasks configs for table: {}", numTasks, tableNameWithType); + } + return pinotTaskConfigs; + } + + @VisibleForTesting + public static SegmentSelectionResult processValidDocIdMetadata(Map<String, String> taskConfigs, + Map<String, SegmentZKMetadata> completedSegmentsMap, Set<Map.Entry<String, String>> responseSet) { + double invalidRecordsThresholdPercent = Double.parseDouble( + taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT, + String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT))); + long invalidRecordsThresholdCount = Long.parseLong( + taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT, + String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT))); + List<SegmentZKMetadata> segmentsForCompaction = new ArrayList<>(); + List<String> segmentsForDeletion = new ArrayList<>(); + for (Map.Entry<String, String> streamResponse : responseSet) { + JsonNode allValidDocIdMetadata; + try { + allValidDocIdMetadata = JsonUtils.stringToJsonNode(streamResponse.getValue()); + } catch (IOException e) { + LOGGER.error("Unable to parse validDocIdMetadata response for: 
{}", streamResponse.getKey()); + continue; + } + Iterator<JsonNode> iterator = allValidDocIdMetadata.elements(); + while (iterator.hasNext()) { + JsonNode validDocIdMetadata = iterator.next(); + long totalInvalidDocs = validDocIdMetadata.get("totalInvalidDocs").asLong(); + String segmentName = validDocIdMetadata.get("segmentName").asText(); + SegmentZKMetadata segment = completedSegmentsMap.get(segmentName); + long totalDocs = validDocIdMetadata.get("totalDocs").asLong(); + double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) * 100; + if (totalInvalidDocs == totalDocs) { + segmentsForDeletion.add(segment.getSegmentName()); + } else if (invalidRecordPercent > invalidRecordsThresholdPercent + && totalInvalidDocs > invalidRecordsThresholdCount) { + segmentsForCompaction.add(segment); + } + } + } + return new SegmentSelectionResult(segmentsForCompaction, segmentsForDeletion); + } + + @VisibleForTesting + public static List<String> getValidDocIdMetadataUrls(Map<String, List<String>> serverToSegments, + BiMap<String, String> serverToEndpoints, String tableNameWithType, Set<String> completedSegments) + throws URISyntaxException { + Set<String> remainingSegments = new HashSet<>(completedSegments); + List<String> urls = new ArrayList<>(); + for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) { + if (remainingSegments.isEmpty()) { + break; + } + String server = entry.getKey(); + List<String> segmentNames = entry.getValue(); + URIBuilder uriBuilder = new URIBuilder(serverToEndpoints.get(server)).setPath( + String.format("/tables/%s/validDocIdMetadata", tableNameWithType)); + int completedSegmentCountPerServer = 0; + for (String segmentName : segmentNames) { + if (remainingSegments.remove(segmentName)) { + completedSegmentCountPerServer++; + uriBuilder.addParameter("segmentNames", segmentName); + } + } + if (completedSegmentCountPerServer > 0) { + // only add to the list if the server has completed segments + urls.add(uriBuilder.toString()); + } + } + return urls; + } + + private List<SegmentZKMetadata> getCompletedSegments(String tableNameWithType, Map<String, String> taskConfigs) { + List<SegmentZKMetadata> completedSegments = new ArrayList<>(); + String bufferPeriod = taskConfigs.getOrDefault(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD); + long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod); + List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType); + for (SegmentZKMetadata segment : allSegments) { + CommonConstants.Segment.Realtime.Status status = segment.getStatus(); + // initial segments selection based on status and age + if (status.isCompleted() && (segment.getEndTimeMs() <= (System.currentTimeMillis() - bufferMs))) { + completedSegments.add(segment); + } + } + return completedSegments; + } + + @VisibleForTesting + public static int getMaxTasks(String taskType, String tableNameWithType, Map<String, String> taskConfigs) { + int maxTasks = Integer.MAX_VALUE; + String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY); + if (tableMaxNumTasksConfig != null) { + try { + maxTasks = Integer.parseInt(tableMaxNumTasksConfig); + } catch (Exception e) { + LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and task {}", tableNameWithType, taskType); + } + } + return maxTasks; + } + + @VisibleForTesting + static boolean validate(TableConfig tableConfig) { + String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE; + String tableNameWithType = 
tableConfig.getTableName(); + if (tableConfig.getTableType() == TableType.OFFLINE) { + LOGGER.warn("Skip generation task: {} for table: {}, offline table is not supported", taskType, + tableNameWithType); + return false; + } + if (!tableConfig.isUpsertEnabled()) { + LOGGER.warn("Skip generation task: {} for table: {}, table without upsert enabled is not supported", taskType, + tableNameWithType); + return false; + } + return true; + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskProgressObserverFactory.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskProgressObserverFactory.java new file mode 100644 index 0000000000..591a21b8ff --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskProgressObserverFactory.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks.upsertcompaction; + +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.minion.event.BaseMinionProgressObserverFactory; +import org.apache.pinot.spi.annotations.minion.EventObserverFactory; + + +@EventObserverFactory +public class UpsertCompactionTaskProgressObserverFactory extends BaseMinionProgressObserverFactory { + + @Override + public String getTaskType() { + return MinionConstants.UpsertCompactionTask.TASK_TYPE; + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java new file mode 100644 index 0000000000..604c58f6d0 --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks.upsertcompaction; + +import java.util.HashMap; +import java.util.Map; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixManager; +import org.apache.helix.model.ExternalView; +import org.apache.pinot.minion.MinionContext; +import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class UpsertCompactionTaskExecutorTest { + private static final String REALTIME_TABLE_NAME = "testTable_REALTIME"; + private static final String SEGMENT_NAME = "testSegment"; + private static final String CLUSTER_NAME = "testCluster"; + + @Test + public void testGetServer() { + ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME); + Map<String, Map<String, String>> externalViewSegmentAssignment = externalView.getRecord().getMapFields(); + Map<String, String> map = new HashMap<>(); + map.put("server1", SegmentStateModel.ONLINE); + externalViewSegmentAssignment.put(SEGMENT_NAME, map); + HelixAdmin clusterManagementTool = Mockito.mock(HelixAdmin.class); + MinionContext minionContext = MinionContext.getInstance(); + Mockito.when(clusterManagementTool.getResourceExternalView(CLUSTER_NAME, REALTIME_TABLE_NAME)) + .thenReturn(externalView); + HelixManager helixManager = Mockito.mock(HelixManager.class); + Mockito.when(helixManager.getClusterName()).thenReturn(CLUSTER_NAME); + Mockito.when(helixManager.getClusterManagmentTool()).thenReturn(clusterManagementTool); + minionContext.setHelixManager(helixManager); + + String server = UpsertCompactionTaskExecutor.getServer(SEGMENT_NAME, REALTIME_TABLE_NAME); + + Assert.assertEquals(server, "server1"); + + // verify exception thrown with OFFLINE server + map.put("server1", SegmentStateModel.OFFLINE); + Assert.assertThrows(IllegalStateException.class, + () -> UpsertCompactionTaskExecutor.getServer(SEGMENT_NAME, REALTIME_TABLE_NAME)); + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java new file mode 100644 index 0000000000..9c93979d9f --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks.upsertcompaction; + +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import java.net.URISyntaxException; +import java.util.AbstractMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableTaskConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.TimeUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import org.testng.collections.Lists; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + + +public class UpsertCompactionTaskGeneratorTest { + private static final String RAW_TABLE_NAME = "testTable"; + private static final String REALTIME_TABLE_NAME = "testTable_REALTIME"; + private static final String TIME_COLUMN_NAME = "millisSinceEpoch"; + private UpsertCompactionTaskGenerator _taskGenerator; + private TableConfig _tableConfig; + private ClusterInfoAccessor _mockClusterInfoAccessor; + private SegmentZKMetadata _completedSegment; + private SegmentZKMetadata _completedSegment2; + private Map<String, SegmentZKMetadata> _completedSegmentsMap; + + @BeforeClass + public void setUp() { + _taskGenerator = new UpsertCompactionTaskGenerator(); + Map<String, Map<String, String>> tableTaskConfigs = new HashMap<>(); + Map<String, String> compactionConfigs = new HashMap<>(); + tableTaskConfigs.put(UpsertCompactionTask.TASK_TYPE, compactionConfigs); + _tableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)) + .setTaskConfig(new TableTaskConfig(tableTaskConfigs)).build(); + _mockClusterInfoAccessor = mock(ClusterInfoAccessor.class); + + _completedSegment = new SegmentZKMetadata("testTable__0"); + _completedSegment.setStatus(CommonConstants.Segment.Realtime.Status.DONE); + _completedSegment.setStartTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("2d")); + _completedSegment.setEndTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("1d")); + _completedSegment.setTimeUnit(TimeUnit.MILLISECONDS); + _completedSegment.setTotalDocs(100L); + + 
_completedSegment2 = new SegmentZKMetadata("testTable__1"); + _completedSegment2.setStatus(CommonConstants.Segment.Realtime.Status.DONE); + _completedSegment2.setStartTime(System.currentTimeMillis() - TimeUtils.convertPeriodToMillis("1d")); + _completedSegment2.setEndTime(System.currentTimeMillis()); + _completedSegment2.setTimeUnit(TimeUnit.MILLISECONDS); + _completedSegment2.setTotalDocs(10L); + + _completedSegmentsMap = new HashMap<>(); + _completedSegmentsMap.put(_completedSegment.getSegmentName(), _completedSegment); + _completedSegmentsMap.put(_completedSegment2.getSegmentName(), _completedSegment2); + } + + @Test + public void testValidate() { + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME) + .build(); + assertFalse(UpsertCompactionTaskGenerator.validate(tableConfig)); + + TableConfigBuilder tableConfigBuilder = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME); + assertFalse(UpsertCompactionTaskGenerator.validate(tableConfigBuilder.build())); + + tableConfigBuilder = tableConfigBuilder.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)); + assertTrue(UpsertCompactionTaskGenerator.validate(tableConfigBuilder.build())); + } + + @Test + public void testGenerateTasksValidatesTableConfigs() { + UpsertCompactionTaskGenerator taskGenerator = new UpsertCompactionTaskGenerator(); + TableConfig offlineTableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME) + .build(); + List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(Lists.newArrayList(offlineTableConfig)); + assertTrue(pinotTaskConfigs.isEmpty()); + + TableConfig realtimeTableConfig = + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME) + .build(); + pinotTaskConfigs = taskGenerator.generateTasks(Lists.newArrayList(realtimeTableConfig)); + assertTrue(pinotTaskConfigs.isEmpty()); + } + + @Test + public void testGenerateTasksWithNoSegments() { + when(_mockClusterInfoAccessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn( + Lists.newArrayList(Collections.emptyList())); + _taskGenerator.init(_mockClusterInfoAccessor); + + List<PinotTaskConfig> pinotTaskConfigs = _taskGenerator.generateTasks(Lists.newArrayList(_tableConfig)); + + assertEquals(pinotTaskConfigs.size(), 0); + } + + @Test + public void testGenerateTasksWithConsumingSegment() { + SegmentZKMetadata consumingSegment = new SegmentZKMetadata("testTable__0"); + consumingSegment.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS); + when(_mockClusterInfoAccessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn( + Lists.newArrayList(consumingSegment)); + _taskGenerator.init(_mockClusterInfoAccessor); + + List<PinotTaskConfig> pinotTaskConfigs = _taskGenerator.generateTasks(Lists.newArrayList(_tableConfig)); + + assertEquals(pinotTaskConfigs.size(), 0); + } + + @Test + public void testGenerateTasksWithNewlyCompletedSegment() { + when(_mockClusterInfoAccessor.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn( + Lists.newArrayList(_completedSegment)); + _taskGenerator.init(_mockClusterInfoAccessor); + + List<PinotTaskConfig> pinotTaskConfigs = _taskGenerator.generateTasks(Lists.newArrayList(_tableConfig)); + + assertEquals(pinotTaskConfigs.size(), 0); + } + + @Test + public void testGetValidDocIdMetadataUrls() + throws URISyntaxException { + Map<String, List<String>> 
serverToSegments = new HashMap<>(); + serverToSegments.put("server1", + Lists.newArrayList(_completedSegment.getSegmentName(), _completedSegment2.getSegmentName())); + serverToSegments.put("server2", Lists.newArrayList("consumingSegment")); + BiMap<String, String> serverToEndpoints = HashBiMap.create(1); + serverToEndpoints.put("server1", "http://endpoint1"); + serverToEndpoints.put("server2", "http://endpoint2"); + Set<String> completedSegments = new HashSet<>(); + completedSegments.add(_completedSegment.getSegmentName()); + completedSegments.add(_completedSegment2.getSegmentName()); + + List<String> validDocIdUrls = + UpsertCompactionTaskGenerator.getValidDocIdMetadataUrls(serverToSegments, serverToEndpoints, + REALTIME_TABLE_NAME, completedSegments); + + String expectedUrl = + String.format("%s/tables/%s/validDocIdMetadata?segmentNames=%s&segmentNames=%s", "http://endpoint1", + REALTIME_TABLE_NAME, _completedSegment.getSegmentName(), _completedSegment2.getSegmentName()); + assertEquals(validDocIdUrls.get(0), expectedUrl); + assertEquals(validDocIdUrls.size(), 1); + } + + @Test + public void testGetValidDocIdMetadataUrlsWithReplicatedSegments() + throws URISyntaxException { + Map<String, List<String>> serverToSegments = new LinkedHashMap<>(); + serverToSegments.put("server1", + Lists.newArrayList(_completedSegment.getSegmentName(), _completedSegment2.getSegmentName())); + serverToSegments.put("server2", + Lists.newArrayList(_completedSegment.getSegmentName(), _completedSegment2.getSegmentName())); + BiMap<String, String> serverToEndpoints = HashBiMap.create(1); + serverToEndpoints.put("server1", "http://endpoint1"); + serverToEndpoints.put("server2", "http://endpoint2"); + Set<String> completedSegments = new HashSet<>(); + completedSegments.add(_completedSegment.getSegmentName()); + completedSegments.add(_completedSegment2.getSegmentName()); + + List<String> validDocIdUrls = + UpsertCompactionTaskGenerator.getValidDocIdMetadataUrls(serverToSegments, serverToEndpoints, + REALTIME_TABLE_NAME, completedSegments); + + String expectedUrl = + String.format("%s/tables/%s/validDocIdMetadata?segmentNames=%s&segmentNames=%s", "http://endpoint1", + REALTIME_TABLE_NAME, _completedSegment.getSegmentName(), _completedSegment2.getSegmentName()); + assertEquals(validDocIdUrls.get(0), expectedUrl); + assertEquals(validDocIdUrls.size(), 1); + } + + @Test + public void testGetMaxTasks() { + Map<String, String> taskConfigs = new HashMap<>(); + taskConfigs.put(MinionConstants.TABLE_MAX_NUM_TASKS_KEY, "10"); + + int maxTasks = + UpsertCompactionTaskGenerator.getMaxTasks(UpsertCompactionTask.TASK_TYPE, REALTIME_TABLE_NAME, taskConfigs); + + assertEquals(maxTasks, 10); + } + + @Test + public void testProcessValidDocIdMetadata() { + Map<String, String> compactionConfigs = getCompactionConfigs("1", "10"); + Set<Map.Entry<String, String>> responseSet = new HashSet<>(); + String json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + "\"segmentName\" : \"" + + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + "}," + "{" + "\"totalValidDocs\" : 0," + + "\"totalInvalidDocs\" : 10," + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\"," + + "\"totalDocs\" : 10" + "}]"; + responseSet.add(new AbstractMap.SimpleEntry<>("", json)); + UpsertCompactionTaskGenerator.SegmentSelectionResult segmentSelectionResult = + UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, responseSet); + 
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(), + _completedSegment.getSegmentName()); + assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName()); + + // test with a higher invalidRecordsThresholdPercent + compactionConfigs = getCompactionConfigs("60", "10"); + segmentSelectionResult = + UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, responseSet); + assertTrue(segmentSelectionResult.getSegmentsForCompaction().isEmpty()); + + // test without an invalidRecordsThresholdPercent + compactionConfigs = getCompactionConfigs("0", "10"); + segmentSelectionResult = + UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, responseSet); + assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(), + _completedSegment.getSegmentName()); + + // test without a invalidRecordsThresholdCount + compactionConfigs = getCompactionConfigs("30", "0"); + segmentSelectionResult = + UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs, _completedSegmentsMap, responseSet); + assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(), + _completedSegment.getSegmentName()); + } + + private Map<String, String> getCompactionConfigs(String invalidRecordsThresholdPercent, + String invalidRecordsThresholdCount) { + Map<String, String> compactionConfigs = new HashMap<>(); + if (!invalidRecordsThresholdPercent.equals("0")) { + compactionConfigs.put(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT, invalidRecordsThresholdPercent); + } + if (!invalidRecordsThresholdCount.equals("0")) { + compactionConfigs.put(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT, invalidRecordsThresholdCount); + } + return compactionConfigs; + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index d5741fa0b7..a64170c60f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -100,6 +100,7 @@ public final class TableConfigUtils { // supported TableTaskTypes, must be identical to the one return in the impl of {@link PinotTaskGenerator}. private static final String REALTIME_TO_OFFLINE_TASK_TYPE = "RealtimeToOfflineSegmentsTask"; + private static final String UPSERT_COMPACTION_TASK_TYPE = "UpsertCompactionTask"; // this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use KinesisConfig.STREAM_TYPE directly, we // hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency. 
@@ -543,6 +544,33 @@ public final class TableConfigUtils { } } } + } else if (taskTypeConfigName.equals(UPSERT_COMPACTION_TASK_TYPE)) { + // check table is realtime + Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME, + "UpsertCompactionTask only supports realtime tables!"); + // check upsert enabled + Preconditions.checkState(tableConfig.isUpsertEnabled(), "Upsert must be enabled for UpsertCompactionTask"); + + // check no malformed period + if (taskTypeConfig.containsKey("bufferTimePeriod")) { + TimeUtils.convertPeriodToMillis(taskTypeConfig.get("bufferTimePeriod")); + } + // check maxNumRecordsPerSegment + if (taskTypeConfig.containsKey("invalidRecordsThresholdPercent")) { + Preconditions.checkState(Double.parseDouble(taskTypeConfig.get("invalidRecordsThresholdPercent")) > 0 + && Double.parseDouble(taskTypeConfig.get("invalidRecordsThresholdPercent")) <= 100, + "invalidRecordsThresholdPercent must be > 0 and <= 100"); + } + // check invalidRecordsThresholdCount + if (taskTypeConfig.containsKey("invalidRecordsThresholdCount")) { + Preconditions.checkState(Long.parseLong(taskTypeConfig.get("invalidRecordsThresholdCount")) >= 1, + "invalidRecordsThresholdCount must be >= 1"); + } + // check that either invalidRecordsThresholdPercent or invalidRecordsThresholdCount was provided + Preconditions.checkState( + taskTypeConfig.containsKey("invalidRecordsThresholdPercent") || taskTypeConfig.containsKey( + "invalidRecordsThresholdCount"), + "invalidRecordsThresholdPercent or invalidRecordsThresholdCount or both must be provided"); } } } @@ -581,8 +609,8 @@ public final class TableConfigUtils { Preconditions.checkState(streamConfig.hasLowLevelConsumerType() && !streamConfig.hasHighLevelConsumerType(), "Upsert/Dedup table must use low-level streaming consumer type"); // replica group is configured for routing - Preconditions.checkState(tableConfig.getRoutingConfig() != null - && isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()), + Preconditions.checkState( + tableConfig.getRoutingConfig() != null && isRoutingStrategyAllowedForUpsert(tableConfig.getRoutingConfig()), "Upsert/Dedup table must use strict replica-group (i.e. 
strictReplicaGroup) based routing"); // specifically for upsert @@ -649,8 +677,8 @@ public final class TableConfigUtils { */ @VisibleForTesting static void validateInstancePartitionsTypeMapConfig(TableConfig tableConfig) { - if (MapUtils.isEmpty(tableConfig.getInstancePartitionsMap()) - || MapUtils.isEmpty(tableConfig.getInstanceAssignmentConfigMap())) { + if (MapUtils.isEmpty(tableConfig.getInstancePartitionsMap()) || MapUtils.isEmpty( + tableConfig.getInstanceAssignmentConfigMap())) { return; } for (InstancePartitionsType instancePartitionsType : tableConfig.getInstancePartitionsMap().keySet()) { @@ -668,11 +696,11 @@ public final class TableConfigUtils { */ @VisibleForTesting static void validatePartitionedReplicaGroupInstance(TableConfig tableConfig) { - if (tableConfig.getValidationConfig().getReplicaGroupStrategyConfig() == null - || MapUtils.isEmpty(tableConfig.getInstanceAssignmentConfigMap())) { + if (tableConfig.getValidationConfig().getReplicaGroupStrategyConfig() == null || MapUtils.isEmpty( + tableConfig.getInstanceAssignmentConfigMap())) { return; } - for (Map.Entry<String, InstanceAssignmentConfig> entry: tableConfig.getInstanceAssignmentConfigMap().entrySet()) { + for (Map.Entry<String, InstanceAssignmentConfig> entry : tableConfig.getInstanceAssignmentConfigMap().entrySet()) { boolean isNullReplicaGroupPartitionConfig = entry.getValue().getReplicaGroupPartitionConfig() == null; Preconditions.checkState(isNullReplicaGroupPartitionConfig, "Both replicaGroupStrategyConfig and replicaGroupPartitionConfig is provided"); @@ -1025,8 +1053,8 @@ public final class TableConfigUtils { return; } - boolean forwardIndexDisabled = - Boolean.parseBoolean(fieldConfigProperties.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED, + boolean forwardIndexDisabled = Boolean.parseBoolean( + fieldConfigProperties.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED, FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED)); if (!forwardIndexDisabled) { return; @@ -1045,20 +1073,23 @@ public final class TableConfigUtils { } Preconditions.checkState( - !indexingConfigs.isOptimizeDictionaryForMetrics() && !indexingConfigs.isOptimizeDictionary(), - String.format("Dictionary override optimization options (OptimizeDictionary, optimizeDictionaryForMetrics)" - + " not supported with forward index for column: %s, disabled", columnName)); + !indexingConfigs.isOptimizeDictionaryForMetrics() && !indexingConfigs.isOptimizeDictionary(), String.format( + "Dictionary override optimization options (OptimizeDictionary, optimizeDictionaryForMetrics)" + + " not supported with forward index for column: %s, disabled", columnName)); - boolean hasDictionary = fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY - || noDictionaryColumns == null || !noDictionaryColumns.contains(columnName); - boolean hasInvertedIndex = indexingConfigs.getInvertedIndexColumns() != null - && indexingConfigs.getInvertedIndexColumns().contains(columnName); + boolean hasDictionary = + fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY || noDictionaryColumns == null + || !noDictionaryColumns.contains(columnName); + boolean hasInvertedIndex = + indexingConfigs.getInvertedIndexColumns() != null && indexingConfigs.getInvertedIndexColumns() + .contains(columnName); if (!hasDictionary || !hasInvertedIndex) { LOGGER.warn("Forward index has been disabled for column {}. Either dictionary ({}) and / or inverted index ({}) " + "has been disabled. 
If the forward index needs to be regenerated or another index added please refresh " + "or back-fill the forward index as it cannot be rebuilt without dictionary and inverted index.", - columnName, hasDictionary ? "enabled" : "disabled", hasInvertedIndex ? "enabled" : "disabled"); + columnName, + hasDictionary ? "enabled" : "disabled", hasInvertedIndex ? "enabled" : "disabled"); } } @@ -1271,9 +1302,7 @@ public final class TableConfigUtils { if (clone.getFieldConfigList() != null) { List<FieldConfig> cleanFieldConfigList = new ArrayList<>(); for (FieldConfig fieldConfig : clone.getFieldConfigList()) { - cleanFieldConfigList.add(new FieldConfig.Builder(fieldConfig) - .withIndexTypes(null) - .build()); + cleanFieldConfigList.add(new FieldConfig.Builder(fieldConfig).withIndexTypes(null).build()); } clone.setFieldConfigList(cleanFieldConfigList); } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index 1f6834f62c..9d58b42161 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -924,8 +924,8 @@ public class TableConfigUtilsTest { // be rebuilt without a dictionary, the constraint to have a dictionary has been lifted. Map<String, String> fieldConfigProperties = new HashMap<>(); fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()); - FieldConfig fieldConfig = new FieldConfig("myCol1", FieldConfig.EncodingType.RAW, null, null, null, null, - fieldConfigProperties); + FieldConfig fieldConfig = + new FieldConfig("myCol1", FieldConfig.EncodingType.RAW, null, null, null, null, fieldConfigProperties); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); } catch (Exception e) { @@ -938,8 +938,8 @@ public class TableConfigUtilsTest { // lifted. 
Map<String, String> fieldConfigProperties = new HashMap<>(); fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()); - FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, null, null, null, null, - fieldConfigProperties); + FieldConfig fieldConfig = + new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, null, null, null, null, fieldConfigProperties); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); } catch (Exception e) { @@ -964,8 +964,9 @@ public class TableConfigUtilsTest { // Enable forward index disabled flag for a column with inverted index Map<String, String> fieldConfigProperties = new HashMap<>(); fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()); - FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, - FieldConfig.IndexType.INVERTED, null, null, null, fieldConfigProperties); + FieldConfig fieldConfig = + new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.INVERTED, null, null, + null, fieldConfigProperties); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); } catch (Exception e) { @@ -979,8 +980,9 @@ public class TableConfigUtilsTest { // Enable forward index disabled flag for a column with inverted index and is sorted Map<String, String> fieldConfigProperties = new HashMap<>(); fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()); - FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, - FieldConfig.IndexType.INVERTED, null, null, null, fieldConfigProperties); + FieldConfig fieldConfig = + new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.INVERTED, null, null, + null, fieldConfigProperties); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); } catch (Exception e) { @@ -994,9 +996,10 @@ public class TableConfigUtilsTest { // Enable forward index disabled flag for a multi-value column with inverted index and range index Map<String, String> fieldConfigProperties = new HashMap<>(); fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()); - FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, - FieldConfig.IndexType.INVERTED, Arrays.asList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE), - null, null, fieldConfigProperties); + FieldConfig fieldConfig = + new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.INVERTED, + Arrays.asList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE), null, null, + fieldConfigProperties); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail for MV myCol2 with forward index disabled but has range and inverted index"); @@ -1011,9 +1014,10 @@ public class TableConfigUtilsTest { // Enable forward index disabled flag for a singe-value column with inverted index and range index v1 Map<String, String> fieldConfigProperties = new HashMap<>(); fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()); - FieldConfig fieldConfig = new FieldConfig("myCol1", FieldConfig.EncodingType.DICTIONARY, - FieldConfig.IndexType.INVERTED, Arrays.asList(FieldConfig.IndexType.INVERTED, 
FieldConfig.IndexType.RANGE), - null, null, fieldConfigProperties); + FieldConfig fieldConfig = + new FieldConfig("myCol1", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.INVERTED, + Arrays.asList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE), null, null, + fieldConfigProperties); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); tableConfig.getIndexingConfig().setRangeIndexVersion(1); TableConfigUtils.validate(tableConfig, schema); @@ -1030,14 +1034,15 @@ public class TableConfigUtilsTest { // Enable forward index disabled flag for a column with inverted index and disable dictionary Map<String, String> fieldConfigProperties = new HashMap<>(); fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()); - FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.RAW, - FieldConfig.IndexType.INVERTED, null, null, null, fieldConfigProperties); + FieldConfig fieldConfig = + new FieldConfig("myCol2", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.INVERTED, null, null, null, + fieldConfigProperties); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should not be able to disable dictionary but keep inverted index"); } catch (Exception e) { - Assert.assertEquals(e.getMessage(), "Cannot create an Inverted index on column myCol2 specified in the " - + "noDictionaryColumns config"); + Assert.assertEquals(e.getMessage(), + "Cannot create an Inverted index on column myCol2 specified in the " + "noDictionaryColumns config"); } tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) @@ -1046,8 +1051,9 @@ public class TableConfigUtilsTest { // Enable forward index disabled flag for a column with FST index and disable dictionary Map<String, String> fieldConfigProperties = new HashMap<>(); fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()); - FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.RAW, - FieldConfig.IndexType.FST, null, null, null, fieldConfigProperties); + FieldConfig fieldConfig = + new FieldConfig("myCol2", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.FST, null, null, null, + fieldConfigProperties); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should not be able to disable dictionary but keep inverted index"); @@ -1061,8 +1067,9 @@ public class TableConfigUtilsTest { // Enable forward index disabled flag for a column with FST index and disable dictionary Map<String, String> fieldConfigProperties = new HashMap<>(); fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()); - FieldConfig fieldConfig = new FieldConfig("intCol", FieldConfig.EncodingType.RAW, - FieldConfig.IndexType.RANGE, null, null, null, fieldConfigProperties); + FieldConfig fieldConfig = + new FieldConfig("intCol", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.RANGE, null, null, null, + fieldConfigProperties); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); } catch (Exception e) { @@ -1415,8 +1422,7 @@ public class TableConfigUtilsTest { @Test public void testValidateUpsertConfig() { Schema schema = - new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) - .addSingleValueDimension("myCol", FieldSpec.DataType.STRING) + new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", 
FieldSpec.DataType.STRING) .build(); UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL); TableConfig tableConfig = @@ -1526,12 +1532,10 @@ public class TableConfigUtilsTest { // Table upsert with delete column String incorrectTypeDelCol = "incorrectTypeDeleteCol"; String delCol = "myDelCol"; - schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) - .setPrimaryKeyColumns(Lists.newArrayList("myPkCol")) + schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol")) .addSingleValueDimension("myCol", FieldSpec.DataType.STRING) .addSingleValueDimension(incorrectTypeDelCol, FieldSpec.DataType.STRING) - .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN) - .build(); + .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN).build(); streamConfigs = getStreamConfigs(); streamConfigs.put("stream.kafka.consumer.type", "simple"); @@ -1783,16 +1787,14 @@ public class TableConfigUtilsTest { InstanceAssignmentConfig instanceAssignmentConfig = Mockito.mock(InstanceAssignmentConfig.class); TableConfig tableConfigWithoutInstancePartitionsMap = - new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .build(); + new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); // Call validate with a table-config without any instance partitions or instance assignment config TableConfigUtils.validateInstancePartitionsTypeMapConfig(tableConfigWithoutInstancePartitionsMap); TableConfig tableConfigWithInstancePartitionsMap = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE")) - .build(); + .setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE")).build(); // Call validate with a table-config with instance partitions set but not instance assignment config TableConfigUtils.validateInstancePartitionsTypeMapConfig(tableConfigWithInstancePartitionsMap); @@ -1800,8 +1802,7 @@ public class TableConfigUtilsTest { TableConfig invalidTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) .setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE")) .setInstanceAssignmentConfigMap( - ImmutableMap.of(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig)) - .build(); + ImmutableMap.of(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig)).build(); try { // Call validate with instance partitions and config set for the same type TableConfigUtils.validateInstancePartitionsTypeMapConfig(invalidTableConfig); @@ -1895,15 +1896,64 @@ public class TableConfigUtilsTest { } } + @Test + public void testUpsertCompactionTaskConfig() { + Schema schema = + new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING) + .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS") + .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build(); + Map<String, String> upsertCompactionTaskConfig = + ImmutableMap.of("bufferTimePeriod", "5d", "invalidRecordsThresholdPercent", "1", "invalidRecordsThresholdCount", + "1"); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig))) + .build(); + + 
TableConfigUtils.validateTaskConfigs(tableConfig, schema); + + // test with invalid invalidRecordsThresholdPercents + upsertCompactionTaskConfig = ImmutableMap.of("invalidRecordsThresholdPercent", "0"); + TableConfig zeroPercentTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig))) + .build(); + Assert.assertThrows(IllegalStateException.class, + () -> TableConfigUtils.validateTaskConfigs(zeroPercentTableConfig, schema)); + upsertCompactionTaskConfig = ImmutableMap.of("invalidRecordsThresholdPercent", "110"); + TableConfig hundredTenPercentTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig))) + .build(); + Assert.assertThrows(IllegalStateException.class, + () -> TableConfigUtils.validateTaskConfigs(hundredTenPercentTableConfig, schema)); + + // test with invalid invalidRecordsThresholdCount + upsertCompactionTaskConfig = ImmutableMap.of("invalidRecordsThresholdCount", "0"); + TableConfig invalidCountTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig))) + .build(); + Assert.assertThrows(IllegalStateException.class, + () -> TableConfigUtils.validateTaskConfigs(invalidCountTableConfig, schema)); + + // test without invalidRecordsThresholdPercent or invalidRecordsThresholdCount + upsertCompactionTaskConfig = ImmutableMap.of("bufferTimePeriod", "5d"); + TableConfig invalidTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of("UpsertCompactionTask", upsertCompactionTaskConfig))) + .build(); + Assert.assertThrows(IllegalStateException.class, + () -> TableConfigUtils.validateTaskConfigs(invalidTableConfig, schema)); + } + @Test public void testValidatePartitionedReplicaGroupInstance() { String partitionColumn = "testPartitionCol"; - ReplicaGroupStrategyConfig replicaGroupStrategyConfig = - new ReplicaGroupStrategyConfig(partitionColumn, 2); + ReplicaGroupStrategyConfig replicaGroupStrategyConfig = new ReplicaGroupStrategyConfig(partitionColumn, 2); TableConfig tableConfigWithoutReplicaGroupStrategyConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) - .build(); + new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); // Call validate with a table-config without replicaGroupStrategyConfig or replicaGroupPartitionConfig. 
TableConfigUtils.validatePartitionedReplicaGroupInstance(tableConfigWithoutReplicaGroupStrategyConfig); @@ -1918,12 +1968,12 @@ public class TableConfigUtilsTest { InstanceAssignmentConfig instanceAssignmentConfig = Mockito.mock(InstanceAssignmentConfig.class); InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 2, 0, false, partitionColumn); - Mockito.doReturn(instanceReplicaGroupPartitionConfig) - .when(instanceAssignmentConfig).getReplicaGroupPartitionConfig(); + Mockito.doReturn(instanceReplicaGroupPartitionConfig).when(instanceAssignmentConfig) + .getReplicaGroupPartitionConfig(); - TableConfig invalidTableConfig = new TableConfigBuilder(TableType.OFFLINE) - .setTableName(TABLE_NAME).setInstanceAssignmentConfigMap( - ImmutableMap.of(TableType.OFFLINE.toString(), instanceAssignmentConfig)).build(); + TableConfig invalidTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) + .setInstanceAssignmentConfigMap(ImmutableMap.of(TableType.OFFLINE.toString(), instanceAssignmentConfig)) + .build(); invalidTableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig); try { diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index 8148956ae6..c03328c2bb 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -71,6 +71,7 @@ import org.apache.pinot.common.restlet.resources.TablesList; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.RoaringBitmapUtils; import org.apache.pinot.common.utils.TarGzCompressionUtils; +import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; @@ -164,8 +165,8 @@ public class TablesResource { @Encoded @Produces(MediaType.APPLICATION_JSON) @Path("/tables/{tableName}/metadata") - @ApiOperation(value = "List metadata for all segments of a given table", - notes = "List segments metadata of table hosted on this server") + @ApiOperation(value = "List metadata for all segments of a given table", notes = "List segments metadata of table " + + "hosted on this server") @ApiResponses(value = { @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), @@ -174,7 +175,7 @@ public class TablesResource { public String getSegmentMetadata( @ApiParam(value = "Table Name with type", required = true) @PathParam("tableName") String tableName, @ApiParam(value = "Column name", allowMultiple = true) @QueryParam("columns") @DefaultValue("") - List<String> columns) + List<String> columns) throws WebApplicationException { InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager(); @@ -299,7 +300,7 @@ public class TablesResource { @PathParam("tableName") String tableName, @ApiParam(value = "Segment name", required = true) @PathParam("segmentName") String segmentName, @ApiParam(value = "Column name", allowMultiple = true) @QueryParam("columns") @DefaultValue("") - List<String> columns) { + List<String> columns) { for (int i = 0; i < columns.size(); i++) { try { columns.set(i, URLDecoder.decode(columns.get(i), 
StandardCharsets.UTF_8.name())); @@ -348,8 +349,8 @@ public class TablesResource { try { Map<String, String> segmentCrcForTable = new HashMap<>(); for (SegmentDataManager segmentDataManager : segmentDataManagers) { - segmentCrcForTable - .put(segmentDataManager.getSegmentName(), segmentDataManager.getSegment().getSegmentMetadata().getCrc()); + segmentCrcForTable.put(segmentDataManager.getSegmentName(), + segmentDataManager.getSegment().getSegmentMetadata().getCrc()); } return ResourceUtils.convertToJsonString(segmentCrcForTable); } catch (Exception e) { @@ -429,6 +430,7 @@ public class TablesResource { @PathParam("tableNameWithType") String tableNameWithType, @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName, @Context HttpHeaders httpHeaders) { + segmentName = URIUtils.decode(segmentName); LOGGER.info("Received a request to download validDocIds for segment {} table {}", segmentName, tableNameWithType); // Validate data access ServerResourceUtils.validateDataAccess(_accessControlFactory, tableNameWithType, httpHeaders); @@ -466,6 +468,60 @@ public class TablesResource { } } + @GET + @Path("/tables/{tableNameWithType}/validDocIdMetadata") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Provides segment validDocId metadata", notes = "Provides segment validDocId metadata") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), + @ApiResponse(code = 500, message = "Internal server error", response = ErrorInfo.class), + @ApiResponse(code = 404, message = "Table or segment not found", response = ErrorInfo.class) + }) + public String getValidDocIdMetadata( + @ApiParam(value = "Table name including type", required = true, example = "myTable_REALTIME") + @PathParam("tableNameWithType") String tableNameWithType, + @ApiParam(value = "Segment name", allowMultiple = true, required = true) @QueryParam("segmentNames") + List<String> segmentNames) { + TableDataManager tableDataManager = + ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType); + List<String> missingSegments = new ArrayList<>(); + List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(segmentNames, missingSegments); + if (!missingSegments.isEmpty()) { + throw new WebApplicationException(String.format("Table %s has missing segments", tableNameWithType), + Response.Status.NOT_FOUND); + } + List<Map<String, Object>> allValidDocIdMetadata = new ArrayList<>(); + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + try { + IndexSegment indexSegment = segmentDataManager.getSegment(); + if (!(indexSegment instanceof ImmutableSegmentImpl)) { + throw new WebApplicationException( + String.format("Table %s segment %s is not a immutable segment", tableNameWithType, + segmentDataManager.getSegmentName()), Response.Status.BAD_REQUEST); + } + MutableRoaringBitmap validDocIds = + indexSegment.getValidDocIds() != null ? 
indexSegment.getValidDocIds().getMutableRoaringBitmap() : null; + if (validDocIds == null) { + throw new WebApplicationException( + String.format("Missing validDocIds for table %s segment %s does not exist", tableNameWithType, + segmentDataManager.getSegmentName()), Response.Status.NOT_FOUND); + } + Map<String, Object> validDocIdMetadata = new HashMap<>(); + int totalDocs = indexSegment.getSegmentMetadata().getTotalDocs(); + int totalValidDocs = validDocIds.getCardinality(); + int totalInvalidDocs = totalDocs - totalValidDocs; + validDocIdMetadata.put("segmentName", segmentDataManager.getSegmentName()); + validDocIdMetadata.put("totalDocs", totalDocs); + validDocIdMetadata.put("totalValidDocs", totalValidDocs); + validDocIdMetadata.put("totalInvalidDocs", totalInvalidDocs); + allValidDocIdMetadata.add(validDocIdMetadata); + } finally { + tableDataManager.releaseSegment(segmentDataManager); + } + } + return ResourceUtils.convertToJsonString(allValidDocIdMetadata); + } + /** * Upload a low level consumer segment to segment store and return the segment download url. This endpoint is used * when segment store copy is unavailable for committed low level consumer segments. @@ -493,7 +549,7 @@ public class TablesResource { }) public String uploadLLCSegment( @ApiParam(value = "Name of the REALTIME table", required = true) @PathParam("realtimeTableName") - String realtimeTableName, + String realtimeTableName, @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") String segmentName, @QueryParam("uploadTimeoutMs") @DefaultValue("-1") int timeoutMs) throws Exception { @@ -561,13 +617,13 @@ public class TablesResource { @GET @Path("tables/{realtimeTableName}/consumingSegmentsInfo") @Produces(MediaType.APPLICATION_JSON) - @ApiOperation(value = "Get the info for consumers of this REALTIME table", - notes = "Get consumers info from the table data manager. Note that the partitionToOffsetMap has been deprecated " + @ApiOperation(value = "Get the info for consumers of this REALTIME table", notes = + "Get consumers info from the table data manager. Note that the partitionToOffsetMap has been deprecated " + "and will be removed in the next release. 
The info is now embedded within each partition's state as " + "currentOffsetsMap") public List<SegmentConsumerInfo> getConsumingSegmentsInfo( @ApiParam(value = "Name of the REALTIME table", required = true) @PathParam("realtimeTableName") - String realtimeTableName) { + String realtimeTableName) { TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName); if (TableType.OFFLINE == tableType) { throw new WebApplicationException("Cannot get consuming segment info for OFFLINE table: " + realtimeTableName); @@ -590,18 +646,14 @@ public class TablesResource { recordsLagMap.put(k, v.getRecordsLag()); availabilityLagMsMap.put(k, v.getAvailabilityLagMs()); }); - @Deprecated Map<String, String> partitiionToOffsetMap = - realtimeSegmentDataManager.getPartitionToCurrentOffset(); - segmentConsumerInfoList.add( - new SegmentConsumerInfo(segmentDataManager.getSegmentName(), - realtimeSegmentDataManager.getConsumerState().toString(), - realtimeSegmentDataManager.getLastConsumedTimestamp(), - partitiionToOffsetMap, - new SegmentConsumerInfo.PartitionOffsetInfo( - partitiionToOffsetMap, - partitionStateMap.entrySet().stream().collect( - Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getUpstreamLatestOffset().toString()) - ), recordsLagMap, availabilityLagMsMap))); + @Deprecated + Map<String, String> partitiionToOffsetMap = realtimeSegmentDataManager.getPartitionToCurrentOffset(); + segmentConsumerInfoList.add(new SegmentConsumerInfo(segmentDataManager.getSegmentName(), + realtimeSegmentDataManager.getConsumerState().toString(), + realtimeSegmentDataManager.getLastConsumedTimestamp(), partitiionToOffsetMap, + new SegmentConsumerInfo.PartitionOffsetInfo(partitiionToOffsetMap, partitionStateMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getUpstreamLatestOffset().toString())), + recordsLagMap, availabilityLagMsMap))); } } } catch (Exception e) { diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java index bb912dd9e0..7e7c4b7fba 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java @@ -113,29 +113,30 @@ public class TablesResourceTest extends BaseResourceTest { public void getTableMetadata() throws Exception { for (TableType tableType : TableType.values()) { - String tableMetadataPath = "/tables/" + TableNameBuilder.forType(tableType).tableNameWithType(TABLE_NAME) - + "/metadata"; + String tableMetadataPath = + "/tables/" + TableNameBuilder.forType(tableType).tableNameWithType(TABLE_NAME) + "/metadata"; JsonNode jsonResponse = JsonUtils.stringToJsonNode(_webTarget.path(tableMetadataPath).request().get(String.class)); TableMetadataInfo metadataInfo = JsonUtils.jsonNodeToObject(jsonResponse, TableMetadataInfo.class); Assert.assertNotNull(metadataInfo); - Assert.assertEquals(metadataInfo.getTableName(), TableNameBuilder.forType(tableType) - .tableNameWithType(TABLE_NAME)); + Assert.assertEquals(metadataInfo.getTableName(), + TableNameBuilder.forType(tableType).tableNameWithType(TABLE_NAME)); Assert.assertEquals(metadataInfo.getColumnLengthMap().size(), 0); Assert.assertEquals(metadataInfo.getColumnCardinalityMap().size(), 0); Assert.assertEquals(metadataInfo.getColumnIndexSizeMap().size(), 0); - jsonResponse = JsonUtils.stringToJsonNode(_webTarget.path(tableMetadataPath) - .queryParam("columns", 
"column1").queryParam("columns", "column2").request().get(String.class)); + jsonResponse = JsonUtils.stringToJsonNode( + _webTarget.path(tableMetadataPath).queryParam("columns", "column1").queryParam("columns", "column2").request() + .get(String.class)); metadataInfo = JsonUtils.jsonNodeToObject(jsonResponse, TableMetadataInfo.class); Assert.assertEquals(metadataInfo.getColumnLengthMap().size(), 2); Assert.assertEquals(metadataInfo.getColumnCardinalityMap().size(), 2); Assert.assertEquals(metadataInfo.getColumnIndexSizeMap().size(), 2); - Assert.assertTrue(metadataInfo.getColumnIndexSizeMap().get("column1") - .containsKey(StandardIndexes.dictionary().getId())); - Assert.assertTrue(metadataInfo.getColumnIndexSizeMap().get("column2") - .containsKey(StandardIndexes.forward().getId())); + Assert.assertTrue( + metadataInfo.getColumnIndexSizeMap().get("column1").containsKey(StandardIndexes.dictionary().getId())); + Assert.assertTrue( + metadataInfo.getColumnIndexSizeMap().get("column2").containsKey(StandardIndexes.forward().getId())); } // No such table @@ -148,9 +149,8 @@ public class TablesResourceTest extends BaseResourceTest { public void testSegmentMetadata() throws Exception { IndexSegment defaultSegment = _realtimeIndexSegments.get(0); - String segmentMetadataPath = - "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/" + defaultSegment - .getSegmentName() + "/metadata"; + String segmentMetadataPath = "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/" + + defaultSegment.getSegmentName() + "/metadata"; JsonNode jsonResponse = JsonUtils.stringToJsonNode(_webTarget.path(segmentMetadataPath).request().get(String.class)); @@ -188,8 +188,8 @@ public class TablesResourceTest extends BaseResourceTest { .get(Response.class); Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode()); - response = _webTarget - .path("/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT") + response = _webTarget.path( + "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT") .request().get(Response.class); Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode()); } @@ -229,8 +229,8 @@ public class TablesResourceTest extends BaseResourceTest { Response response = _webTarget.path("/tables/UNKNOWN_REALTIME/segments/segmentname").request().get(Response.class); Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode()); - response = _webTarget - .path("/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT") + response = _webTarget.path( + "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT") .request().get(Response.class); Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode()); } @@ -243,8 +243,8 @@ public class TablesResourceTest extends BaseResourceTest { (ImmutableSegmentImpl) _realtimeIndexSegments.get(0)); // Verify non-existent table and segment download return NOT_FOUND status. 
- Response response = _webTarget.path("/tables/UNKNOWN_REALTIME/segments/segmentname/validDocIds").request() - .get(Response.class); + Response response = + _webTarget.path("/tables/UNKNOWN_REALTIME/segments/segmentname/validDocIds").request().get(Response.class); Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode()); response = _webTarget.path( @@ -253,6 +253,26 @@ public class TablesResourceTest extends BaseResourceTest { Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode()); } + @Test + public void testValidDocIdMetadata() + throws IOException { + IndexSegment segment = _realtimeIndexSegments.get(0); + // Verify the content of the downloaded snapshot from a realtime table. + downLoadAndVerifyValidDocIdsSnapshot(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), + (ImmutableSegmentImpl) segment); + + String validDocIdMetadataPath = + "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/validDocIdMetadata"; + String metadataResponse = + _webTarget.path(validDocIdMetadataPath).queryParam("segmentNames", segment.getSegmentName()).request() + .get(String.class); + JsonNode validDocIdMetadata = JsonUtils.stringToJsonNode(metadataResponse).get(0); + + Assert.assertEquals(validDocIdMetadata.get("totalDocs").asInt(), 100000); + Assert.assertEquals(validDocIdMetadata.get("totalValidDocs").asInt(), 8); + Assert.assertEquals(validDocIdMetadata.get("totalInvalidDocs").asInt(), 99992); + } + // Verify metadata file from segments. private void downLoadAndVerifySegmentContent(String tableNameWithType, IndexSegment segment) throws IOException { @@ -290,7 +310,7 @@ public class TablesResourceTest extends BaseResourceTest { PartitionUpsertMetadataManager upsertMetadataManager = mock(PartitionUpsertMetadataManager.class); ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap(); int[] docIds = new int[]{1, 4, 6, 10, 15, 17, 18, 20}; - for (int docId: docIds) { + for (int docId : docIds) { validDocIds.add(docId); } segment.enableUpsert(upsertMetadataManager, validDocIds, null); @@ -315,21 +335,21 @@ public class TablesResourceTest extends BaseResourceTest { _realtimeIndexSegments); // Verify segment uploading succeed. 
- Response response = _webTarget.path(String - .format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), + Response response = _webTarget.path( + String.format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), LLC_SEGMENT_NAME_FOR_UPLOAD_SUCCESS)).request().post(null); Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode()); Assert.assertEquals(response.readEntity(String.class), SEGMENT_DOWNLOAD_URL); // Verify bad request: table type is offline - response = _webTarget.path(String - .format("/segments/%s/%s/upload", TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME), + response = _webTarget.path( + String.format("/segments/%s/%s/upload", TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME), _offlineIndexSegments.get(0).getSegmentName())).request().post(null); Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode()); // Verify bad request: segment is not low level consumer segment - response = _webTarget.path(String - .format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), + response = _webTarget.path( + String.format("/segments/%s/%s/upload", TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME), _realtimeIndexSegments.get(0).getSegmentName())).request().post(null); Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode()); @@ -349,9 +369,8 @@ public class TablesResourceTest extends BaseResourceTest { public void testOfflineTableSegmentMetadata() throws Exception { IndexSegment defaultSegment = _offlineIndexSegments.get(0); - String segmentMetadataPath = - "/tables/" + TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME) + "/segments/" + defaultSegment - .getSegmentName() + "/metadata"; + String segmentMetadataPath = "/tables/" + TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME) + "/segments/" + + defaultSegment.getSegmentName() + "/metadata"; JsonNode jsonResponse = JsonUtils.stringToJsonNode(_webTarget.path(segmentMetadataPath).request().get(String.class)); @@ -383,8 +402,8 @@ public class TablesResourceTest extends BaseResourceTest { .get(Response.class); Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode()); - response = _webTarget - .path("/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT") + response = _webTarget.path( + "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/UNKNOWN_SEGMENT") .request().get(Response.class); Assert.assertEquals(response.getStatus(), Response.Status.NOT_FOUND.getStatusCode()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
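For reference, the new server endpoint exercised by testValidDocIdMetadata in the diff above can be queried directly against a running Pinot server. The sketch below is illustrative only and is not part of the commit; the host, admin port, table name, and segment name are assumptions, while the path /tables/{realtimeTableName}/validDocIdMetadata and the segmentNames query parameter are taken from the diff.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ValidDocIdMetadataExample {
  public static void main(String[] args) throws Exception {
    // Assumed values; replace with a real server host/port, table, and segment name.
    String serverHost = "localhost";
    int serverAdminPort = 8097;
    String tableNameWithType = "myTable_REALTIME";
    String segmentName = "myTable__0__0__20230726T0000Z";

    // Endpoint and query parameter as added in this commit:
    //   GET /tables/{realtimeTableName}/validDocIdMetadata?segmentNames={segmentName}
    URI uri = URI.create(String.format("http://%s:%d/tables/%s/validDocIdMetadata?segmentNames=%s",
        serverHost, serverAdminPort, tableNameWithType, segmentName));

    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

    // The response is a JSON array with one entry per requested segment, carrying the
    // totalDocs, totalValidDocs, and totalInvalidDocs fields asserted in the test above.
    System.out.println(response.body());
  }
}

Against the test fixture used in testValidDocIdMetadata (8 valid doc IDs marked in a 100000-doc segment), the single entry in the response would be expected to report totalDocs 100000, totalValidDocs 8, and totalInvalidDocs 99992.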