klsince commented on code in PR #11851: URL: https://github.com/apache/pinot/pull/11851#discussion_r1454626612
########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java: ########## @@ -352,22 +367,74 @@ protected void testDeleteWithPartialUpsert(String kafkaTopicName, String tableNa }, 100L, 600_000L, "Failed to load all upsert records for testDeleteWithFullUpsert"); // Validate: pk is queryable and all columns are overwritten with new value - rs = getPinotConnection() - .execute("SELECT playerId, name, game FROM " + tableName - + " WHERE playerId = 100").getResultSet(0); + rs = getPinotConnection().execute("SELECT playerId, name, game FROM " + tableName + " WHERE playerId = 100") + .getResultSet(0); Assert.assertEquals(rs.getRowCount(), 1); Assert.assertEquals(rs.getInt(0, 0), 100); Assert.assertEquals(rs.getString(0, 1), "Zook"); Assert.assertEquals(rs.getString(0, 2), "[\"null\"]"); // Validate: pk lineage still exists - rs = getPinotConnection() - .execute("SELECT playerId, name FROM " + tableName - + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0); + rs = getPinotConnection().execute( + "SELECT playerId, name FROM " + tableName + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0); Assert.assertTrue(rs.getRowCount() > 1); // TEARDOWN dropRealtimeTable(tableName); } + + @Test + public void testDefaultMetadataManagerClass() + throws Exception { + PinotConfiguration config = getServerConf(12345); + config.setProperty(Joiner.on(".").join(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX, + HelixInstanceDataManagerConfig.PREFIX_OF_CONFIG_OF_UPSERT, + TableUpsertMetadataManagerFactory.UPSERT_DEFAULT_METADATA_MANAGER_CLASS), + DummyTableUpsertMetadataManager.class.getName()); + + BaseServerStarter serverStarter = null; + try { + serverStarter = startOneServer(config); + HelixManager helixManager = serverStarter.getServerInstance().getHelixManager(); + InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(helixManager, serverStarter.getInstanceId()); + // updateInstanceTags + String tagsString = "DummyTag_REALTIME,DummyTag_OFFLINE"; + List<String> newTags = Arrays.asList(StringUtils.split(tagsString, ',')); + instanceConfig.getRecord().setListField(InstanceConfig.InstanceConfigProperty.TAG_LIST.name(), newTags); + + if (!_helixDataAccessor.setProperty(_helixDataAccessor.keyBuilder().instanceConfig(serverStarter.getInstanceId()), + instanceConfig)) { + throw new RuntimeException("Failed to set instance config for instance: " + serverStarter.getInstanceId()); + } + + String dummyTableName = "dummyTable123"; + Map<String, String> csvDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER); + + TableConfig tableConfig = + createCSVUpsertTableConfig(dummyTableName, getKafkaTopic(), getNumKafkaPartitions(), csvDecoderProperties, + null, PRIMARY_KEY_COL); + + TenantConfig tenantConfig = new TenantConfig(TagNameUtils.DEFAULT_TENANT_NAME, "DummyTag", null); + + tableConfig.setTenantConfig(tenantConfig); + Schema schema = createSchema(); + schema.setSchemaName(dummyTableName); + addSchema(schema); + addTableConfig(tableConfig); + + Thread.sleep(1000L); Review Comment: how about use the TestUtil.waitTill to assert on the true condition below ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java: ########## @@ -101,6 +101,7 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD double metadataTTL = upsertConfig.getMetadataTTL(); double deletedKeysTTL = upsertConfig.getDeletedKeysTTL(); File tableIndexDir = tableDataManager.getTableDataDir(); + Review Comment: nit: remove the change of adding empty lines ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java: ########## @@ -31,20 +33,45 @@ private TableUpsertMetadataManagerFactory() { } private static final Logger LOGGER = LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class); + public static final String UPSERT_DEFAULT_METADATA_MANAGER_CLASS = "default.metadata.manager.class"; + public static final String UPSERT_DEFAULT_ENABLE_SNAPSHOT = "default.enable.snapshot"; + public static final String UPSERT_DEFAULT_ENABLE_PRELOAD = "default.enable.preload"; - public static TableUpsertMetadataManager create(TableConfig tableConfig) { + public static TableUpsertMetadataManager create(TableConfig tableConfig, + @Nullable PinotConfiguration instanceUpsertConfigs) { String tableNameWithType = tableConfig.getTableName(); UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); Preconditions.checkArgument(upsertConfig != null, "Must provide upsert config for table: %s", tableNameWithType); TableUpsertMetadataManager metadataManager; - String metadataManagerClass = upsertConfig.getMetadataManagerClass(); + String tableMetadataManagerClass = upsertConfig.getMetadataManagerClass(); + Review Comment: nit: may save a little bit by ``` if (instanceUpsertConfigs != null) { if (metadataManagerClass == null) { metadataManagerClass = instanceUpsertConfigs.getProperty(UPSERT_DEFAULT_METADATA_MANAGER_CLASS); } if (!upsertConfig.isEnableSnapshot()) { ... if (!upsertConfig.isEnablePreload()) { ... } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org