This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 21f3d283d4 Allow server level configuration for Upsert metadata class (#11851) 21f3d283d4 is described below commit 21f3d283d42e1b4f5b16f3ef84549eb34b8b7031 Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Wed Jan 17 16:40:47 2024 +0530 Allow server level configuration for Upsert metadata class (#11851) * Allow server level configuration for Upsert metadata class * Fix null pointer exceptions * Add tests * Fix tests * fix tests * Fix tests * Move logic to parse upsert configs from instance data manager to table upsert factory * Address review comments --------- Co-authored-by: Kartik Khare <kharekar...@kartiks-macbook-pro.tail8a064.ts.net> --- .../manager/realtime/RealtimeTableDataManager.java | 8 +- .../tests/UpsertTableIntegrationTest.java | 121 ++++++++++++++++----- .../UpsertTableSegmentPreloadIntegrationTest.java | 11 +- .../models/DummyTableUpsertMetadataManager.java | 115 ++++++++++++++++++++ .../upsert/BaseTableUpsertMetadataManager.java | 1 + .../upsert/TableUpsertMetadataManagerFactory.java | 28 ++++- .../MutableSegmentImplUpsertComparisonColTest.java | 3 +- .../mutable/MutableSegmentImplUpsertTest.java | 3 +- .../helix/HelixInstanceDataManagerConfig.java | 7 ++ .../config/instance/InstanceDataManagerConfig.java | 2 + 10 files changed, 265 insertions(+), 34 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 60cd58199f..333be09b0c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -207,7 +207,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager { Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType); // NOTE: Set _tableUpsertMetadataManager before initializing it because when preloading is enabled, we need to // load segments into it - _tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig); + _tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig, + _tableDataManagerConfig.getInstanceDataManagerConfig().getUpsertConfigs()); _tableUpsertMetadataManager.init(tableConfig, schema, this, _helixManager, _segmentPreloadExecutor); } @@ -697,6 +698,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager { return _instanceId; } + @VisibleForTesting + public TableUpsertMetadataManager getTableUpsertMetadataManager() { + return _tableUpsertMetadataManager; + } + /** * Validate a schema against the table config for real-time record consumption. * Ideally, we should validate these things when schema is added or table is created, but either of these diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java index e140020a39..634390effc 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java @@ -18,20 +18,36 @@ */ package org.apache.pinot.integration.tests; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.helix.HelixManager; +import org.apache.helix.model.InstanceConfig; import org.apache.pinot.client.ResultSet; +import org.apache.pinot.common.utils.config.TagNameUtils; +import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; +import org.apache.pinot.integration.tests.models.DummyTableUpsertMetadataManager; +import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory; +import org.apache.pinot.server.starter.helix.BaseServerStarter; +import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.TenantConfig; import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; import org.testng.Assert; @@ -82,8 +98,9 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet { addSchema(schema); Map<String, String> csvDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER); - TableConfig tableConfig = createCSVUpsertTableConfig(getTableName(), getKafkaTopic(), - getNumKafkaPartitions(), csvDecoderProperties, null, PRIMARY_KEY_COL); + TableConfig tableConfig = + createCSVUpsertTableConfig(getTableName(), getKafkaTopic(), getNumKafkaPartitions(), csvDecoderProperties, null, + PRIMARY_KEY_COL); addTableConfig(tableConfig); // Wait for all documents loaded @@ -136,8 +153,7 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet { private Schema createSchema(String schemaFileName) throws IOException { - InputStream inputStream = - BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(schemaFileName); + InputStream inputStream = BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(schemaFileName); Assert.assertNotNull(inputStream); return Schema.fromInputStream(inputStream); } @@ -181,8 +197,9 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet { Schema upsertSchema = createSchema(); upsertSchema.setSchemaName(tableName); addSchema(upsertSchema); - TableConfig tableConfig = createCSVUpsertTableConfig(tableName, kafkaTopicName, - getNumKafkaPartitions(), csvDecoderProperties, upsertConfig, PRIMARY_KEY_COL); + TableConfig tableConfig = + createCSVUpsertTableConfig(tableName, kafkaTopicName, getNumKafkaPartitions(), csvDecoderProperties, + upsertConfig, PRIMARY_KEY_COL); addTableConfig(tableConfig); // Push initial 10 upsert records - 3 pks 100, 101 and 102 @@ -222,9 +239,8 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet { Assert.assertEquals(rs.getString(0, playerIdColumnIndex), "101"); // Validate deleted records - rs = getPinotConnection() - .execute("SELECT playerId FROM " + tableName - + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0); + rs = getPinotConnection().execute( + "SELECT playerId FROM " + tableName + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0); Assert.assertEquals(rs.getRowCount(), 2); for (int i = 0; i < rs.getRowCount(); i++) { String playerId = rs.getString(i, 0); @@ -245,17 +261,16 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet { }, 100L, 600_000L, "Failed to load all upsert records for testDeleteWithFullUpsert"); // Validate: pk is queryable and all columns are overwritten with new value - rs = getPinotConnection() - .execute("SELECT playerId, name, game FROM " + tableName + " WHERE playerId = 100").getResultSet(0); + rs = getPinotConnection().execute("SELECT playerId, name, game FROM " + tableName + " WHERE playerId = 100") + .getResultSet(0); Assert.assertEquals(rs.getRowCount(), 1); Assert.assertEquals(rs.getInt(0, 0), 100); Assert.assertEquals(rs.getString(0, 1), "Zook-New"); Assert.assertEquals(rs.getString(0, 2), "null"); // Validate: pk lineage still exists - rs = getPinotConnection() - .execute("SELECT playerId, name FROM " + tableName - + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0); + rs = getPinotConnection().execute( + "SELECT playerId, name FROM " + tableName + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0); Assert.assertTrue(rs.getRowCount() > 1); @@ -269,8 +284,8 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet { final UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.PARTIAL); upsertConfig.setDeleteRecordColumn(DELETE_COL); - testDeleteWithPartialUpsert(getKafkaTopic() + "-partial-upsert-with-deletes", - "gameScoresPartialUpsertWithDelete", upsertConfig); + testDeleteWithPartialUpsert(getKafkaTopic() + "-partial-upsert-with-deletes", "gameScoresPartialUpsertWithDelete", + upsertConfig); } protected void testDeleteWithPartialUpsert(String kafkaTopicName, String tableName, UpsertConfig upsertConfig) @@ -288,8 +303,9 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet { Schema partialUpsertSchema = createSchema(PARTIAL_UPSERT_TABLE_SCHEMA); partialUpsertSchema.setSchemaName(tableName); addSchema(partialUpsertSchema); - TableConfig tableConfig = createCSVUpsertTableConfig(tableName, kafkaTopicName, - getNumKafkaPartitions(), csvDecoderProperties, upsertConfig, PRIMARY_KEY_COL); + TableConfig tableConfig = + createCSVUpsertTableConfig(tableName, kafkaTopicName, getNumKafkaPartitions(), csvDecoderProperties, + upsertConfig, PRIMARY_KEY_COL); addTableConfig(tableConfig); // Push initial 10 upsert records - 3 pks 100, 101 and 102 @@ -329,9 +345,8 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet { Assert.assertEquals(rs.getString(0, playerIdColumnIndex), "101"); // Validate deleted records - rs = getPinotConnection() - .execute("SELECT playerId FROM " + tableName - + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0); + rs = getPinotConnection().execute( + "SELECT playerId FROM " + tableName + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0); Assert.assertEquals(rs.getRowCount(), 2); for (int i = 0; i < rs.getRowCount(); i++) { String playerId = rs.getString(i, 0); @@ -352,22 +367,74 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet { }, 100L, 600_000L, "Failed to load all upsert records for testDeleteWithFullUpsert"); // Validate: pk is queryable and all columns are overwritten with new value - rs = getPinotConnection() - .execute("SELECT playerId, name, game FROM " + tableName - + " WHERE playerId = 100").getResultSet(0); + rs = getPinotConnection().execute("SELECT playerId, name, game FROM " + tableName + " WHERE playerId = 100") + .getResultSet(0); Assert.assertEquals(rs.getRowCount(), 1); Assert.assertEquals(rs.getInt(0, 0), 100); Assert.assertEquals(rs.getString(0, 1), "Zook"); Assert.assertEquals(rs.getString(0, 2), "[\"null\"]"); // Validate: pk lineage still exists - rs = getPinotConnection() - .execute("SELECT playerId, name FROM " + tableName - + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0); + rs = getPinotConnection().execute( + "SELECT playerId, name FROM " + tableName + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0); Assert.assertTrue(rs.getRowCount() > 1); // TEARDOWN dropRealtimeTable(tableName); } + + @Test + public void testDefaultMetadataManagerClass() + throws Exception { + PinotConfiguration config = getServerConf(12345); + config.setProperty(Joiner.on(".").join(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX, + HelixInstanceDataManagerConfig.PREFIX_OF_CONFIG_OF_UPSERT, + TableUpsertMetadataManagerFactory.UPSERT_DEFAULT_METADATA_MANAGER_CLASS), + DummyTableUpsertMetadataManager.class.getName()); + + BaseServerStarter serverStarter = null; + try { + serverStarter = startOneServer(config); + HelixManager helixManager = serverStarter.getServerInstance().getHelixManager(); + InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(helixManager, serverStarter.getInstanceId()); + // updateInstanceTags + String tagsString = "DummyTag_REALTIME,DummyTag_OFFLINE"; + List<String> newTags = Arrays.asList(StringUtils.split(tagsString, ',')); + instanceConfig.getRecord().setListField(InstanceConfig.InstanceConfigProperty.TAG_LIST.name(), newTags); + + if (!_helixDataAccessor.setProperty(_helixDataAccessor.keyBuilder().instanceConfig(serverStarter.getInstanceId()), + instanceConfig)) { + throw new RuntimeException("Failed to set instance config for instance: " + serverStarter.getInstanceId()); + } + + String dummyTableName = "dummyTable123"; + Map<String, String> csvDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER); + + TableConfig tableConfig = + createCSVUpsertTableConfig(dummyTableName, getKafkaTopic(), getNumKafkaPartitions(), csvDecoderProperties, + null, PRIMARY_KEY_COL); + + TenantConfig tenantConfig = new TenantConfig(TagNameUtils.DEFAULT_TENANT_NAME, "DummyTag", null); + + tableConfig.setTenantConfig(tenantConfig); + Schema schema = createSchema(); + schema.setSchemaName(dummyTableName); + addSchema(schema); + addTableConfig(tableConfig); + + Thread.sleep(1000L); + RealtimeTableDataManager tableDataManager = + (RealtimeTableDataManager) serverStarter.getServerInstance().getInstanceDataManager() + .getTableDataManager(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(dummyTableName)); + Assert.assertTrue(tableDataManager.getTableUpsertMetadataManager() instanceof DummyTableUpsertMetadataManager); + dropRealtimeTable(dummyTableName); + deleteSchema(dummyTableName); + waitForEVToDisappear(dummyTableName); + } finally { + if (serverStarter != null) { + serverStarter.stop(); + } + } + } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java index b88f5fbdab..f10a58bf72 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.integration.tests; +import com.google.common.base.Joiner; import java.io.File; import java.io.IOException; import java.util.List; @@ -26,8 +27,10 @@ import org.apache.commons.io.FileUtils; import org.apache.helix.model.IdealState; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.server.starter.helix.BaseServerStarter; +import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.UpsertConfig; @@ -87,8 +90,6 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions()); UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); assertNotNull(upsertConfig); - upsertConfig.setEnableSnapshot(true); - upsertConfig.setEnablePreload(true); addTableConfig(tableConfig); // Create and upload segments @@ -104,6 +105,12 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra protected void overrideServerConf(PinotConfiguration serverConf) { serverConf.setProperty(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX + ".max.segment.preload.threads", "1"); + serverConf.setProperty(Joiner.on(".").join(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX, + HelixInstanceDataManagerConfig.PREFIX_OF_CONFIG_OF_UPSERT, + TableUpsertMetadataManagerFactory.UPSERT_DEFAULT_ENABLE_SNAPSHOT), "true"); + serverConf.setProperty(Joiner.on(".").join(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX, + HelixInstanceDataManagerConfig.PREFIX_OF_CONFIG_OF_UPSERT, + TableUpsertMetadataManagerFactory.UPSERT_DEFAULT_ENABLE_PRELOAD), "true"); } @AfterClass diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java new file mode 100644 index 0000000000..502834a6b6 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/models/DummyTableUpsertMetadataManager.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests.models; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.concurrent.ExecutorService; +import org.apache.helix.HelixManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; +import org.apache.pinot.segment.local.upsert.BasePartitionUpsertMetadataManager; +import org.apache.pinot.segment.local.upsert.BaseTableUpsertMetadataManager; +import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; +import org.apache.pinot.segment.local.upsert.RecordInfo; +import org.apache.pinot.segment.local.upsert.UpsertContext; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.MutableSegment; +import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; +import org.apache.pinot.spi.config.table.HashFunction; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +public class DummyTableUpsertMetadataManager extends BaseTableUpsertMetadataManager { + + private TableConfig _tableConfig; + private Schema _schema; + + public DummyTableUpsertMetadataManager() { + super(); + } + + @Override + public void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, HelixManager helixManager, + @org.jetbrains.annotations.Nullable ExecutorService segmentPreloadExecutor) { + super.init(tableConfig, schema, tableDataManager, helixManager, segmentPreloadExecutor); + _tableConfig = tableConfig; + _schema = schema; + } + + @Override + public PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) { + UpsertContext context = new UpsertContext.Builder().setTableConfig(_tableConfig).setSchema(_schema) + .setPrimaryKeyColumns(_schema.getPrimaryKeyColumns()) + .setComparisonColumns(Collections.singletonList(_tableConfig.getValidationConfig().getTimeColumnName())) + .setHashFunction(HashFunction.NONE).setTableIndexDir(new File("/tmp/tableIndexDirDummy")).build(); + + return new DummyPartitionUpsertMetadataManager("dummy", partitionId, context); + } + + @Override + public void stop() { + } + + @Override + public void close() + throws IOException { + } + + class DummyPartitionUpsertMetadataManager extends BasePartitionUpsertMetadataManager { + public DummyPartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) { + super(tableNameWithType, partitionId, context); + } + + @Override + protected long getNumPrimaryKeys() { + return 0; + } + + @Override + protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds, + @org.jetbrains.annotations.Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, + Iterator<RecordInfo> recordInfoIterator, @org.jetbrains.annotations.Nullable IndexSegment oldSegment, + @org.jetbrains.annotations.Nullable MutableRoaringBitmap validDocIdsForOldSegment) { + } + + @Override + protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) { + return false; + } + + @Override + protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) { + } + + @Override + protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) { + return null; + } + + @Override + protected void doRemoveExpiredPrimaryKeys() { + } + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java index 845930a54c..72f57d4086 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java @@ -116,6 +116,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad initCustomVariables(); if (enableSnapshot && enablePreload && segmentPreloadExecutor != null) { + // Preloading the segments with snapshots for fast upsert metadata recovery. // Note that there is an implicit waiting logic between the thread doing the segment preloading here and the // other helix threads about to process segment state transitions (e.g. taking segments from OFFLINE to ONLINE). diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java index 1750aaefc9..1ee4a834e1 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java @@ -19,9 +19,11 @@ package org.apache.pinot.segment.local.upsert; import com.google.common.base.Preconditions; +import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.env.PinotConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,20 +33,42 @@ public class TableUpsertMetadataManagerFactory { } private static final Logger LOGGER = LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class); + public static final String UPSERT_DEFAULT_METADATA_MANAGER_CLASS = "default.metadata.manager.class"; + public static final String UPSERT_DEFAULT_ENABLE_SNAPSHOT = "default.enable.snapshot"; + public static final String UPSERT_DEFAULT_ENABLE_PRELOAD = "default.enable.preload"; - public static TableUpsertMetadataManager create(TableConfig tableConfig) { + public static TableUpsertMetadataManager create(TableConfig tableConfig, + @Nullable PinotConfiguration instanceUpsertConfigs) { String tableNameWithType = tableConfig.getTableName(); UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); Preconditions.checkArgument(upsertConfig != null, "Must provide upsert config for table: %s", tableNameWithType); TableUpsertMetadataManager metadataManager; String metadataManagerClass = upsertConfig.getMetadataManagerClass(); + + if (instanceUpsertConfigs != null) { + if (metadataManagerClass == null) { + metadataManagerClass = instanceUpsertConfigs.getProperty(UPSERT_DEFAULT_METADATA_MANAGER_CLASS); + } + // Server level config honoured only when table level config is not set to true + if (!upsertConfig.isEnableSnapshot()) { + upsertConfig.setEnableSnapshot( + Boolean.parseBoolean(instanceUpsertConfigs.getProperty(UPSERT_DEFAULT_ENABLE_SNAPSHOT, "false"))); + } + + // Server level config honoured only when table level config is not set to true + if (!upsertConfig.isEnablePreload()) { + upsertConfig.setEnablePreload( + Boolean.parseBoolean(instanceUpsertConfigs.getProperty(UPSERT_DEFAULT_ENABLE_PRELOAD, "false"))); + } + } + if (StringUtils.isNotEmpty(metadataManagerClass)) { LOGGER.info("Creating TableUpsertMetadataManager with class: {} for table: {}", metadataManagerClass, tableNameWithType); try { metadataManager = - (TableUpsertMetadataManager) Class.forName(metadataManagerClass).getConstructor().newInstance(); + (TableUpsertMetadataManager) Class.forName(metadataManagerClass).newInstance(); } catch (Exception e) { throw new RuntimeException( String.format("Caught exception while constructing TableUpsertMetadataManager with class: %s for table: %s", diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java index 38bc66ade5..d3508b7540 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java @@ -86,7 +86,8 @@ public class MutableSegmentImplUpsertComparisonColTest { _schema = Schema.fromFile(new File(schemaResourceUrl.getFile())); _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); File jsonFile = new File(dataResourceUrl.getFile()); - TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(_tableConfig); + TableUpsertMetadataManager tableUpsertMetadataManager = + TableUpsertMetadataManagerFactory.create(_tableConfig, null); tableUpsertMetadataManager.init(_tableConfig, _schema, _tableDataManager, mock(HelixManager.class), mock(ExecutorService.class)); _partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java index 702bd47f80..4f14bdd408 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java @@ -96,7 +96,8 @@ public class MutableSegmentImplUpsertTest { _schema = Schema.fromFile(new File(schemaResourceUrl.getFile())); _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema); File jsonFile = new File(dataResourceUrl.getFile()); - TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(_tableConfig); + TableUpsertMetadataManager tableUpsertMetadataManager = + TableUpsertMetadataManagerFactory.create(_tableConfig, null); tableUpsertMetadataManager.init(_tableConfig, _schema, _tableDataManager, mock(HelixManager.class), mock(ExecutorService.class)); _partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java index 9c69e8afd4..c7e15fe106 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java @@ -131,6 +131,8 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig private static final String EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS = "external.view.dropped.max.wait.ms"; private static final String EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS = "external.view.dropped.check.interval.ms"; + public static final String PREFIX_OF_CONFIG_OF_UPSERT = "upsert"; + private final static String[] REQUIRED_KEYS = {INSTANCE_ID}; private static final long DEFAULT_ERROR_CACHE_SIZE = 100L; private static final int DEFAULT_DELETED_SEGMENTS_CACHE_SIZE = 10_000; @@ -294,6 +296,11 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig DEFAULT_EXTERNAL_VIEW_DROPPED_CHECK_INTERVAL_MS); } + @Override + public PinotConfiguration getUpsertConfigs() { + return _instanceDataManagerConfiguration.subset(PREFIX_OF_CONFIG_OF_UPSERT); + } + @Override public String toString() { String configString = ""; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java index a97f1639e1..6d035890c9 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java @@ -68,4 +68,6 @@ public interface InstanceDataManagerConfig { long getExternalViewDroppedMaxWaitMs(); long getExternalViewDroppedCheckIntervalMs(); + + PinotConfiguration getUpsertConfigs(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org