This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 26e5952d75  Bug Fix: Segment Purger cannot purge old segments after schema evolution (#10869)
26e5952d75 is described below

commit 26e5952d75ff5accedd05c5eb9812cf900d3f413
Author: Sajjad Moradi <moradi.saj...@gmail.com>
AuthorDate: Wed Jun 7 20:55:29 2023 -0700

    Bug Fix: Segment Purger cannot purge old segments after schema evolution (#10869)
---
 .../controller/helix/ControllerRequestClient.java |  10 ++
 .../pinot/controller/helix/ControllerTest.java    |   5 +
 .../apache/pinot/core/minion/SegmentPurger.java   |   9 +-
 .../pinot/core/minion/SegmentPurgerTest.java      |   7 +-
 .../tests/PurgeMinionClusterIntegrationTest.java  | 151 +++++++++++++++------
 .../minion/tasks/purge/PurgeTaskExecutor.java     |   7 +-
 .../minion/tasks/purge/PurgeTaskExecutorTest.java |   3 +
 7 files changed, 140 insertions(+), 52 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index 2da40ef3e5..e6aa83dea1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -81,6 +81,16 @@ public class ControllerRequestClient {
     }
   }
 
+  public void updateSchema(Schema schema)
+      throws IOException {
+    String url = _controllerRequestURLBuilder.forSchemaUpdate(schema.getSchemaName());
+    try {
+      HttpClient.wrapAndThrowHttpException(_httpClient.sendMultipartPutRequest(url, schema.toSingleLineJsonString()));
+    } catch (HttpErrorStatusException e) {
+      throw new IOException(e);
+    }
+  }
+
   public void deleteSchema(String schemaName)
       throws IOException {
     String url = _controllerRequestURLBuilder.forSchemaDelete(schemaName);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 41b34a4911..63f575293d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -622,6 +622,11 @@ public class ControllerTest {
     getControllerRequestClient().addSchema(schema);
   }
 
+  public void updateSchema(Schema schema)
+      throws IOException {
+    getControllerRequestClient().updateSchema(schema);
+  }
+
   public Schema getSchema(String schemaName) {
     Schema schema = _helixResourceManager.getSchema(schemaName);
     assertNotNull(schema);
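The two controller-side changes above give tests a schema-update path alongside the existing add/delete helpers: ControllerRequestClient.updateSchema issues a multipart PUT of the schema JSON to the controller's schema-update endpoint, and ControllerTest exposes it to subclasses. A minimal sketch of how an integration test might evolve a registered schema with these helpers; the table name "myTable" and the column "NewColumn" are illustrative, not part of this commit:

    // Sketch only: assumes a schema named "myTable" was registered earlier via addSchema(schema).
    Schema schema = getSchema("myTable");  // fetch the current schema from the controller
    // Add a new single-value INT dimension; "NewColumn" is a hypothetical column name.
    schema.addField(new DimensionFieldSpec("NewColumn", FieldSpec.DataType.INT, true));
    updateSchema(schema);  // multipart PUT through ControllerRequestClient.updateSchema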
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 2b65508585..4094b6b6e3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -28,6 +28,7 @@ import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
@@ -45,19 +46,21 @@ public class SegmentPurger {
   private final File _indexDir;
   private final File _workingDir;
   private final TableConfig _tableConfig;
+  private final Schema _schema;
   private final RecordPurger _recordPurger;
   private final RecordModifier _recordModifier;
 
   private int _numRecordsPurged;
   private int _numRecordsModified;
 
-  public SegmentPurger(File indexDir, File workingDir, TableConfig tableConfig, @Nullable RecordPurger recordPurger,
-      @Nullable RecordModifier recordModifier) {
+  public SegmentPurger(File indexDir, File workingDir, TableConfig tableConfig, Schema schema,
+      @Nullable RecordPurger recordPurger, @Nullable RecordModifier recordModifier) {
     Preconditions.checkArgument(recordPurger != null || recordModifier != null,
         "At least one of record purger and modifier should be non-null");
     _indexDir = indexDir;
     _workingDir = workingDir;
     _tableConfig = tableConfig;
+    _schema = schema;
     _recordPurger = recordPurger;
     _recordModifier = recordModifier;
   }
@@ -79,7 +82,7 @@ public class SegmentPurger {
       return null;
     }
 
-    SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, segmentMetadata.getSchema());
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, _schema);
     config.setOutDir(_workingDir.getPath());
     config.setSegmentName(segmentName);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
index d29c0d94e3..2dd6b37a3e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
@@ -67,6 +67,7 @@ public class SegmentPurgerTest {
   private static final String D2 = "d2";
 
   private TableConfig _tableConfig;
+  private Schema _schema;
   private File _originalIndexDir;
   private int _expectedNumRecordsPurged;
   private int _expectedNumRecordsModified;
@@ -79,7 +80,7 @@ public class SegmentPurgerTest {
     _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setInvertedIndexColumns(Collections.singletonList(D1)).setCreateInvertedIndexDuringSegmentGeneration(true)
         .build();
-    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(D1, FieldSpec.DataType.INT)
+    _schema = new Schema.SchemaBuilder().addSingleValueDimension(D1, FieldSpec.DataType.INT)
         .addSingleValueDimension(D2, FieldSpec.DataType.INT).build();
 
     List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
@@ -98,7 +99,7 @@ public class SegmentPurgerTest {
     }
     GenericRowRecordReader genericRowRecordReader = new GenericRowRecordReader(rows);
 
-    SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, schema);
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, _schema);
     config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath());
     config.setSegmentName(SEGMENT_NAME);
@@ -125,7 +126,7 @@ public class SegmentPurgerTest {
     };
 
     SegmentPurger segmentPurger =
-        new SegmentPurger(_originalIndexDir, PURGED_SEGMENT_DIR, _tableConfig, recordPurger, recordModifier);
+        new SegmentPurger(_originalIndexDir, PURGED_SEGMENT_DIR, _tableConfig, _schema, recordPurger, recordModifier);
     File purgedIndexDir = segmentPurger.purgeSegment();
 
     // Check the purge/modify counter in segment purger
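This is the core of the fix: purgeSegment() previously rebuilt the segment with segmentMetadata.getSchema(), the schema frozen into the segment at build time, so a segment created before a schema change could not be regenerated against a table config that references newer columns. The schema is now injected by the caller, which is expected to supply the latest one. A sketch of the new constructor contract; the paths and table name are illustrative, and the ArrTime predicate is borrowed from the integration test below:

    // Sketch only: the caller passes the *latest* table config and schema,
    // not the schema stored inside the old segment's own metadata.
    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
    Schema latestSchema = new Schema.SchemaBuilder().setSchemaName("myTable")
        .addSingleValueDimension("ArrTime", FieldSpec.DataType.INT)    // present in old segments
        .addSingleValueDimension("ColumnABC", FieldSpec.DataType.INT)  // added after the segment was built
        .build();
    SegmentPurger.RecordPurger recordPurger = row -> row.getValue("ArrTime").equals(1);
    SegmentPurger purger = new SegmentPurger(new File("/tmp/oldSegmentIndexDir"), new File("/tmp/purgeWorkDir"),
        tableConfig, latestSchema, recordPurger, /* recordModifier */ null);
    File purgedIndexDir = purger.purgeSegment();  // may return null; the executor then keeps the original segment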
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
index 284e7655b5..c4ba131f6d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.integration.tests;
 
+import com.google.common.collect.ImmutableList;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -34,8 +36,11 @@ import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.minion.MinionContext;
+import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
@@ -56,25 +61,21 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
   private static final String PURGE_FIRST_RUN_TABLE = "myTable1";
   private static final String PURGE_DELTA_PASSED_TABLE = "myTable2";
   private static final String PURGE_DELTA_NOT_PASSED_TABLE = "myTable3";
+  private static final String PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE = "myTable4";
+
   protected PinotHelixTaskResourceManager _helixTaskResourceManager;
   protected PinotTaskManager _taskManager;
   protected PinotHelixResourceManager _pinotHelixResourceManager;
   protected String _tableName;
 
-  protected final File _segmentDir1 = new File(_tempDir, "segmentDir1");
-  protected final File _segmentDir2 = new File(_tempDir, "segmentDir2");
-  protected final File _segmentDir3 = new File(_tempDir, "segmentDir3");
-
-  protected final File _tarDir1 = new File(_tempDir, "tarDir1");
-  protected final File _tarDir2 = new File(_tempDir, "tarDir2");
-  protected final File _tarDir3 = new File(_tempDir, "tarDir3");
+  protected final File _segmentDataDir = new File(_tempDir, "segmentDataDir");
+  protected final File _segmentTarDir = new File(_tempDir, "segmentTarDir");
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir1, _tarDir1, _segmentDir2, _tarDir2, _segmentDir3,
-        _tarDir3);
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDataDir, _segmentTarDir);
 
     // Start the Pinot cluster
     startZk();
@@ -82,37 +83,38 @@ public class PurgeMinionClusterIntegrationTes
     startBrokers(1);
     startServers(1);
 
-    // Create and upload the schema and table config
-    Schema schema = createSchema();
-    addSchema(schema);
-    setTableName(PURGE_DELTA_NOT_PASSED_TABLE);
-    TableConfig purgeDeltaNotPassedTableConfig = createOfflineTableConfig();
-    purgeDeltaNotPassedTableConfig.setTaskConfig(getPurgeTaskConfig());
-    setTableName(PURGE_FIRST_RUN_TABLE);
-    TableConfig purgeTableConfig = createOfflineTableConfig();
-    purgeTableConfig.setTaskConfig(getPurgeTaskConfig());
-
-    setTableName(PURGE_DELTA_PASSED_TABLE);
-    TableConfig purgeDeltaPassedTableConfig = createOfflineTableConfig();
-    purgeDeltaPassedTableConfig.setTaskConfig(getPurgeTaskConfig());
-
-    addTableConfig(purgeTableConfig);
-    addTableConfig(purgeDeltaPassedTableConfig);
-    addTableConfig(purgeDeltaNotPassedTableConfig);
+    List<String> allTables = ImmutableList.of(
+        PURGE_FIRST_RUN_TABLE,
+        PURGE_DELTA_PASSED_TABLE,
+        PURGE_DELTA_NOT_PASSED_TABLE,
+        PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE
+    );
+    Schema schema = null;
+    TableConfig tableConfig = null;
+    for (String tableName : allTables) {
+      // create and upload schema
+      schema = createSchema();
+      schema.setSchemaName(tableName);
+      addSchema(schema);
+
+      // create and upload table config
+      setTableName(tableName);
+      tableConfig = createOfflineTableConfig();
+      tableConfig.setTaskConfig(getPurgeTaskConfig());
+      addTableConfig(tableConfig);
+    }
 
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
 
-    // Create and upload segments
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, purgeTableConfig, schema, 0, _segmentDir1, _tarDir1);
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, purgeDeltaPassedTableConfig, schema, 0, _segmentDir2,
-        _tarDir2);
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, purgeDeltaNotPassedTableConfig, schema, 0,
-        _segmentDir3, _tarDir3);
+    // Create segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDataDir,
+        _segmentTarDir);
 
-    uploadSegments(PURGE_FIRST_RUN_TABLE, _tarDir1);
-    uploadSegments(PURGE_DELTA_PASSED_TABLE, _tarDir2);
-    uploadSegments(PURGE_DELTA_NOT_PASSED_TABLE, _tarDir3);
+    // Upload segments for all tables
+    for (String tableName : allTables) {
+      uploadSegments(tableName, _segmentTarDir);
+    }
 
     startMinion();
     setRecordPurger();
@@ -150,10 +152,14 @@ public class PurgeMinionClusterIntegrationTes
   private void setRecordPurger() {
     MinionContext minionContext = MinionContext.getInstance();
     minionContext.setRecordPurgerFactory(rawTableName -> {
-      List<String> tableNames =
-          Arrays.asList(PURGE_FIRST_RUN_TABLE, PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE);
+      List<String> tableNames = Arrays.asList(
+          PURGE_FIRST_RUN_TABLE,
+          PURGE_DELTA_PASSED_TABLE,
+          PURGE_DELTA_NOT_PASSED_TABLE,
+          PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE
+      );
       if (tableNames.contains(rawTableName)) {
-        return row -> row.getValue("Quarter").equals(1);
+        return row -> row.getValue("ArrTime").equals(1);
       } else {
         return null;
       }
@@ -205,11 +211,11 @@ public class PurgeMinionClusterIntegrationTes
     // Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay)
     assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
 
-    // 28057 rows with quarter = 1
+    // 52 rows with ArrTime = 1
     // 115545 totals rows
-    // Expecting 115545 - 28057 = 87488 rows after purging
+    // Expecting 115545 - 52 = 115493 rows after purging
     // It might take some time for server to load the purged segments
-    TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_FIRST_RUN_TABLE) == 87488, 60_000L,
+    TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_FIRST_RUN_TABLE) == 115493, 60_000L,
         "Failed to get expected purged records");
 
     // Drop the table
@@ -251,11 +257,11 @@ public class PurgeMinionClusterIntegrationTes
     // Should not generate new purge task as the last time purge is not greater than last + 1day (default purge delay)
     assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
 
-    // 28057 rows with quarter = 1
+    // 52 rows with ArrTime = 1
     // 115545 totals rows
-    // Expecting 115545 - 28057 = 87488 rows after purging
+    // Expecting 115545 - 52 = 115493 rows after purging
     // It might take some time for server to load the purged segments
-    TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_DELTA_PASSED_TABLE) == 87488, 60_000L,
+    TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_DELTA_PASSED_TABLE) == 115493, 60_000L,
         "Failed to get expected purged records");
 
     // Drop the table
@@ -302,6 +308,63 @@ public class PurgeMinionClusterIntegrationTes
     verifyTableDelete(offlineTableName);
   }
 
+  /**
+   * Test purge on segments which were built by older schema and table config.
+   * Two new columns are added after segments are built and indices are defined for the new columns in the table
+   * config.
+   */
+  @Test
+  public void testPurgeOnOldSegmentsWithIndicesOnNewColumns()
+      throws Exception {
+
+    // add new columns to schema
+    Schema schema = createSchema();
+    schema.addField(new DimensionFieldSpec("ColumnABC", FieldSpec.DataType.INT, true));
+    schema.addField(new DimensionFieldSpec("ColumnXYZ", FieldSpec.DataType.INT, true));
+    schema.setSchemaName(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+    updateSchema(schema);
+
+    // add indices to the new columns
+    setTableName(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+    TableConfig tableConfig = createOfflineTableConfig();
+    tableConfig.setTaskConfig(getPurgeTaskConfig());
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    List<String> invertedIndices = new ArrayList<>(indexingConfig.getInvertedIndexColumns());
+    invertedIndices.add("ColumnABC");
+    List<String> rangeIndices = new ArrayList<>(indexingConfig.getRangeIndexColumns());
+    rangeIndices.add("ColumnXYZ");
+    indexingConfig.setInvertedIndexColumns(invertedIndices);
+    indexingConfig.setRangeIndexColumns(rangeIndices);
+    updateTableConfig(tableConfig);
+
+    // schedule purge tasks
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+    assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertTrue(_helixTaskResourceManager.getTaskQueues()
+        .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
+    assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    waitForTaskToComplete();
+
+    // Check that metadata contains expected values
+    for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+      // Check purge time
+      assertTrue(
+          metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE + MinionConstants.TASK_TIME_SUFFIX));
+    }
+
+    // 52 rows with ArrTime = 1
+    // 115545 totals rows
+    // Expecting 115545 - 52 = 115493 rows after purging
+    // It might take some time for server to load the purged segments
+    TestUtils.waitForCondition(aVoid -> getCurrentCountStarResult(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE) == 115493,
+        60_000L, "Failed to get expected purged records");
+
+    // Drop the table
+    dropOfflineTable(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+
+    // Check if the task metadata is cleaned up on table deletion
+    verifyTableDelete(offlineTableName);
+  }
+
   protected void verifyTableDelete(String tableNameWithType) {
     TestUtils.waitForCondition(input -> {
       // Check if the segment lineage is cleaned up
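The new test above captures the regression end to end: segments for myTable4 are built against the original schema, the schema then gains ColumnABC and ColumnXYZ, the table config gains an inverted index and a range index on those columns, and only afterwards does the purge task run. Before this fix the purger regenerated segments with the stale per-segment schema, leaving the newly indexed columns without backing field specs in the schema handed to SegmentGeneratorConfig. The contrast below is a sketch abridged to the relevant columns; the real test schema comes from createSchema() and carries the full Avro field list:

    // Schema baked into the old segment's metadata (abridged): what the purger used before the fix.
    Schema staleSchema = new Schema.SchemaBuilder().setSchemaName("myTable4")
        .addSingleValueDimension("ArrTime", FieldSpec.DataType.INT)
        .build();

    // Latest schema after evolution (abridged): what the purger must use so the inverted
    // index on ColumnABC and the range index on ColumnXYZ have backing field specs.
    Schema latestSchema = new Schema.SchemaBuilder().setSchemaName("myTable4")
        .addSingleValueDimension("ArrTime", FieldSpec.DataType.INT)
        .addSingleValueDimension("ColumnABC", FieldSpec.DataType.INT)
        .addSingleValueDimension("ColumnXYZ", FieldSpec.DataType.INT)
        .build();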
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
index 542bbde7da..267ed0874e 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutor.java
@@ -28,6 +28,7 @@ import org.apache.pinot.core.minion.SegmentPurger;
 import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
 import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -44,7 +45,6 @@ public class PurgeTaskExecutor extends BaseSingleSegmentConversionExecutor {
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
     String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
 
-    TableConfig tableConfig = getTableConfig(tableNameWithType);
     SegmentPurger.RecordPurgerFactory recordPurgerFactory = MINION_CONTEXT.getRecordPurgerFactory();
     SegmentPurger.RecordPurger recordPurger =
         recordPurgerFactory != null ? recordPurgerFactory.getRecordPurger(rawTableName) : null;
@@ -52,8 +52,11 @@ public class PurgeTaskExecutor extends BaseSingleSegmentConversionExecutor {
     SegmentPurger.RecordModifier recordModifier =
         recordModifierFactory != null ? recordModifierFactory.getRecordModifier(rawTableName) : null;
 
+    TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Schema schema = getSchema(tableNameWithType);
     _eventObserver.notifyProgress(pinotTaskConfig, "Purging segment: " + indexDir);
-    SegmentPurger segmentPurger = new SegmentPurger(indexDir, workingDir, tableConfig, recordPurger, recordModifier);
+    SegmentPurger segmentPurger =
+        new SegmentPurger(indexDir, workingDir, tableConfig, schema, recordPurger, recordModifier);
     File purgedSegmentFile = segmentPurger.purgeSegment();
     if (purgedSegmentFile == null) {
       purgedSegmentFile = indexDir;
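On the minion side, the executor now fetches both the latest table config and the latest schema before constructing the purger; the RecordPurger itself still comes from the factory registered on MinionContext, keyed by raw table name. A minimal sketch of that registration hook, mirroring the integration test's predicate; the table name is hypothetical:

    // Sketch only: register a purger factory so PurgeTaskExecutor can resolve a purger per table.
    MinionContext minionContext = MinionContext.getInstance();
    minionContext.setRecordPurgerFactory(rawTableName -> {
      if ("myTable".equals(rawTableName)) {               // hypothetical table name
        return row -> row.getValue("ArrTime").equals(1);  // purge rows whose ArrTime equals 1
      }
      return null;                                        // no purger registered for other tables
    });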
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
index d8f955be8a..29a2cdd25a 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
@@ -26,6 +26,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
@@ -95,6 +96,8 @@ public class PurgeTaskExecutorTest {
     ZkHelixPropertyStore<ZNRecord> helixPropertyStore = Mockito.mock(ZkHelixPropertyStore.class);
     Mockito.when(helixPropertyStore.get("/CONFIGS/TABLE/testTable_OFFLINE", null, AccessOption.PERSISTENT))
         .thenReturn(TableConfigUtils.toZNRecord(tableConfig));
+    Mockito.when(helixPropertyStore.get("/SCHEMAS/testTable", null, AccessOption.PERSISTENT))
+        .thenReturn(SchemaUtils.toZNRecord(schema));
     minionContext.setHelixPropertyStore(helixPropertyStore);
     minionContext.setRecordPurgerFactory(rawTableName -> {
       if (rawTableName.equals(TABLE_NAME)) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org