This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f828ecd6536 Pass instanceType to SegmentIndexCreationDriverImpl::init and update minion or server metrics based on this (#17057)
f828ecd6536 is described below

commit f828ecd6536bbe871370394926accc4b90f954c7
Author: Sonam Mandal <[email protected]>
AuthorDate: Thu Oct 23 14:56:10 2025 -0700

    Pass instanceType to SegmentIndexCreationDriverImpl::init and update minion or server metrics based on this (#17057)
---
 .../apache/pinot/common/metrics/ServerMeter.java   |  6 +-
 .../apache/pinot/core/minion/SegmentPurger.java    |  3 +-
 .../framework/SegmentProcessorFramework.java       |  3 +-
 .../pinot/core/minion/SegmentPurgerTest.java       |  3 +-
 .../batch/common/SegmentGenerationTaskRunner.java  |  3 +-
 .../refreshsegment/RefreshSegmentTaskExecutor.java |  3 +-
 .../UpsertCompactionTaskExecutor.java              |  3 +-
 .../mergerollup/MergeRollupTaskExecutorTest.java   |  3 +-
 .../minion/tasks/purge/PurgeTaskExecutorTest.java  |  3 +-
 .../RealtimeToOfflineSegmentsTaskExecutorTest.java |  7 ++-
 .../converter/RealtimeSegmentConverter.java        |  4 +-
 .../impl/SegmentIndexCreationDriverImpl.java       | 68 +++++++++++++++++-----
 .../mutable/MutableSegmentImplTest.java            |  3 +-
 .../index/loader/SegmentPreProcessorTest.java      |  5 +-
 .../spi/creator/SegmentIndexCreationDriver.java    | 12 ++++
 15 files changed, 99 insertions(+), 30 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 89cdb7ef4bc..5575a0b89e4 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -236,7 +236,11 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   // Workload Budget exceeded counter
   WORKLOAD_BUDGET_EXCEEDED("workloadBudgetExceeded", false, "Number of times 
workload budget exceeded"),
   INGESTION_DELAY_TRACKING_ERRORS("errors", false,
-      "Indicates the count of errors encountered while tracking ingestion 
delay.");
+      "Indicates the count of errors encountered while tracking ingestion 
delay."),
+
+  TRANSFORMATION_ERROR_COUNT("rows", false),
+  DROPPED_RECORD_COUNT("rows", false),
+  CORRUPTED_RECORD_COUNT("rows", false);
 
   private final String _meterName;
   private final String _unit;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java 
b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 4e255903d4b..25474199588 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -30,6 +30,7 @@ import 
org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorCustomConfigs;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -108,7 +109,7 @@ public class SegmentPurger {
 
       SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
       purgeRecordReader.rewind();
-      driver.init(_segmentGeneratorConfig, purgeRecordReader);
+      driver.init(_segmentGeneratorConfig, purgeRecordReader, 
InstanceType.MINION);
       driver.build();
     }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index d90c773dea0..b84c243afcf 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -38,6 +38,7 @@ import 
org.apache.pinot.segment.local.segment.creator.TransformPipeline;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.name.SegmentNameGeneratorFactory;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.RecordReader;
@@ -342,7 +343,7 @@ public class SegmentProcessorFramework {
           GenericRowFileRecordReader recordReaderForRange = 
recordReader.getRecordReaderForRange(startRowId, endRowId);
           SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
           driver.init(generatorConfig, new 
RecordReaderSegmentCreationDataSource(recordReaderForRange),
-              
TransformPipeline.getPassThroughPipeline(tableConfig.getTableName()));
+              
TransformPipeline.getPassThroughPipeline(tableConfig.getTableName()), 
InstanceType.MINION);
           driver.build();
           _incompleteRowsFound += driver.getIncompleteRowsFound();
           _skippedRowsFound += driver.getSkippedRowsFound();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java 
b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
index 7f99cd831a4..5c80dc6f455 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/minion/SegmentPurgerTest.java
@@ -37,6 +37,7 @@ import 
org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -105,7 +106,7 @@ public class SegmentPurgerTest {
     config.setSegmentName(SEGMENT_NAME);
 
     SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-    driver.init(config, genericRowRecordReader);
+    driver.init(config, genericRowRecordReader, InstanceType.MINION);
     driver.build();
     _originalIndexDir = new File(ORIGINAL_SEGMENT_DIR, SEGMENT_NAME);
   }
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
index c18a8e3ce77..bd8aaac61b5 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
@@ -31,6 +31,7 @@ import 
org.apache.pinot.segment.spi.creator.name.NormalizedDateSegmentNameGenera
 import org.apache.pinot.segment.spi.creator.name.SegmentNameGenerator;
 import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
 import 
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
@@ -112,7 +113,7 @@ public class SegmentGenerationTaskRunner implements 
Serializable {
 
     //build segment
     SegmentIndexCreationDriverImpl segmentIndexCreationDriver = new 
SegmentIndexCreationDriverImpl();
-    segmentIndexCreationDriver.init(segmentGeneratorConfig);
+    segmentIndexCreationDriver.init(segmentGeneratorConfig, 
InstanceType.MINION);
     segmentIndexCreationDriver.build();
     return segmentIndexCreationDriver.getSegmentName();
   }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java
index 12e3e1a2c07..f336f4858c7 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskExecutor.java
@@ -39,6 +39,7 @@ import 
org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -149,7 +150,7 @@ public class RefreshSegmentTaskExecutor extends 
BaseSingleSegmentConversionExecu
       SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir, 
tableConfig, segmentMetadata, segmentName,
           getSchema(tableNameWithType));
       SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-      driver.init(config, recordReader);
+      driver.init(config, recordReader, InstanceType.MINION);
       driver.build();
       _eventObserver.notifyProgress(pinotTaskConfig,
           "Segment processing stats - incomplete rows:" + 
driver.getIncompleteRowsFound() + ", dropped rows:"
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
index 490387f727d..f4497b69ebd 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
@@ -34,6 +34,7 @@ import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationD
 import 
org.apache.pinot.segment.local.segment.readers.CompactedPinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.Obfuscator;
@@ -104,7 +105,7 @@ public class UpsertCompactionTaskExecutor extends 
BaseSingleSegmentConversionExe
       SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir, 
tableConfig, segmentMetadata, segmentName,
           getSchema(tableNameWithType));
       SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-      driver.init(config, compactedRecordReader);
+      driver.init(config, compactedRecordReader, InstanceType.MINION);
       driver.build();
       _eventObserver.notifyProgress(pinotTaskConfig,
           "Segment processing stats - incomplete rows:" + 
driver.getIncompleteRowsFound() + ", dropped rows:"
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java
index 22033e53189..a274c65fdab 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java
@@ -40,6 +40,7 @@ import 
org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -89,7 +90,7 @@ public class MergeRollupTaskExecutorTest {
       config.setTableName(TABLE_NAME);
       config.setSegmentName(segmentName);
       SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-      driver.init(config, recordReader);
+      driver.init(config, recordReader, InstanceType.MINION);
       driver.build();
       _segmentIndexDirList.add(new File(ORIGINAL_SEGMENT_DIR, segmentName));
     }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
index ac310a86cfa..08a2fae7dca 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskExecutorTest.java
@@ -37,6 +37,7 @@ import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationD
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -88,7 +89,7 @@ public class PurgeTaskExecutorTest {
     config.setSegmentName(SEGMENT_NAME);
 
     SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-    driver.init(config, genericRowRecordReader);
+    driver.init(config, genericRowRecordReader, InstanceType.MINION);
     driver.build();
     _originalIndexDir = new File(ORIGINAL_SEGMENT_DIR, SEGMENT_NAME);
 
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
index a9c284658fa..6bb5552517d 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
@@ -41,6 +41,7 @@ import 
org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -152,7 +153,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
       config.setTableName(TABLE_NAME);
       config.setSegmentName(segmentName);
       SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-      driver.init(config, recordReader);
+      driver.init(config, recordReader, InstanceType.MINION);
       driver.build();
       _segmentIndexDirList.add(new File(ORIGINAL_SEGMENT_DIR, segmentName));
     }
@@ -167,7 +168,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
       config.setTableName(TABLE_NAME_EPOCH_HOURS);
       config.setSegmentName(segmentName);
       SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-      driver.init(config, recordReader);
+      driver.init(config, recordReader, InstanceType.MINION);
       driver.build();
       _segmentIndexDirListEpochHours.add(new File(ORIGINAL_SEGMENT_DIR, 
segmentName));
     }
@@ -182,7 +183,7 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
       config.setTableName(TABLE_NAME_SDF);
       config.setSegmentName(segmentName);
       SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-      driver.init(config, recordReader);
+      driver.init(config, recordReader, InstanceType.MINION);
       driver.build();
       _segmentIndexDirListSDF.add(new File(ORIGINAL_SEGMENT_DIR, segmentName));
     }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
index c19c577daf7..0d6544b2976 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
@@ -35,6 +35,7 @@ import org.apache.pinot.segment.local.utils.TableConfigUtils;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
@@ -213,7 +214,8 @@ public class RealtimeSegmentConverter {
       dataSource =
           new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, 
(PinotSegmentRecordReader) recordReader);
     }
-    driver.init(genConfig, dataSource, 
TransformPipeline.getPassThroughPipeline(_tableName)); // initializes reader
+    // initializes reader
+    driver.init(genConfig, dataSource, 
TransformPipeline.getPassThroughPipeline(_tableName), InstanceType.SERVER);
 
     if (!_enableColumnMajor) {
       driver.build();
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index ca4917caf1b..62e35215cbe 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -37,6 +37,8 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metrics.MinionMeter;
 import org.apache.pinot.common.metrics.MinionMetrics;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import 
org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource;
 import 
org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
 import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
@@ -73,6 +75,7 @@ import 
org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -120,11 +123,18 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
   private int _incompleteRowsFound = 0;
   private int _skippedRowsFound = 0;
   private int _sanitizedRowsFound = 0;
+  @Nullable private InstanceType _instanceType;
 
   @Override
   public void init(SegmentGeneratorConfig config)
       throws Exception {
-    init(config, getRecordReader(config));
+    init(config, getRecordReader(config), null);
+  }
+
+  @Override
+  public void init(SegmentGeneratorConfig config, @Nullable InstanceType 
instanceType)
+      throws Exception {
+    init(config, getRecordReader(config), instanceType);
   }
 
   private RecordReader getRecordReader(SegmentGeneratorConfig 
segmentGeneratorConfig)
@@ -168,16 +178,25 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
   public void init(SegmentGeneratorConfig config, RecordReader recordReader)
       throws Exception {
     init(config, new RecordReaderSegmentCreationDataSource(recordReader),
-        new TransformPipeline(config.getTableConfig(), config.getSchema()));
+        new TransformPipeline(config.getTableConfig(), config.getSchema()), 
null);
+  }
+
+  public void init(SegmentGeneratorConfig config, RecordReader recordReader, 
@Nullable InstanceType instanceType)
+      throws Exception {
+    init(config, new RecordReaderSegmentCreationDataSource(recordReader),
+        new TransformPipeline(config.getTableConfig(), config.getSchema()), 
instanceType);
   }
 
   public void init(SegmentGeneratorConfig config, SegmentCreationDataSource 
dataSource,
-      TransformPipeline transformPipeline)
+      TransformPipeline transformPipeline, @Nullable InstanceType instanceType)
       throws Exception {
     _config = config;
     _recordReader = dataSource.getRecordReader();
     _dataSchema = config.getSchema();
     _continueOnError = config.isContinueOnError();
+    Preconditions.checkState(instanceType == null || instanceType == 
InstanceType.SERVER
+        || instanceType == InstanceType.MINION, "InstanceType passed must be 
for minion or server or null");
+    _instanceType = instanceType;
 
     if (config.isFailOnEmptySegment()) {
       Preconditions.checkState(_recordReader.hasNext(), "No record in data 
source");
@@ -329,23 +348,44 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
       LOGGER.info("Sanitized {} records during transformation", 
_sanitizedRowsFound);
     }
 
-    MinionMetrics metrics = MinionMetrics.get();
-    String tableNameWithType = _config.getTableConfig().getTableName();
-    if (_incompleteRowsFound > 0) {
-      metrics.addMeteredTableValue(tableNameWithType, 
MinionMeter.TRANSFORMATION_ERROR_COUNT, _incompleteRowsFound);
-    }
-    if (_skippedRowsFound > 0) {
-      metrics.addMeteredTableValue(tableNameWithType, 
MinionMeter.DROPPED_RECORD_COUNT, _skippedRowsFound);
-    }
-    if (_sanitizedRowsFound > 0) {
-      metrics.addMeteredTableValue(tableNameWithType, 
MinionMeter.CORRUPTED_RECORD_COUNT, _sanitizedRowsFound);
-    }
+    updateMetrics(_config.getTableConfig().getTableName());
 
     LOGGER.info("Finished records indexing in IndexCreator!");
 
     handlePostCreation();
   }
 
+  private void updateMetrics(String tableNameWithType) {
+    if (_instanceType == null) {
+      return;
+    }
+
+    // Use appropriate metrics based on instance type
+    if (_instanceType == InstanceType.MINION) {
+      MinionMetrics metrics = MinionMetrics.get();
+      if (_incompleteRowsFound > 0) {
+        metrics.addMeteredTableValue(tableNameWithType, 
MinionMeter.TRANSFORMATION_ERROR_COUNT, _incompleteRowsFound);
+      }
+      if (_skippedRowsFound > 0) {
+        metrics.addMeteredTableValue(tableNameWithType, 
MinionMeter.DROPPED_RECORD_COUNT, _skippedRowsFound);
+      }
+      if (_sanitizedRowsFound > 0) {
+        metrics.addMeteredTableValue(tableNameWithType, 
MinionMeter.CORRUPTED_RECORD_COUNT, _sanitizedRowsFound);
+      }
+    } else if (_instanceType == InstanceType.SERVER) {
+      ServerMetrics metrics = ServerMetrics.get();
+      if (_incompleteRowsFound > 0) {
+        metrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.TRANSFORMATION_ERROR_COUNT, _incompleteRowsFound);
+      }
+      if (_skippedRowsFound > 0) {
+        metrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.DROPPED_RECORD_COUNT, _skippedRowsFound);
+      }
+      if (_sanitizedRowsFound > 0) {
+        metrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.CORRUPTED_RECORD_COUNT, _sanitizedRowsFound);
+      }
+    }
+  }
+
   public void buildByColumn(IndexSegment indexSegment, 
ThreadSafeMutableRoaringBitmap validDocIds)
       throws Exception {
     // Count the number of documents and gather per-column statistics
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTest.java
index 64a432d0390..43850ae3710 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTest.java
@@ -35,6 +35,7 @@ import 
org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.FileFormat;
@@ -78,7 +79,7 @@ public class MutableSegmentImplTest {
     SegmentGeneratorConfig config =
         SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile, 
TEMP_DIR, "testTable");
     SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
-    driver.init(config);
+    driver.init(config, InstanceType.SERVER);
     driver.build();
     _immutableSegment = ImmutableSegmentLoader.load(new File(TEMP_DIR, 
driver.getSegmentName()), ReadMode.mmap);
 
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 8d9364f1035..1fafd2de2b7 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -62,6 +62,7 @@ import 
org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
 import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.BloomFilterConfig;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
@@ -267,7 +268,7 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     config.setOutDir(TEMP_DIR.getPath());
     config.setSegmentName(SEGMENT_NAME);
     SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-    driver.init(config);
+    driver.init(config, InstanceType.SERVER);
     driver.build();
   }
 
