This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ba0c0e01853 Add Parquet output format to segment converter tool 
(#17990)
ba0c0e01853 is described below

commit ba0c0e0185309337cb28f4f7d98af58bfb174d12
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Mar 27 01:07:58 2026 -0700

    Add Parquet output format to segment converter tool (#17990)
    
    * Add Parquet output format support to PinotSegmentConvertCommand
    
    Add PinotSegmentToParquetConverter that converts Pinot segments to
    Parquet format using AvroParquetWriter with GZIP compression. This
    extends the existing segment converter tool to support PARQUET as
    an output format alongside AVRO, CSV, and JSON.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * Add configurable compression codec for Parquet output
    
    Add -parquetCompression CLI option supporting GZIP, SNAPPY, ZSTD,
    LZ4, and UNCOMPRESSED codecs (defaults to GZIP). The compression
    codec is passed through to the AvroParquetWriter.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * Add forward-index-only loading mode for segment converter
    
    Add -forwardIndexOnly CLI option that loads only forward index,
    dictionary, and null value vector when reading segments. This skips
    secondary indexes (inverted, range, bloom, text, etc.) for faster
    loading and better tolerance of segments with missing indexes.
    
    Changes:
    - IndexLoadingConfig: add forwardIndexOnly flag
    - PhysicalColumnIndexContainer: skip non-essential indexes when flag set
    - ImmutableSegmentLoader: add load overload with forwardIndexOnly param
    - PinotSegmentRecordReader: add init overload with forwardIndexOnly param
    - PinotSegmentToParquetConverter: pass through forwardIndexOnly option
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * Address review: reuse utility, typed enum option, add Parquet test
    
    - Use SegmentProcessorAvroUtils.convertGenericRowToAvroRecord() instead
      of duplicating GenericRow to Avro Record conversion logic
    - Cache ParquetUtils.getParquetHadoopConfiguration() in a local variable
      to avoid repeated allocation
    - Change -parquetCompression CLI option from String to CompressionCodecName
      for picocli validation of enum values
    - Add testParquetConverter to PinotSegmentConverterTest that verifies
      SV/MV and BYTES column handling via round-trip conversion
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * Address review: clarify forwardIndexOnly scope, add reader test
    
    - Update -forwardIndexOnly description to clarify it only applies to
      PARQUET output format and is ignored for AVRO/CSV/JSON
    - Add testPinotSegmentRecordReaderForwardIndexOnly to verify that
      PinotSegmentRecordReader reads all rows correctly when loading
      with forwardIndexOnly=true
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * Apply forwardIndexOnly to all converters (AVRO/CSV/JSON/PARQUET)
    
    For data dump use cases, only the forward index is needed regardless
    of output format. Add forwardIndexOnly support to all converters:
    - PinotSegmentToAvroConverter: also reuse SegmentProcessorAvroUtils
    - PinotSegmentToCsvConverter
    - PinotSegmentToJsonConverter
    - Update CLI description to reflect it applies to all formats
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * Fix csvListDelimiter bug and clarify forwardIndexOnly Javadocs
    
    - Fix pre-existing bug where _csvDelimiter was passed as both row
      delimiter and list delimiter, making -csvListDelimiter ignored
    - Clarify Javadocs on ImmutableSegmentLoader and PinotSegmentRecordReader
      to note that forwardIndexOnly skips column-level secondary indexes
      but segment-level indexes (star-tree, multi-column text) are still
      loaded when present
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../immutable/ImmutableSegmentLoader.java          | 12 +++
 .../index/column/PhysicalColumnIndexContainer.java |  9 +++
 .../segment/index/loader/IndexLoadingConfig.java   |  9 +++
 .../segment/readers/PinotSegmentRecordReader.java  | 17 ++++-
 .../readers/PinotSegmentRecordReaderTest.java      | 26 +++++++
 .../converter/PinotSegmentConvertCommand.java      | 27 +++++--
 .../converter/PinotSegmentToAvroConverter.java     | 29 +++-----
 .../converter/PinotSegmentToCsvConverter.java      | 12 ++-
 .../converter/PinotSegmentToJsonConverter.java     | 11 ++-
 .../converter/PinotSegmentToParquetConverter.java  | 85 ++++++++++++++++++++++
 .../converter/PinotSegmentConverterTest.java       | 29 ++++++++
 11 files changed, 238 insertions(+), 28 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
index abd37a0e9bc..20c16ad962d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -68,8 +68,20 @@ public class ImmutableSegmentLoader {
    */
   public static ImmutableSegment load(File indexDir, ReadMode readMode)
       throws Exception {
+    return load(indexDir, readMode, false);
+  }
+
+  /**
+   * Loads the segment in read-only mode with an option to load only 
column-level forward index, dictionary,
+   * and null value vector, skipping other column-level secondary indexes. 
Segment-level indexes (such as
+   * star-tree or multi-column text index) are still loaded when present. This 
is useful for tools like segment
+   * converters that only need to read data without requiring column-level 
secondary indexes.
+   */
+  public static ImmutableSegment load(File indexDir, ReadMode readMode, 
boolean forwardIndexOnly)
+      throws Exception {
     IndexLoadingConfig defaultIndexLoadingConfig = new IndexLoadingConfig();
     defaultIndexLoadingConfig.setReadMode(readMode);
+    defaultIndexLoadingConfig.setForwardIndexOnly(forwardIndexOnly);
     return load(indexDir, defaultIndexLoadingConfig, false, null, null);
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
index edc996c7a02..e1cd12af0ed 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
@@ -23,6 +23,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import 
org.apache.pinot.segment.local.segment.index.readers.text.MultiColumnLuceneTextIndexReader;
@@ -33,6 +34,7 @@ import 
org.apache.pinot.segment.spi.index.IndexReaderConstraintException;
 import org.apache.pinot.segment.spi.index.IndexReaderFactory;
 import org.apache.pinot.segment.spi.index.IndexService;
 import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.slf4j.Logger;
@@ -42,6 +44,9 @@ import org.slf4j.LoggerFactory;
 public final class PhysicalColumnIndexContainer implements 
ColumnIndexContainer {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PhysicalColumnIndexContainer.class);
 
+  private static final Set<String> FORWARD_INDEX_ONLY_TYPES =
+      Set.of(StandardIndexes.FORWARD_ID, StandardIndexes.DICTIONARY_ID, 
StandardIndexes.NULL_VALUE_VECTOR_ID);
+
   private final IndexTypeMap _indexTypeMap;
 
   // Reference to shared segment-level multi-column text index reader.
@@ -61,8 +66,12 @@ public final class PhysicalColumnIndexContainer implements 
ColumnIndexContainer
     ArrayList<IndexType> indexTypes = new ArrayList<>();
     ArrayList<IndexReader> readers = new ArrayList<>();
 
+    boolean forwardIndexOnly = indexLoadingConfig.isForwardIndexOnly();
     try {
       for (IndexType<?, ?, ?> indexType : 
IndexService.getInstance().getAllIndexes()) {
+        if (forwardIndexOnly && 
!FORWARD_INDEX_ONLY_TYPES.contains(indexType.getId())) {
+          continue;
+        }
         if (segmentReader.hasIndexFor(columnName, indexType)) {
           IndexReaderFactory<?> readerProvider = indexType.getReaderFactory();
           try {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index 51f17d66914..a2565f09ddc 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -66,6 +66,7 @@ public class IndexLoadingConfig {
   private Set<String> _knownColumns;
   private String _tableDataDir;
   private boolean _errorOnColumnBuildFailure;
+  private boolean _forwardIndexOnly;
 
   // Initialized by instance data manager config
   private String _instanceId;
@@ -349,6 +350,14 @@ public class IndexLoadingConfig {
     _errorOnColumnBuildFailure = errorOnColumnBuildFailure;
   }
 
+  public boolean isForwardIndexOnly() {
+    return _forwardIndexOnly;
+  }
+
+  public void setForwardIndexOnly(boolean forwardIndexOnly) {
+    _forwardIndexOnly = forwardIndexOnly;
+  }
+
   public boolean isSkipSegmentPreprocess() {
     return _tableConfig != null && 
_tableConfig.getIndexingConfig().isSkipSegmentPreprocess();
   }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
index f1e5834205f..d284ce76d71 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
@@ -117,9 +117,24 @@ public class PinotSegmentRecordReader implements 
RecordReader {
    */
   public void init(File indexDir, @Nullable Set<String> fieldsToRead, 
@Nullable List<String> sortOrder,
       boolean skipDefaultNullValues) {
+    init(indexDir, fieldsToRead, sortOrder, skipDefaultNullValues, false);
+  }
+
+  /**
+   * Initializes the record reader from an index directory with an option to 
skip column-level secondary indexes.
+   *
+   * @param indexDir Index directory
+   * @param fieldsToRead The fields to read from the segment. If null or 
empty, reads all fields
+   * @param sortOrder List of sorted columns
+   * @param skipDefaultNullValues Whether to skip putting default null values 
into the record
+   * @param forwardIndexOnly Whether to load only column-level forward index, 
dictionary, and null value vector,
+   *                         skipping other column-level secondary indexes
+   */
+  public void init(File indexDir, @Nullable Set<String> fieldsToRead, 
@Nullable List<String> sortOrder,
+      boolean skipDefaultNullValues, boolean forwardIndexOnly) {
     IndexSegment indexSegment;
     try {
-      indexSegment = ImmutableSegmentLoader.load(indexDir, ReadMode.mmap);
+      indexSegment = ImmutableSegmentLoader.load(indexDir, ReadMode.mmap, 
forwardIndexOnly);
     } catch (Exception e) {
       throw new RuntimeException("Caught exception while loading the segment 
from: " + indexDir, e);
     }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReaderTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReaderTest.java
index b888cede9b2..4238eed0e11 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReaderTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReaderTest.java
@@ -128,6 +128,32 @@ public class PinotSegmentRecordReaderTest {
     }
   }
 
+  @Test
+  public void testPinotSegmentRecordReaderForwardIndexOnly()
+      throws Exception {
+    List<GenericRow> outputRows = new ArrayList<>();
+
+    PinotSegmentRecordReader pinotSegmentRecordReader = new 
PinotSegmentRecordReader();
+    pinotSegmentRecordReader.init(_segmentIndexDir, null, null, false, true);
+    try (pinotSegmentRecordReader) {
+      while (pinotSegmentRecordReader.hasNext()) {
+        outputRows.add(pinotSegmentRecordReader.next());
+      }
+    }
+
+    Assert.assertEquals(outputRows.size(), _rows.size(),
+        "Number of rows returned by PinotSegmentRecordReader with 
forwardIndexOnly is incorrect");
+    for (int i = 0; i < outputRows.size(); i++) {
+      GenericRow outputRow = outputRows.get(i);
+      GenericRow row = _rows.get(i);
+      Assert.assertEquals(outputRow.getValue(D_SV_1), row.getValue(D_SV_1));
+      
Assert.assertTrue(PinotSegmentUtil.compareMultiValueColumn(outputRow.getValue(D_MV_1),
 row.getValue(D_MV_1)));
+      Assert.assertEquals(outputRow.getValue(M1), row.getValue(M1));
+      Assert.assertEquals(outputRow.getValue(M2), row.getValue(M2));
+      Assert.assertEquals(outputRow.getValue(TIME), row.getValue(TIME));
+    }
+  }
+
   @AfterClass
   public void cleanup() {
     FileUtils.deleteQuietly(new File(_segmentOutputDir));
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentConvertCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentConvertCommand.java
index 2176e61a2bd..e4f8dfb99ae 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentConvertCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentConvertCommand.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.pinot.common.utils.TarCompressionUtils;
 import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.tools.AbstractBaseCommand;
@@ -38,6 +39,7 @@ import picocli.CommandLine;
  *   <li>AVRO format</li>
  *   <li>CSV format</li>
  *   <li>JSON format</li>
+ *   <li>PARQUET format</li>
  * </ul>
  */
 @SuppressWarnings("FieldCanBeLocal")
@@ -54,7 +56,7 @@ public class PinotSegmentConvertCommand extends 
AbstractBaseCommand implements C
   private String _outputDir;
 
   @CommandLine.Option(names = {"-outputFormat"}, required = true,
-      description = "Format to convert to (AVRO/CSV/JSON).")
+      description = "Format to convert to (AVRO/CSV/JSON/PARQUET).")
   private String _outputFormat;
 
   @CommandLine.Option(names = {"-csvDelimiter"}, required = false, description 
= "CSV delimiter (default ',').")
@@ -67,6 +69,14 @@ public class PinotSegmentConvertCommand extends 
AbstractBaseCommand implements C
   @CommandLine.Option(names = {"-csvWithHeader"}, required = false, 
description = "Print CSV Header (default false).")
   private boolean _csvWithHeader;
 
+  @CommandLine.Option(names = {"-parquetCompression"}, required = false,
+      description = "Parquet compression codec 
(GZIP/SNAPPY/ZSTD/LZ4/UNCOMPRESSED, default GZIP).")
+  private CompressionCodecName _parquetCompression = CompressionCodecName.GZIP;
+
+  @CommandLine.Option(names = {"-forwardIndexOnly"}, required = false,
+      description = "Load only forward index from the segment, skipping 
secondary indexes (default false).")
+  private boolean _forwardIndexOnly;
+
   @CommandLine.Option(names = {"-overwrite"}, required = false,
       description = "Overwrite the existing file (default false).")
   private boolean _overwrite;
@@ -127,16 +137,21 @@ public class PinotSegmentConvertCommand extends 
AbstractBaseCommand implements C
         switch (FileFormat.fromString(_outputFormat)) {
           case AVRO:
             outputPath += ".avro";
-            new PinotSegmentToAvroConverter(inputPath, outputPath).convert();
+            new PinotSegmentToAvroConverter(inputPath, outputPath, 
_forwardIndexOnly).convert();
             break;
           case CSV:
             outputPath += ".csv";
-            new PinotSegmentToCsvConverter(inputPath, outputPath, 
_csvDelimiter, _csvDelimiter, _csvWithHeader)
-                .convert();
+            new PinotSegmentToCsvConverter(inputPath, outputPath, 
_csvDelimiter, _csvListDelimiter, _csvWithHeader,
+                _forwardIndexOnly).convert();
             break;
           case JSON:
             outputPath += ".json";
-            new PinotSegmentToJsonConverter(inputPath, outputPath).convert();
+            new PinotSegmentToJsonConverter(inputPath, outputPath, 
_forwardIndexOnly).convert();
+            break;
+          case PARQUET:
+            outputPath += ".parquet";
+            new PinotSegmentToParquetConverter(inputPath, outputPath, 
_parquetCompression, _forwardIndexOnly)
+                .convert();
             break;
           default:
             throw new RuntimeException("Unsupported conversion to file format: 
" + _outputFormat);
@@ -152,6 +167,6 @@ public class PinotSegmentConvertCommand extends 
AbstractBaseCommand implements C
 
   @Override
   public String description() {
-    return "Convert Pinot segments to another format such as AVRO/CSV/JSON.";
+    return "Convert Pinot segments to another format such as 
AVRO/CSV/JSON/PARQUET.";
   }
 }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToAvroConverter.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToAvroConverter.java
index f988f050ab9..dd0d56d2cf4 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToAvroConverter.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToAvroConverter.java
@@ -19,13 +19,11 @@
 package org.apache.pinot.tools.segment.converter;
 
 import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
 import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -38,10 +36,16 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 public class PinotSegmentToAvroConverter implements PinotSegmentConverter {
   private final String _segmentDir;
   private final String _outputFile;
+  private final boolean _forwardIndexOnly;
 
   public PinotSegmentToAvroConverter(String segmentDir, String outputFile) {
+    this(segmentDir, outputFile, false);
+  }
+
+  public PinotSegmentToAvroConverter(String segmentDir, String outputFile, 
boolean forwardIndexOnly) {
     _segmentDir = segmentDir;
     _outputFile = outputFile;
+    _forwardIndexOnly = forwardIndexOnly;
   }
 
   @Override
@@ -49,26 +53,17 @@ public class PinotSegmentToAvroConverter implements 
PinotSegmentConverter {
       throws Exception {
     File indexDir = new File(_segmentDir);
     Schema avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(new 
SegmentMetadataImpl(indexDir).getSchema());
-    try (PinotSegmentRecordReader pinotSegmentRecordReader = new 
PinotSegmentRecordReader(new File(_segmentDir))) {
+    PinotSegmentRecordReader pinotSegmentRecordReader = new 
PinotSegmentRecordReader();
+    pinotSegmentRecordReader.init(indexDir, null, null, false, 
_forwardIndexOnly);
+    try (pinotSegmentRecordReader) {
       try (DataFileWriter<Record> recordWriter = new DataFileWriter<>(new 
GenericDatumWriter<>(avroSchema))) {
         recordWriter.create(avroSchema, new File(_outputFile));
 
         GenericRow row = new GenericRow();
+        Record reusableRecord = new Record(avroSchema);
         while (pinotSegmentRecordReader.hasNext()) {
           row = pinotSegmentRecordReader.next(row);
-          Record record = new Record(avroSchema);
-          for (Map.Entry<String, Object> entry : 
row.getFieldToValueMap().entrySet()) {
-            String field = entry.getKey();
-            Object value = entry.getValue();
-            if (value instanceof Object[]) {
-              record.put(field, Arrays.asList((Object[]) value));
-            } else if (value instanceof byte[]) {
-              record.put(field, ByteBuffer.wrap((byte[]) value));
-            } else {
-              record.put(field, value);
-            }
-          }
-
+          Record record = 
SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(row, reusableRecord);
           recordWriter.append(record);
           row.clear();
         }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToCsvConverter.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToCsvConverter.java
index 439cfb991ed..022a0d6fc9f 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToCsvConverter.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToCsvConverter.java
@@ -37,21 +37,29 @@ public class PinotSegmentToCsvConverter implements 
PinotSegmentConverter {
   private final char _delimiter;
   private final char _listDelimiter;
   private final boolean _withHeader;
+  private final boolean _forwardIndexOnly;
 
   PinotSegmentToCsvConverter(String segmentDir, String outputFile, char 
delimiter, char listDelimiter,
       boolean withHeader) {
+    this(segmentDir, outputFile, delimiter, listDelimiter, withHeader, false);
+  }
+
+  PinotSegmentToCsvConverter(String segmentDir, String outputFile, char 
delimiter, char listDelimiter,
+      boolean withHeader, boolean forwardIndexOnly) {
     _segmentDir = segmentDir;
     _outputFile = outputFile;
     _delimiter = delimiter;
     _listDelimiter = listDelimiter;
     _withHeader = withHeader;
+    _forwardIndexOnly = forwardIndexOnly;
   }
 
   @Override
   public void convert()
       throws Exception {
-    try (PinotSegmentRecordReader recordReader = new 
PinotSegmentRecordReader(new File(_segmentDir));
-        BufferedWriter recordWriter = new BufferedWriter(new 
FileWriter(_outputFile))) {
+    PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
+    recordReader.init(new File(_segmentDir), null, null, false, 
_forwardIndexOnly);
+    try (recordReader; BufferedWriter recordWriter = new BufferedWriter(new 
FileWriter(_outputFile))) {
       GenericRow row = new GenericRow();
       row = recordReader.next(row);
       String[] fields = row.getFieldToValueMap().keySet().toArray(new 
String[0]);
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToJsonConverter.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToJsonConverter.java
index e0abf097496..f6cf356adc9 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToJsonConverter.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToJsonConverter.java
@@ -35,17 +35,24 @@ import org.apache.pinot.spi.utils.JsonUtils;
 public class PinotSegmentToJsonConverter implements PinotSegmentConverter {
   private final String _segmentDir;
   private final String _outputFile;
+  private final boolean _forwardIndexOnly;
 
   public PinotSegmentToJsonConverter(String segmentDir, String outputFile) {
+    this(segmentDir, outputFile, false);
+  }
+
+  public PinotSegmentToJsonConverter(String segmentDir, String outputFile, 
boolean forwardIndexOnly) {
     _segmentDir = segmentDir;
     _outputFile = outputFile;
+    _forwardIndexOnly = forwardIndexOnly;
   }
 
   @Override
   public void convert()
       throws Exception {
-    try (PinotSegmentRecordReader recordReader = new 
PinotSegmentRecordReader(new File(_segmentDir));
-        BufferedWriter recordWriter = new BufferedWriter(new 
FileWriter(_outputFile))) {
+    PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
+    recordReader.init(new File(_segmentDir), null, null, false, 
_forwardIndexOnly);
+    try (recordReader; BufferedWriter recordWriter = new BufferedWriter(new 
FileWriter(_outputFile))) {
       GenericRow row = new GenericRow();
       while (recordReader.hasNext()) {
         row = recordReader.next(row);
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToParquetConverter.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToParquetConverter.java
new file mode 100644
index 00000000000..8a901d56779
--- /dev/null
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToParquetConverter.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.segment.converter;
+
+import java.io.File;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
+import org.apache.pinot.plugin.inputformat.parquet.ParquetUtils;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * The <code>PinotSegmentToParquetConverter</code> class is the tool to 
convert Pinot segment to Parquet format.
+ */
+public class PinotSegmentToParquetConverter implements PinotSegmentConverter {
+  private final String _segmentDir;
+  private final String _outputFile;
+  private final CompressionCodecName _compressionCodec;
+  private final boolean _forwardIndexOnly;
+
+  public PinotSegmentToParquetConverter(String segmentDir, String outputFile) {
+    this(segmentDir, outputFile, CompressionCodecName.GZIP, false);
+  }
+
+  public PinotSegmentToParquetConverter(String segmentDir, String outputFile,
+      CompressionCodecName compressionCodec, boolean forwardIndexOnly) {
+    _segmentDir = segmentDir;
+    _outputFile = outputFile;
+    _compressionCodec = compressionCodec;
+    _forwardIndexOnly = forwardIndexOnly;
+  }
+
+  @Override
+  public void convert()
+      throws Exception {
+    File indexDir = new File(_segmentDir);
+    Schema avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(new 
SegmentMetadataImpl(indexDir).getSchema());
+    Configuration hadoopConf = ParquetUtils.getParquetHadoopConfiguration();
+    OutputFile outputFile = HadoopOutputFile.fromPath(new Path(_outputFile), 
hadoopConf);
+    PinotSegmentRecordReader pinotSegmentRecordReader = new 
PinotSegmentRecordReader();
+    pinotSegmentRecordReader.init(new File(_segmentDir), null, null, false, 
_forwardIndexOnly);
+    try (pinotSegmentRecordReader) {
+      try (ParquetWriter<Record> parquetWriter =
+          AvroParquetWriter.<Record>builder(outputFile).withSchema(avroSchema)
+              .withCompressionCodec(_compressionCodec)
+              .withConf(hadoopConf).build()) {
+        GenericRow row = new GenericRow();
+        Record reusableRecord = new Record(avroSchema);
+        while (pinotSegmentRecordReader.hasNext()) {
+          row = pinotSegmentRecordReader.next(row);
+          Record record = 
SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(row, reusableRecord);
+          parquetWriter.write(record);
+          row.clear();
+        }
+      }
+    }
+  }
+}
diff --git 
a/pinot-tools/src/test/java/org/apache/pinot/tools/segment/converter/PinotSegmentConverterTest.java
 
b/pinot-tools/src/test/java/org/apache/pinot/tools/segment/converter/PinotSegmentConverterTest.java
index f4a76a70188..1cae431f614 100644
--- 
a/pinot-tools/src/test/java/org/apache/pinot/tools/segment/converter/PinotSegmentConverterTest.java
+++ 
b/pinot-tools/src/test/java/org/apache/pinot/tools/segment/converter/PinotSegmentConverterTest.java
@@ -26,6 +26,7 @@ import 
org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
 import org.apache.pinot.plugin.inputformat.csv.CSVRecordReader;
 import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
 import org.apache.pinot.plugin.inputformat.json.JSONRecordReader;
+import org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -187,6 +188,34 @@ public class PinotSegmentConverterTest {
     }
   }
 
+  @Test
+  public void testParquetConverter()
+      throws Exception {
+    File outputFile = new File(TEMP_DIR, "segment.parquet");
+    PinotSegmentToParquetConverter parquetConverter =
+        new PinotSegmentToParquetConverter(_segmentDir, outputFile.getPath());
+    parquetConverter.convert();
+
+    try (ParquetRecordReader recordReader = new ParquetRecordReader()) {
+      recordReader.init(outputFile, SCHEMA.getFieldSpecMap().keySet(), null);
+
+      GenericRow record = recordReader.next();
+      assertEquals(record.getValue(INT_SV_COLUMN), 1);
+      assertEquals(record.getValue(LONG_SV_COLUMN), 2L);
+      assertEquals(record.getValue(FLOAT_SV_COLUMN), 3.0f);
+      assertEquals(record.getValue(DOUBLE_SV_COLUMN), 4.0);
+      assertEquals(record.getValue(STRING_SV_COLUMN), "5");
+      assertEquals(record.getValue(BYTES_SV_COLUMN), new byte[]{6, 12, 34, 
56});
+      assertEquals(record.getValue(INT_MV_COLUMN), new Object[]{7, 8});
+      assertEquals(record.getValue(LONG_MV_COLUMN), new Object[]{9L, 10L});
+      assertEquals(record.getValue(FLOAT_MV_COLUMN), new Object[]{11.0f, 
12.0f});
+      assertEquals(record.getValue(DOUBLE_MV_COLUMN), new Object[]{13.0, 
14.0});
+      assertEquals(record.getValue(STRING_MV_COLUMN), new Object[]{"15", 
"16"});
+
+      assertFalse(recordReader.hasNext());
+    }
+  }
+
   @AfterClass
   public void tearDown()
       throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to