This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch support-json-to-map-transform-during-ingestion
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 9bbd0e3578f2c282e1251df33643e7508c6d5e9c
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Oct 30 06:10:08 2025 -0700

    Support MAP type in derived column creation during segment reload
---
 .../pinot/common/function/FunctionUtils.java       |   3 +
 .../apache/pinot/common/utils/PinotDataType.java   |  13 +++
 .../stats/MapColumnPreIndexStatsCollector.java     | 107 +++++++++++++++++++--
 .../defaultcolumn/BaseDefaultColumnHandler.java    |  42 ++++++--
 .../ExpressionTransformerTest.java                 |  34 ++++++-
 5 files changed, 182 insertions(+), 17 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
index c1445e98bde..e4f6760929d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
@@ -149,6 +149,9 @@ public class FunctionUtils {
     if (Collection.class.isAssignableFrom(clazz)) {
       return PinotDataType.COLLECTION;
     }
+    if (Map.class.isAssignableFrom(clazz)) {
+      return PinotDataType.MAP;
+    }
     return ARGUMENT_TYPE_MAP.get(clazz);
   }
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
index 030c5a9df9f..c8cc2a242b8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
@@ -32,6 +32,7 @@ import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.BooleanUtils;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.MapUtils;
 import org.apache.pinot.spi.utils.TimestampUtils;
 
 
@@ -827,6 +828,8 @@ public enum PinotDataType {
           } catch (Exception e) {
             throw new RuntimeException("Unable to convert String to Map. Input value: " + value, e);
           }
+        case BYTES:
+          return MapUtils.deserializeMap((byte[]) value);
         case OBJECT:
         case MAP:
           if (value instanceof Map) {
@@ -840,6 +843,16 @@ public enum PinotDataType {
               sourceType, value.getClass()));
       }
     }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public byte[] toBytes(Object value) {
+      if (!(value instanceof Map)) {
+        throw new UnsupportedOperationException("Cannot convert non-Map value to BYTES for MAP type: "
+            + (value == null ? "null" : value.getClass()));
+      }
+      return MapUtils.serializeMap((Map<String, Object>) value);
+    }
   },
 
   BYTE_ARRAY {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
index 99c65e5835b..ca27d0c221c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/MapColumnPreIndexStatsCollector.java
@@ -18,9 +18,14 @@
  */
 package org.apache.pinot.segment.local.segment.creator.impl.stats;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
+import java.math.BigDecimal;
+import java.text.NumberFormat;
 import java.util.Arrays;
+import java.util.Locale;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.PinotDataType;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
@@ -32,8 +37,11 @@ import org.apache.pinot.spi.data.ComplexFieldSpec;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.MapUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -51,6 +59,7 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
  * heterogeneous value types for a key are encountered will construct the Map statistics it can be raised as a fault.
  */
 public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCollector {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MapColumnPreIndexStatsCollector.class);
   private final Object2ObjectOpenHashMap<String, 
AbstractColumnStatisticsCollector> _keyStats =
       new Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
   private final Map<String, Integer> _keyFrequencies = new 
Object2ObjectOpenHashMap<>(INITIAL_HASH_SET_SIZE);
@@ -58,7 +67,7 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
   private int _minLength = Integer.MAX_VALUE;
   private int _maxLength = 0;
   private boolean _sealed = false;
-  private ComplexFieldSpec _colFieldSpec;
+  private final ComplexFieldSpec _colFieldSpec;
   private boolean _createNoDictCollectorsForKeys = false;
 
   public MapColumnPreIndexStatsCollector(String column, StatsCollectorConfig 
statsCollectorConfig) {
@@ -96,6 +105,9 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
       for (Map.Entry<String, Object> mapValueEntry : mapValue.entrySet()) {
         String key = mapValueEntry.getKey();
         Object value = mapValueEntry.getValue();
+        if (value == null) {
+          continue;
+        }
         _keyFrequencies.merge(key, 1, Integer::sum);
         AbstractColumnStatisticsCollector keyStats = _keyStats.get(key);
         if (keyStats == null) {
@@ -105,6 +117,55 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
             updatePartition(key);
           }
         }
+        if (keyStats instanceof NoDictColumnStatisticsCollector) {
+          keyStats.collect(value);
+          continue;
+        }
+        if (keyStats instanceof StringColumnPreIndexStatsCollector) {
+          if (value instanceof String || value instanceof Number || value 
instanceof Boolean) {
+            keyStats.collect(String.valueOf(value));
+            continue;
+          }
+          try {
+            keyStats.collect(JsonUtils.objectToString(value));
+            continue;
+          } catch (JsonProcessingException e) {
+            throw new RuntimeException("Failed to serialize value for key '" + key + "': " + value, e);
+          }
+        }
+        // Parse the value once for all numeric collector types
+        Number valueNumber = null;
+        if (keyStats instanceof IntColumnPreIndexStatsCollector
+            || keyStats instanceof LongColumnPreIndexStatsCollector
+            || keyStats instanceof FloatColumnPreIndexStatsCollector
+            || keyStats instanceof DoubleColumnPreIndexStatsCollector
+            || keyStats instanceof BigDecimalColumnPreIndexStatsCollector) {
+          valueNumber = parseFlexibleNumber(value);
+          if (valueNumber == null) {
+            continue;
+          }
+        }
+        if (keyStats instanceof IntColumnPreIndexStatsCollector) {
+          keyStats.collect(valueNumber.intValue());
+          continue;
+        }
+        if (keyStats instanceof LongColumnPreIndexStatsCollector) {
+          keyStats.collect(valueNumber.longValue());
+          continue;
+        }
+        if (keyStats instanceof FloatColumnPreIndexStatsCollector) {
+          keyStats.collect(valueNumber.floatValue());
+          continue;
+        }
+        if (keyStats instanceof DoubleColumnPreIndexStatsCollector) {
+          keyStats.collect(valueNumber.doubleValue());
+          continue;
+        }
+        if (keyStats instanceof BigDecimalColumnPreIndexStatsCollector) {
+          keyStats.collect(new BigDecimal(valueNumber.toString()));
+          continue;
+        }
+        // Catch all
         keyStats.collect(value);
       }
       _totalNumberOfEntries++;
@@ -113,6 +174,30 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
     }
   }
 
