This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f828ecd6536 Pass instanceType to SegmentIndexCreationDriverImpl::init
and update minion or server metrics based on this (#17057)
f828ecd6536 is described below
commit f828ecd6536bbe871370394926accc4b90f954c7
Author: Sonam Mandal <[email protected]>
AuthorDate: Thu Oct 23 14:56:10 2025 -0700
Pass instanceType to SegmentIndexCreationDriverImpl::init and update minion
or server metrics based on this (#17057)
---
.../apache/pinot/common/metrics/ServerMeter.java | 6 +-
.../apache/pinot/core/minion/SegmentPurger.java | 3 +-
.../framework/SegmentProcessorFramework.java | 3 +-
.../pinot/core/minion/SegmentPurgerTest.java | 3 +-
.../batch/common/SegmentGenerationTaskRunner.java | 3 +-
.../refreshsegment/RefreshSegmentTaskExecutor.java | 3 +-
.../UpsertCompactionTaskExecutor.java | 3 +-
.../mergerollup/MergeRollupTaskExecutorTest.java | 3 +-
.../minion/tasks/purge/PurgeTaskExecutorTest.java | 3 +-
.../RealtimeToOfflineSegmentsTaskExecutorTest.java | 7 ++-
.../converter/RealtimeSegmentConverter.java | 4 +-
.../impl/SegmentIndexCreationDriverImpl.java | 68 +++++++++++++++++-----
.../mutable/MutableSegmentImplTest.java | 3 +-
.../index/loader/SegmentPreProcessorTest.java | 5 +-
.../spi/creator/SegmentIndexCreationDriver.java | 12 ++++
15 files changed, 99 insertions(+), 30 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 89cdb7ef4bc..5575a0b89e4 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -236,7 +236,11 @@ public enum ServerMeter implements AbstractMetrics.Meter {
// Workload Budget exceeded counter
WORKLOAD_BUDGET_EXCEEDED("workloadBudgetExceeded", false, "Number of times
workload budget exceeded"),
INGESTION_DELAY_TRACKING_ERRORS("errors", false,
- "Indicates the count of errors encountered while tracking ingestion
delay.");
+ "Indicates the count of errors encountered while tracking ingestion
delay."),
+
+ TRANSFORMATION_ERROR_COUNT("rows", false),
+ DROPPED_RECORD_COUNT("rows", false),
+ CORRUPTED_RECORD_COUNT("rows", false);
private final String _meterName;
private final String _unit;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 4e255903d4b..25474199588 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -30,6 +30,7 @@ import
org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorCustomConfigs;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -108,7 +109,7 @@ public class SegmentPurger {
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
purgeRecordReader.rewind();
- driver.init(_segmentGeneratorConfig, purgeRecordReader);
+ driver.init(_segmentGeneratorConfig, purgeRecordReader,
InstanceType.MINION);
driver.build();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index d90c773dea0..b84c243afcf 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -38,6 +38,7 @@ import
org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.name.SegmentNameGeneratorFactory;
+import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
@@ -342,7 +343,7 @@ public class SegmentProcessorFramework {
GenericRowFileRecordReader recordReaderForRange =
recordReader.getRecordReaderForRange(startRowId, endRowId);
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
driver.init(generatorConfig, new
RecordReaderSegmentCreationDataSource(recordReaderForRange),
-
TransformPipeline.getPassThroughPipeline(tableConfig.getTableName()));
+
TransformPipeline.getPassThroughPipeline(tableConfig.getTableName()),
InstanceType.MINION);
driver.build();
_incompleteRowsFound += driver.getIncompleteRowsFound();
_skippedRowsFound += driver.getSkippedRowsFound();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
index 7f99cd831a4..5c80dc6f455 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
@@ -37,6 +37,7 @@ import
org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
@@ -105,7 +106,7 @@ public class SegmentPurgerTest {
config.setSegmentName(SEGMENT_NAME);
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(config, genericRowRecordReader);
+ driver.init(config, genericRowRecordReader, InstanceType.MINION);
driver.build();
_originalIndexDir = new File(ORIGINAL_SEGMENT_DIR, SEGMENT_NAME);
}
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
index c18a8e3ce77..bd8aaac61b5 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
@@ -31,6 +31,7 @@ import
org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenera
import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
import
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
+import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
@@ -112,7 +113,7 @@ public class SegmentGenerationTaskRunner implements
Serializable {
//build segment
SegmentIndexCreationDriverImpl segmentIndexCreationDriver = new
SegmentIndexCreationDriverImpl();
- segmentIndexCreationDriver.init(segmentGeneratorConfig);
+ segmentIndexCreationDriver.init(segmentGeneratorConfig,
InstanceType.MINION);
segmentIndexCreationDriver.build();
return segmentIndexCreationDriver.getSegmentName();
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java
index 12e3e1a2c07..f336f4858c7 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java
@@ -39,6 +39,7 @@ import
org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
@@ -149,7 +150,7 @@ public class RefreshSegmentTaskExecutor extends
BaseSingleSegmentConversionExecu
SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir,
tableConfig, segmentMetadata, segmentName,
getSchema(tableNameWithType));
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(config, recordReader);
+ driver.init(config, recordReader, InstanceType.MINION);
driver.build();
_eventObserver.notifyProgress(pinotTaskConfig,
"Segment processing stats - incomplete rows:" +
driver.getIncompleteRowsFound() + ", dropped rows:"
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
index 490387f727d..f4497b69ebd 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
@@ -34,6 +34,7 @@ import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationD
import
org.apache.pinot.segment.local.segment.readers.CompactedPinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.Obfuscator;
@@ -104,7 +105,7 @@ public class UpsertCompactionTaskExecutor extends
BaseSingleSegmentConversionExe
SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir,
tableConfig, segmentMetadata, segmentName,
getSchema(tableNameWithType));
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(config, compactedRecordReader);
+ driver.init(config, compactedRecordReader, InstanceType.MINION);
driver.build();
_eventObserver.notifyProgress(pinotTaskConfig,
"Segment processing stats - incomplete rows:" +
driver.getIncompleteRowsFound() + ", dropped rows:"
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java
index 22033e53189..a274c65fdab 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java
@@ -40,6 +40,7 @@ import
org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
@@ -89,7 +90,7 @@ public class MergeRollupTaskExecutorTest {
config.setTableName(TABLE_NAME);
config.setSegmentName(segmentName);
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(config, recordReader);
+ driver.init(config, recordReader, InstanceType.MINION);
driver.build();
_segmentIndexDirList.add(new File(ORIGINAL_SEGMENT_DIR, segmentName));
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
index ac310a86cfa..08a2fae7dca 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
@@ -37,6 +37,7 @@ import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationD
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
@@ -88,7 +89,7 @@ public class PurgeTaskExecutorTest {
config.setSegmentName(SEGMENT_NAME);
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(config, genericRowRecordReader);
+ driver.init(config, genericRowRecordReader, InstanceType.MINION);
driver.build();
_originalIndexDir = new File(ORIGINAL_SEGMENT_DIR, SEGMENT_NAME);
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
index a9c284658fa..6bb5552517d 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
@@ -41,6 +41,7 @@ import
org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -152,7 +153,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
config.setTableName(TABLE_NAME);
config.setSegmentName(segmentName);
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(config, recordReader);
+ driver.init(config, recordReader, InstanceType.MINION);
driver.build();
_segmentIndexDirList.add(new File(ORIGINAL_SEGMENT_DIR, segmentName));
}
@@ -167,7 +168,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
config.setTableName(TABLE_NAME_EPOCH_HOURS);
config.setSegmentName(segmentName);
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(config, recordReader);
+ driver.init(config, recordReader, InstanceType.MINION);
driver.build();
_segmentIndexDirListEpochHours.add(new File(ORIGINAL_SEGMENT_DIR,
segmentName));
}
@@ -182,7 +183,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
config.setTableName(TABLE_NAME_SDF);
config.setSegmentName(segmentName);
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(config, recordReader);
+ driver.init(config, recordReader, InstanceType.MINION);
driver.build();
_segmentIndexDirListSDF.add(new File(ORIGINAL_SEGMENT_DIR, segmentName));
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
index c19c577daf7..0d6544b2976 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
@@ -35,6 +35,7 @@ import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
@@ -213,7 +214,8 @@ public class RealtimeSegmentConverter {
dataSource =
new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl,
(PinotSegmentRecordReader) recordReader);
}
- driver.init(genConfig, dataSource,
TransformPipeline.getPassThroughPipeline(_tableName)); // initializes reader
+ // initializes reader
+ driver.init(genConfig, dataSource,
TransformPipeline.getPassThroughPipeline(_tableName), InstanceType.SERVER);
if (!_enableColumnMajor) {
driver.build();
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index ca4917caf1b..62e35215cbe 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -37,6 +37,8 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.MinionMeter;
import org.apache.pinot.common.metrics.MinionMetrics;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
import
org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource;
import
org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
@@ -73,6 +75,7 @@ import
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
@@ -120,11 +123,18 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
private int _incompleteRowsFound = 0;
private int _skippedRowsFound = 0;
private int _sanitizedRowsFound = 0;
+ @Nullable private InstanceType _instanceType;
@Override
public void init(SegmentGeneratorConfig config)
throws Exception {
- init(config, getRecordReader(config));
+ init(config, getRecordReader(config), null);
+ }
+
+ @Override
+ public void init(SegmentGeneratorConfig config, @Nullable InstanceType
instanceType)
+ throws Exception {
+ init(config, getRecordReader(config), instanceType);
}
private RecordReader getRecordReader(SegmentGeneratorConfig
segmentGeneratorConfig)
@@ -168,16 +178,25 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
public void init(SegmentGeneratorConfig config, RecordReader recordReader)
throws Exception {
init(config, new RecordReaderSegmentCreationDataSource(recordReader),
- new TransformPipeline(config.getTableConfig(), config.getSchema()));
+ new TransformPipeline(config.getTableConfig(), config.getSchema()),
null);
+ }
+
+ public void init(SegmentGeneratorConfig config, RecordReader recordReader,
@Nullable InstanceType instanceType)
+ throws Exception {
+ init(config, new RecordReaderSegmentCreationDataSource(recordReader),
+ new TransformPipeline(config.getTableConfig(), config.getSchema()),
instanceType);
}
public void init(SegmentGeneratorConfig config, SegmentCreationDataSource
dataSource,
- TransformPipeline transformPipeline)
+ TransformPipeline transformPipeline, @Nullable InstanceType instanceType)
throws Exception {
_config = config;
_recordReader = dataSource.getRecordReader();
_dataSchema = config.getSchema();
_continueOnError = config.isContinueOnError();
+ Preconditions.checkState(instanceType == null || instanceType ==
InstanceType.SERVER
+ || instanceType == InstanceType.MINION, "InstanceType passed must be
for minion or server or null");
+ _instanceType = instanceType;
if (config.isFailOnEmptySegment()) {
Preconditions.checkState(_recordReader.hasNext(), "No record in data
source");
@@ -329,23 +348,44 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
LOGGER.info("Sanitized {} records during transformation",
_sanitizedRowsFound);
}
- MinionMetrics metrics = MinionMetrics.get();
- String tableNameWithType = _config.getTableConfig().getTableName();
- if (_incompleteRowsFound > 0) {
- metrics.addMeteredTableValue(tableNameWithType,
MinionMeter.TRANSFORMATION_ERROR_COUNT, _incompleteRowsFound);
- }
- if (_skippedRowsFound > 0) {
- metrics.addMeteredTableValue(tableNameWithType,
MinionMeter.DROPPED_RECORD_COUNT, _skippedRowsFound);
- }
- if (_sanitizedRowsFound > 0) {
- metrics.addMeteredTableValue(tableNameWithType,
MinionMeter.CORRUPTED_RECORD_COUNT, _sanitizedRowsFound);
- }
+ updateMetrics(_config.getTableConfig().getTableName());
LOGGER.info("Finished records indexing in IndexCreator!");
handlePostCreation();
}
+ private void updateMetrics(String tableNameWithType) {
+ if (_instanceType == null) {
+ return;
+ }
+
+ // Use appropriate metrics based on instance type
+ if (_instanceType == InstanceType.MINION) {
+ MinionMetrics metrics = MinionMetrics.get();
+ if (_incompleteRowsFound > 0) {
+ metrics.addMeteredTableValue(tableNameWithType,
MinionMeter.TRANSFORMATION_ERROR_COUNT, _incompleteRowsFound);
+ }
+ if (_skippedRowsFound > 0) {
+ metrics.addMeteredTableValue(tableNameWithType,
MinionMeter.DROPPED_RECORD_COUNT, _skippedRowsFound);
+ }
+ if (_sanitizedRowsFound > 0) {
+ metrics.addMeteredTableValue(tableNameWithType,
MinionMeter.CORRUPTED_RECORD_COUNT, _sanitizedRowsFound);
+ }
+ } else if (_instanceType == InstanceType.SERVER) {
+ ServerMetrics metrics = ServerMetrics.get();
+ if (_incompleteRowsFound > 0) {
+ metrics.addMeteredTableValue(tableNameWithType,
ServerMeter.TRANSFORMATION_ERROR_COUNT, _incompleteRowsFound);
+ }
+ if (_skippedRowsFound > 0) {
+ metrics.addMeteredTableValue(tableNameWithType,
ServerMeter.DROPPED_RECORD_COUNT, _skippedRowsFound);
+ }
+ if (_sanitizedRowsFound > 0) {
+ metrics.addMeteredTableValue(tableNameWithType,
ServerMeter.CORRUPTED_RECORD_COUNT, _sanitizedRowsFound);
+ }
+ }
+ }
+
public void buildByColumn(IndexSegment indexSegment,
ThreadSafeMutableRoaringBitmap validDocIds)
throws Exception {
// Count the number of documents and gather per-column statistics
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTest.java
index 64a432d0390..43850ae3710 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTest.java
@@ -35,6 +35,7 @@ import
org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
@@ -78,7 +79,7 @@ public class MutableSegmentImplTest {
SegmentGeneratorConfig config =
SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile,
TEMP_DIR, "testTable");
SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
- driver.init(config);
+ driver.init(config, InstanceType.SERVER);
driver.build();
_immutableSegment = ImmutableSegmentLoader.load(new File(TEMP_DIR,
driver.getSegmentName()), ReadMode.mmap);
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 8d9364f1035..1fafd2de2b7 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -62,6 +62,7 @@ import
org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
+import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.BloomFilterConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
@@ -267,7 +268,7 @@ public class SegmentPreProcessorTest implements
PinotBuffersAfterClassCheckRule
config.setOutDir(TEMP_DIR.getPath());
config.setSegmentName(SEGMENT_NAME);
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(config);
+ driver.init(config, InstanceType.SERVER);
driver.build();
}
@@ -1740,7 +1741,7 @@ public class SegmentPreProcessorTest implements
PinotBuffersAfterClassCheckRule
}
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(config, new GenericRowRecordReader(rows));
+ driver.init(config, new GenericRowRecordReader(rows), InstanceType.SERVER);
driver.build();
}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentIndexCreationDriver.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentIndexCreationDriver.java
index bb7e846684f..962bc71ef72 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentIndexCreationDriver.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentIndexCreationDriver.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.spi.creator;
import java.io.File;
import java.io.Serializable;
+import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.data.IngestionSchemaValidator;
@@ -38,6 +39,17 @@ public interface SegmentIndexCreationDriver extends
Serializable {
void init(SegmentGeneratorConfig config)
throws Exception;
+ /**
+ * Configures the segment generator with the given segment generator
configuration, which contains the input file
+ * location, format, schema and other necessary information to create an
index segment.
+ * The instance type is used to determine which metrics to instantiate
(minion vs server).
+ *
+ * @param config The configuration to use when building an index segment
+ * @param instanceType The type of instance (MINION, SERVER, etc.) for
appropriate metrics
+ */
+ void init(SegmentGeneratorConfig config, InstanceType instanceType)
+ throws Exception;
+
/**
* Builds an index segment and writes it to disk. The index segment creation
extracts data from the input files,
* profiles each column and then builds indices based on the profiling
information gathered.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org