This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a9f36e438ce Followups on column reader changes (#17293)
a9f36e438ce is described below

commit a9f36e438cec8ab80139acf03993e39947582c66
Author: Krishan Goyal <[email protected]>
AuthorDate: Thu Dec 18 09:45:26 2025 +0530

    Followups on column reader changes (#17293)
    
    * Followups on column reader changes
    
    * Handle nulls in default columns more appropriately
    
    * Add isSingleValue() to ColumnReader
    
    * Minor refactoring in EpochTimeHandler
    
    * Avoid null changes of DefaultValueColumnReader in this PR
---
 .../processing/timehandler/EpochTimeHandler.java   |  3 +-
 .../segment/readers/DefaultValueColumnReader.java  | 13 +++++
 .../readers/PinotSegmentColumnReaderFactory.java   |  9 +--
 .../readers/PinotSegmentColumnReaderImpl.java      | 11 ++++
 .../readers/PinotSegmentColumnarDataSource.java    | 61 ++++++++++++++++++++
 .../pinot/spi/data/readers/ColumnReader.java       | 22 +++++++-
 .../spi/data/readers/ColumnReaderFactory.java      |  6 +-
 .../pinot/spi/data/readers/ColumnarDataSource.java | 66 ++++++++++++++++++++++
 8 files changed, 178 insertions(+), 13 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java
index bf4b4ef48df..a3fbd47b601 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java
@@ -105,10 +105,9 @@ public class EpochTimeHandler implements TimeHandler {
   @Override
   @Nullable
   public Object getModifiedTimeValue(Object columnValue) {
-    long timeMs = _formatSpec.fromFormatToMillis(columnValue.toString());
-
     // Round time if needed
     if (_roundBucketMs > 0) {
+      long timeMs = _formatSpec.fromFormatToMillis(columnValue.toString());
       timeMs = (timeMs / _roundBucketMs) * _roundBucketMs;
       return _dataType.convert(_formatSpec.fromMillisToFormat(timeMs));
     } else {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/DefaultValueColumnReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/DefaultValueColumnReader.java
index ee70b620138..6e97958f96d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/DefaultValueColumnReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/DefaultValueColumnReader.java
@@ -34,6 +34,7 @@ public class DefaultValueColumnReader implements ColumnReader 
{
   private final String _columnName;
   private final int _numDocs;
   private final Object _defaultValue;
+  private final FieldSpec _fieldSpec;
   private final FieldSpec.DataType _dataType;
 
   // Pre-computed multi-value arrays for reuse
@@ -57,6 +58,7 @@ public class DefaultValueColumnReader implements ColumnReader 
{
     _columnName = columnName;
     _numDocs = numDocs;
     _currentIndex = 0;
+    _fieldSpec = fieldSpec;
     _dataType = fieldSpec.getDataType();
 
     // For multi-value fields, wrap the default value in an array
@@ -146,6 +148,11 @@ public class DefaultValueColumnReader implements 
ColumnReader {
     _currentIndex++;
   }
 
+  @Override
+  public boolean isSingleValue() {
+    return _fieldSpec.isSingleValueField();
+  }
+
   @Override
   public boolean isInt() {
     return _dataType == FieldSpec.DataType.INT;
@@ -338,6 +345,12 @@ public class DefaultValueColumnReader implements 
ColumnReader {
     return (byte[]) _defaultValue;
   }
 
+  @Override
+  public Object getValue(int docId) {
+    validateDocId(docId);
+    return _defaultValue;
+  }
+
   // Multi-value accessors
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderFactory.java
index e9d16782fdc..2b40444a52a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderFactory.java
@@ -91,17 +91,12 @@ public class PinotSegmentColumnReaderFactory implements 
ColumnReaderFactory {
   }
 
   @Override
-  public ColumnReader getColumnReader(String columnName)
-      throws IOException {
+  public ColumnReader getColumnReader(String columnName) {
     if (_targetSchema == null) {
       throw new IllegalStateException("Factory not initialized. Call init() 
first.");
     }
 
-    ColumnReader reader = _columnReaders.get(columnName);
-    if (reader == null) {
-      throw new IOException("Column reader not found for column: " + 
columnName);
-    }
-    return reader;
+    return _columnReaders.get(columnName);
   }
 
   /**
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderImpl.java
index 827839bdee8..2af71982e56 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReaderImpl.java
@@ -109,6 +109,11 @@ public class PinotSegmentColumnReaderImpl implements 
ColumnReader {
     _currentIndex++;
   }
 
+  @Override
+  public boolean isSingleValue() {
+    return _segmentColumnReader.isSingleValue();
+  }
+
   @Override
   public boolean isInt() {
     return _dataType == FieldSpec.DataType.INT;
@@ -314,6 +319,12 @@ public class PinotSegmentColumnReaderImpl implements 
ColumnReader {
     return _segmentColumnReader.getBytes(docId);
   }
 
+  @Override
+  public Object getValue(int docId)
+      throws IOException {
+    return _segmentColumnReader.getValue(docId);
+  }
+
   // Multi-value accessors
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnarDataSource.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnarDataSource.java
new file mode 100644
index 00000000000..8b3306f6946
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnarDataSource.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.readers;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.data.readers.ColumnReaderFactory;
+import org.apache.pinot.spi.data.readers.ColumnarDataSource;
+
+
+/**
+ * ColumnarDataSource implementation that wraps a Pinot segment.
+ * Provides columnar access to segment data via ColumnReaderFactory.
+ */
+public class PinotSegmentColumnarDataSource implements ColumnarDataSource {
+
+  private final IndexSegment _indexSegment;
+  private final int _totalDocs;
+
+  public PinotSegmentColumnarDataSource(IndexSegment indexSegment) {
+    _indexSegment = indexSegment;
+    _totalDocs = indexSegment.getSegmentMetadata().getTotalDocs();
+  }
+
+  @Override
+  public int getTotalDocs() {
+    return _totalDocs;
+  }
+
+  @Override
+  public ColumnReaderFactory createColumnReaderFactory() {
+    return new PinotSegmentColumnReaderFactory(_indexSegment);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    // Segment lifecycle is managed externally, so no cleanup needed here
+  }
+
+  @Override
+  public String toString() {
+    return "PinotSegmentColumnarDataSource{segment=" + 
_indexSegment.getSegmentName() + "}";
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReader.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReader.java
index faca8bfac07..28bc576acb4 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReader.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReader.java
@@ -188,6 +188,11 @@ public interface ColumnReader extends Closeable, 
Serializable {
    */
   void skipNext() throws IOException;
 
+  /**
+   * Check if the column data is single-value or multi-value.
+   */
+  boolean isSingleValue();
+
   /**
    * Check if the column data type from the actual reader can be returned as 
the expected type directly.
    * For multi-value columns, this indicates if the multi-value type specific 
methods can be called directly.
@@ -262,7 +267,7 @@ public interface ColumnReader extends Closeable, 
Serializable {
   /**
    * Get int / long / float / double / string / byte[] value at the given 
document ID for single-value columns.
    * Should be called only if isNull(docId) returns false.
-   * <p>Document ID is 0-based. Valid values are 0 to {@link #getTotalDocs()} 
- 1.
+   * Document ID is 0-based. Valid values are 0 to {@link #getTotalDocs()} - 1.
    *
    * @param docId Document ID (0-based)
    * @throws IndexOutOfBoundsException If docId is out of range
@@ -275,6 +280,21 @@ public interface ColumnReader extends Closeable, 
Serializable {
   String getString(int docId) throws IOException;
   byte[] getBytes(int docId) throws IOException;
 
+  /**
+   * Get the value at the given document ID as a Java Object.
+   * Can be used for both single-value and multi-value columns.
+   * This should be used if
+   * 1. Certain APIs don't yet support primitive type specific methods (e.g. 
TimeHandler, Partitioner, etc.) and
+   *    thus will be boxed anyway.
+   * 2. The required data type does not match the actual type and the client 
will handle the conversion
+   * Document ID is 0-based. Valid values are 0 to {@link #getTotalDocs()} - 1.
+   *
+   * @param docId Document ID (0-based)
+   * @throws IndexOutOfBoundsException If docId is out of range
+   * @throws IOException If an I/O error occurs while reading
+   */
+  Object getValue(int docId) throws IOException;
+
   // Multi-value accessors
 
   /**
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReaderFactory.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReaderFactory.java
index eadb1ad6564..e31293c0b96 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReaderFactory.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnReaderFactory.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -70,11 +71,10 @@ public interface ColumnReaderFactory extends Closeable, 
Serializable {
    * Implementations may cache and reuse readers for efficiency.
    *
    * @param columnName Name of the column to read
+   *                   Can return null if column doesn't exist in the source
    * @return ColumnReader instance for the specified column (may be cached)
-   * @throws IOException If the column reader cannot be obtained
    */
-  ColumnReader getColumnReader(String columnName)
-      throws IOException;
+  @Nullable ColumnReader getColumnReader(String columnName);
 
   /**
    * Get all column readers for the target schema.
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnarDataSource.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnarDataSource.java
new file mode 100644
index 00000000000..a0d1da64cc3
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/ColumnarDataSource.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * <p>
+ * This interface is designed to support the creation of multiple {@link 
ColumnReaderFactory} instances
+ * from the same underlying data source. This capability is required because 
different processing contexts
+ * may require distinct column reader objects for the same input file and 
column. For example:
+ * <ul>
+ *   <li>Different processing stages may need independent iterators over the 
same column</li>
+ *   <li>Parallel processing threads may each require their own column 
readers</li>
+ * </ul>
+ *
+ */
+public interface ColumnarDataSource extends Closeable {
+
+  /**
+   * Returns the total number of documents (rows) in this data source.
+   *
+   * @return the total document count
+   */
+  int getTotalDocs();
+
+  /**
+   * Creates a new {@link ColumnReaderFactory} instance for this data source.
+   * <p>
+   * Multiple factories can be created from the same data source, allowing 
different consumers
+   * to independently read and process the columnar data. Each factory can 
then create its own
+   * set of column readers without interfering with readers created by other 
factories.
+   *
+   * @return a new column reader factory instance
+   * @throws IOException if an I/O error occurs while creating the factory
+   */
+  ColumnReaderFactory createColumnReaderFactory()
+      throws IOException;
+
+  /**
+   * Returns a string description of the underlying data source.
+   * <p>
+   * This typically includes information such as the file name, path, or other 
identifiers
+   * that help identify the source of the data.
+   *
+   * @return a description of the underlying source (e.g., file name)
+   */
+  String toString();
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to