+  @Nullable
+  private Number parseFlexibleNumber(Object input) {
+    if (input instanceof Number) {
+      return (Number) input;
+    }
+
+    String s = input.toString().trim();
+    if (s.isEmpty()) {
+      return null;
+    }
+    try {
+      // Try BigDecimal first — it supports everything cleanly
+      return new BigDecimal(s);
+    } catch (NumberFormatException e) {
+      try {
+        // Try locale parsing fallback
+        NumberFormat nf = NumberFormat.getInstance(Locale.US);
+        return nf.parse(s);
+      } catch (Exception ignored) {
+        return null;
+      }
+    }
+  }
+
   @Override
   public String getMinValue() {
     if (_sealed) {
@@ -196,7 +281,6 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
     if (_createNoDictCollectorsForKeys) {
       return new NoDictColumnStatisticsCollector(key, config);
     }
-
     switch (type) {
       case INTEGER:
         return new IntColumnPreIndexStatsCollector(key, config);
@@ -208,18 +292,23 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
         return new DoubleColumnPreIndexStatsCollector(key, config);
       case BIG_DECIMAL:
         return new BigDecimalColumnPreIndexStatsCollector(key, config);
+      case BOOLEAN:
       case STRING:
+      case MAP:
+      case OBJECT:
         return new StringColumnPreIndexStatsCollector(key, config);
       default:
-        throw new UnsupportedOperationException(String.format("MAP column does not yet support '%s'", type));
+        LOGGER.warn("Unknown data type {} for key {} and value {}", type, key, value);
+        return new StringColumnPreIndexStatsCollector(key, config);
     }
   }
 
+  /**
+   * Convert Map value data type to stored field type.
+   * Note that all unknown types are automatically converted to String type.
+   */
   private static FieldSpec.DataType convertToDataType(PinotDataType ty) {
-    // TODO: I've been told that we already have a function to do this, so find that function and replace this
     switch (ty) {
-      case BOOLEAN:
-        return FieldSpec.DataType.BOOLEAN;
       case SHORT:
       case INTEGER:
         return FieldSpec.DataType.INT;
@@ -233,10 +322,12 @@ public class MapColumnPreIndexStatsCollector extends 
AbstractColumnStatisticsCol
         return FieldSpec.DataType.BIG_DECIMAL;
       case TIMESTAMP:
         return FieldSpec.DataType.TIMESTAMP;
+      case BOOLEAN:
       case STRING:
-        return FieldSpec.DataType.STRING;
+      case OBJECT:
+      case MAP:
       default:
-        throw new UnsupportedOperationException();
+        return FieldSpec.DataType.STRING;
     }
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index d2acb18f4d8..33f9112ecde 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -47,11 +47,10 @@ import 
org.apache.pinot.segment.local.segment.creator.impl.stats.DoubleColumnPre
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.FloatColumnPreIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector;
+import 
org.apache.pinot.segment.local.segment.creator.impl.stats.MapColumnPreIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.NoDictColumnStatisticsCollector;
 import 
org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
 import 
org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
-import 
org.apache.pinot.segment.local.segment.index.forward.ForwardIndexCreatorFactory;
-import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -809,6 +808,30 @@ public abstract class BaseDefaultColumnHandler implements 
DefaultColumnHandler {
                   new ByteArray((byte[]) fieldSpec.getDefaultNullValue()));
           break;
         }
+        case MAP: {
+          // Ensure each value is non-null; default for MAP is an empty map
+          for (int i = 0; i < numDocs; i++) {
+            if (outputValues[i] == null) {
+              outputValues[i] = fieldSpec.getDefaultNullValue();
+            }
+          }
+
+          // Use MapColumnPreIndexStatsCollector for collecting MAP stats
+          AbstractColumnStatisticsCollector statsCollector =
+              new MapColumnPreIndexStatsCollector(column, 
statsCollectorConfig);
+          for (Object value : outputValues) {
+            statsCollector.collect(value);
+          }
+          statsCollector.seal();
+
+          // MAP does not use dictionary encoding
+          createDictionary = false;
+          indexCreationInfo =
+              new ColumnIndexCreationInfo(statsCollector, /* createDictionary */ false, false, true,
+                  fieldSpec.getDefaultNullValue());
+          break;
+        }
+
         default:
           throw new IllegalStateException();
       }