@@ -1740,7 +1741,7 @@ public class SegmentPreProcessorTest implements 
PinotBuffersAfterClassCheckRule
     }
 
     SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-    driver.init(config, new GenericRowRecordReader(rows));
+    driver.init(config, new GenericRowRecordReader(rows), InstanceType.SERVER);
     driver.build();
   }
 
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentIndexCreationDriver.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentIndexCreationDriver.java
index bb7e846684f..962bc71ef72 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentIndexCreationDriver.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentIndexCreationDriver.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.spi.creator;
 
 import java.io.File;
 import java.io.Serializable;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.data.IngestionSchemaValidator;
 
 
@@ -38,6 +39,17 @@ public interface SegmentIndexCreationDriver extends 
Serializable {
   void init(SegmentGeneratorConfig config)
       throws Exception;
 
+  /**
+   * Configures the segment generator with the given segment generator configuration, which contains the input file
+   * location, format, schema and other necessary information to create an index segment.
+   * The instance type is used to determine which metrics to instantiate (minion vs server).
+   *
+   * @param config The configuration to use when building an index segment
+   * @param instanceType The type of instance (MINION, SERVER, etc.) for appropriate metrics
+   */
+  void init(SegmentGeneratorConfig config, InstanceType instanceType)
+      throws Exception;
+
   /**
    * Builds an index segment and writes it to disk. The index segment creation extracts data from the input files,
    * profiles each column and then builds indices based on the profiling information gathered.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to