@@ -1166,8 +1189,12 @@ public abstract class BaseDefaultColumnHandler 
implements DefaultColumnHandler {
             case BYTES:
               forwardIndexCreator.putBytes((byte[]) outputValue);
               break;
+            case MAP:
+              forwardIndexCreator.add(outputValue, -1);
+              break;
             default:
-              throw new IllegalStateException();
+              throw new IllegalStateException(
+                  "Unsupported data type: " + fieldSpec.getDataType() + ", for value: " + outputValue);
           }
         }
       } else {
@@ -1193,10 +1220,12 @@ public abstract class BaseDefaultColumnHandler 
implements DefaultColumnHandler {
               forwardIndexCreator.putBytesMV((byte[][]) outputValue);
               break;
             default:
-              throw new IllegalStateException();
+              throw new IllegalStateException(
+                  "Unsupported data type: " + fieldSpec.getDataType() + ", for value: " + outputValue);
           }
         }
       }
+      forwardIndexCreator.seal();
     }
 
     // Add the column metadata
@@ -1222,13 +1251,12 @@ public abstract class BaseDefaultColumnHandler 
implements DefaultColumnHandler {
     ForwardIndexConfig forwardIndexConfig = null;
     FieldIndexConfigs fieldIndexConfig = 
_indexLoadingConfig.getFieldIndexConfig(column);
     if (fieldIndexConfig != null) {
-      forwardIndexConfig = fieldIndexConfig.getConfig(new 
ForwardIndexPlugin().getIndexType());
+      forwardIndexConfig = 
fieldIndexConfig.getConfig(StandardIndexes.forward());
     }
     if (forwardIndexConfig == null) {
       forwardIndexConfig = new ForwardIndexConfig(false, null, null, null, 
null, null, null);
     }
-
-    return ForwardIndexCreatorFactory.createIndexCreator(indexCreationContext, 
forwardIndexConfig);
+    return StandardIndexes.forward().createIndexCreator(indexCreationContext, 
forwardIndexConfig);
   }
 
   @SuppressWarnings("rawtypes")
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
index 8906d6f88eb..70935130b92 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -393,11 +394,40 @@ public class ExpressionTransformerTest {
     genericRow = new GenericRow();
     genericRow.putValue("x", "abcd");
     expressionTransformer.transform(genericRow);
-    Assert.assertEquals(genericRow.getValue("y"), null);
+    Assert.assertNull(genericRow.getValue("y"));
     // Invalid case: x is null, y is int
     genericRow = new GenericRow();
     genericRow.putValue("x", null);
     expressionTransformer.transform(genericRow);
-    Assert.assertEquals(genericRow.getValue("y"), null);
+    Assert.assertNull(genericRow.getValue("y"));
+  }
+
+  @Test
+  public void testJsonToMapIngestionTransform() {
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("columnJson", FieldSpec.DataType.STRING)
+        .addComplex("columnMap", FieldSpec.DataType.MAP, Map.of(
+            "a", new DimensionFieldSpec("a", FieldSpec.DataType.INT, true),
+            "b", new DimensionFieldSpec("b", FieldSpec.DataType.STRING, true)
+        ))
+        .build();
+
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(Collections.singletonList(
+        new TransformConfig("columnMap", "jsonStringToMap(columnJson)")));
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName("testJsonToMapIngestionTransform")
+        .setIngestionConfig(ingestionConfig)
+        .build();
+
+    ExpressionTransformer expressionTransformer = new 
ExpressionTransformer(tableConfig, schema);
+
+    GenericRow row = new GenericRow();
+    row.putValue("columnJson", "{\"a\":1,\"b\":\"x\"}");
+
+    expressionTransformer.transform(row);
+    Map<String, Object> map = (Map<String, Object>) row.getValue("columnMap");
+    Assert.assertEquals(map.get("a"), 1);
+    Assert.assertEquals(map.get("b"), "x");
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to