jackjlli commented on a change in pull request #6046:
URL: https://github.com/apache/incubator-pinot/pull/6046#discussion_r496310227



##########
File path: 
pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
##########
@@ -175,68 +186,102 @@ public GenericRow next(GenericRow reuse)
       }
       String field = _orcFields.get(i);
       TypeDescription fieldType = _orcFieldTypes.get(i);
-      TypeDescription.Category category = fieldType.getCategory();
-      if (category == TypeDescription.Category.LIST) {
-        // Multi-value field, extract to Object[]
-        TypeDescription.Category childCategory = 
fieldType.getChildren().get(0).getCategory();
-        ListColumnVector listColumnVector = (ListColumnVector) 
_rowBatch.cols[i];
-        int rowId = listColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((listColumnVector.noNulls || !listColumnVector.isNull[rowId])) {
-          int offset = (int) listColumnVector.offsets[rowId];
-          int length = (int) listColumnVector.lengths[rowId];
-          List<Object> values = new ArrayList<>(length);
-          for (int j = 0; j < length; j++) {
-            Object value = extractSingleValue(field, listColumnVector.child, 
offset + j, childCategory);
-            // NOTE: Only keep non-null values
-            // TODO: Revisit
-            if (value != null) {
-              values.add(value);
-            }
-          }
-          if (!values.isEmpty()) {
-            reuse.putValue(field, values.toArray());
-          } else {
-            // NOTE: Treat empty list as null
-            // TODO: Revisit
-            reuse.putValue(field, null);
-          }
-        } else {
-          reuse.putValue(field, null);
+      reuse.putValue(field, extractValue(field, _rowBatch.cols[i], fieldType, 
_nextRowId));
+    }
+
+    if (++_nextRowId == _rowBatch.size) {
+      _hasNext = _orcRecordReader.nextBatch(_rowBatch);
+      _nextRowId = 0;
+    }
+    return reuse;
+  }
+
+  /**
+   * Extracts the values for a given column vector.
+   *
+   * @param field name of the field being extracted
+   * @param columnVector contains values of the field and its sub-types
+   * @param fieldType information about the field such as the category 
(STRUCT, LIST, MAP, INT, etc)
+   * @param rowId the ID of the row value being extracted
+   * @return extracted row value from the column
+   */
+  @Nullable
+  private Object extractValue(String field, ColumnVector columnVector, 
TypeDescription fieldType, int rowId) {
+    TypeDescription.Category category = fieldType.getCategory();
+
+    if (category == TypeDescription.Category.STRUCT) {
+      StructColumnVector structColumnVector = (StructColumnVector) 
columnVector;
+      if (!structColumnVector.isNull[rowId]) {
+        List<TypeDescription> childrenFieldTypes = fieldType.getChildren();
+        List<String> childrenFieldNames = fieldType.getFieldNames();
+
+        Map<Object, Object> convertedMap = new HashMap<>();
+        for (int i = 0; i < childrenFieldNames.size(); i++) {
+          convertedMap.put(childrenFieldNames.get(i),
+              extractValue(childrenFieldNames.get(i), 
structColumnVector.fields[i], childrenFieldTypes.get(i), rowId));
         }
-      } else if (category == TypeDescription.Category.MAP) {
-        // Map field
-        List<TypeDescription> children = fieldType.getChildren();
-        TypeDescription.Category keyCategory = children.get(0).getCategory();
-        TypeDescription.Category valueCategory = children.get(1).getCategory();
-        MapColumnVector mapColumnVector = (MapColumnVector) _rowBatch.cols[i];
-        int rowId = mapColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((mapColumnVector.noNulls || !mapColumnVector.isNull[rowId])) {
-          int offset = (int) mapColumnVector.offsets[rowId];
-          int length = (int) mapColumnVector.lengths[rowId];
-          Map<Object, Object> map = new HashMap<>();
-          for (int j = 0; j < length; j++) {
-            int childRowId = offset + j;
-            Object key = extractSingleValue(field, mapColumnVector.keys, 
childRowId, keyCategory);
-            Object value = extractSingleValue(field, mapColumnVector.values, 
childRowId, valueCategory);
-            map.put(key, value);
+        return convertedMap;
+      } else {
+        return null;
+      }
+    } else if (category == TypeDescription.Category.LIST) {
+      TypeDescription childType = fieldType.getChildren().get(0);
+      ListColumnVector listColumnVector = (ListColumnVector) columnVector;
+      if (columnVector.isRepeating) {
+        rowId = 0;
+      }
+      if ((listColumnVector.noNulls || !listColumnVector.isNull[rowId])) {
+        int offset = (int) listColumnVector.offsets[rowId];
+        int length = (int) listColumnVector.lengths[rowId];
+        List<Object> values = new ArrayList<>(length);
+        for (int j = 0; j < length; j++) {
+          Object value = extractValue(field, listColumnVector.child, 
childType,offset + j);
+          // NOTE: Only keep non-null values
+          if (value != null) {
+            values.add(value);
           }
-          reuse.putValue(field, map);
+        }
+        if (!values.isEmpty()) {
+          return values.toArray();
         } else {
-          reuse.putValue(field, null);
+          // NOTE: Treat empty list as null
+          return null;
         }
       } else {
-        // Single-value field
-        reuse.putValue(field, extractSingleValue(field, _rowBatch.cols[i], 
_nextRowId, category));
+        return null;
       }
-    }
+    } else if (category == TypeDescription.Category.MAP) {

Review comment:
       Same here.

##########
File path: 
pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
##########
@@ -175,68 +186,102 @@ public GenericRow next(GenericRow reuse)
       }
       String field = _orcFields.get(i);
       TypeDescription fieldType = _orcFieldTypes.get(i);
-      TypeDescription.Category category = fieldType.getCategory();
-      if (category == TypeDescription.Category.LIST) {
-        // Multi-value field, extract to Object[]
-        TypeDescription.Category childCategory = 
fieldType.getChildren().get(0).getCategory();
-        ListColumnVector listColumnVector = (ListColumnVector) 
_rowBatch.cols[i];
-        int rowId = listColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((listColumnVector.noNulls || !listColumnVector.isNull[rowId])) {
-          int offset = (int) listColumnVector.offsets[rowId];
-          int length = (int) listColumnVector.lengths[rowId];
-          List<Object> values = new ArrayList<>(length);
-          for (int j = 0; j < length; j++) {
-            Object value = extractSingleValue(field, listColumnVector.child, 
offset + j, childCategory);
-            // NOTE: Only keep non-null values
-            // TODO: Revisit
-            if (value != null) {
-              values.add(value);
-            }
-          }
-          if (!values.isEmpty()) {
-            reuse.putValue(field, values.toArray());
-          } else {
-            // NOTE: Treat empty list as null
-            // TODO: Revisit
-            reuse.putValue(field, null);
-          }
-        } else {
-          reuse.putValue(field, null);
+      reuse.putValue(field, extractValue(field, _rowBatch.cols[i], fieldType, 
_nextRowId));
+    }
+
+    if (++_nextRowId == _rowBatch.size) {
+      _hasNext = _orcRecordReader.nextBatch(_rowBatch);
+      _nextRowId = 0;
+    }
+    return reuse;
+  }
+
+  /**
+   * Extracts the values for a given column vector.
+   *
+   * @param field name of the field being extracted
+   * @param columnVector contains values of the field and its sub-types
+   * @param fieldType information about the field such as the category 
(STRUCT, LIST, MAP, INT, etc)
+   * @param rowId the ID of the row value being extracted
+   * @return extracted row value from the column
+   */
+  @Nullable
+  private Object extractValue(String field, ColumnVector columnVector, 
TypeDescription fieldType, int rowId) {
+    TypeDescription.Category category = fieldType.getCategory();
+
+    if (category == TypeDescription.Category.STRUCT) {
+      StructColumnVector structColumnVector = (StructColumnVector) 
columnVector;
+      if (!structColumnVector.isNull[rowId]) {
+        List<TypeDescription> childrenFieldTypes = fieldType.getChildren();
+        List<String> childrenFieldNames = fieldType.getFieldNames();
+
+        Map<Object, Object> convertedMap = new HashMap<>();
+        for (int i = 0; i < childrenFieldNames.size(); i++) {
+          convertedMap.put(childrenFieldNames.get(i),
+              extractValue(childrenFieldNames.get(i), 
structColumnVector.fields[i], childrenFieldTypes.get(i), rowId));
         }
-      } else if (category == TypeDescription.Category.MAP) {
-        // Map field
-        List<TypeDescription> children = fieldType.getChildren();
-        TypeDescription.Category keyCategory = children.get(0).getCategory();
-        TypeDescription.Category valueCategory = children.get(1).getCategory();
-        MapColumnVector mapColumnVector = (MapColumnVector) _rowBatch.cols[i];
-        int rowId = mapColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((mapColumnVector.noNulls || !mapColumnVector.isNull[rowId])) {
-          int offset = (int) mapColumnVector.offsets[rowId];
-          int length = (int) mapColumnVector.lengths[rowId];
-          Map<Object, Object> map = new HashMap<>();
-          for (int j = 0; j < length; j++) {
-            int childRowId = offset + j;
-            Object key = extractSingleValue(field, mapColumnVector.keys, 
childRowId, keyCategory);
-            Object value = extractSingleValue(field, mapColumnVector.values, 
childRowId, valueCategory);
-            map.put(key, value);
+        return convertedMap;
+      } else {
+        return null;
+      }
+    } else if (category == TypeDescription.Category.LIST) {

Review comment:
       Same here.

##########
File path: 
pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
##########
@@ -72,7 +73,7 @@
   private int _nextRowId;
 
   @Override
-  public void init(File dataFile, Set<String> fieldsToRead, @Nullable 
RecordReaderConfig recordReaderConfig)
+  public void init(File dataFile, @Nullable Set<String> fieldsToRead, 
@Nullable RecordReaderConfig recordReaderConfig)

Review comment:
       Can we have an extractor for ORC as the other RecordReaders do?

##########
File path: 
pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
##########
@@ -175,68 +186,102 @@ public GenericRow next(GenericRow reuse)
       }
       String field = _orcFields.get(i);
       TypeDescription fieldType = _orcFieldTypes.get(i);
-      TypeDescription.Category category = fieldType.getCategory();
-      if (category == TypeDescription.Category.LIST) {
-        // Multi-value field, extract to Object[]
-        TypeDescription.Category childCategory = 
fieldType.getChildren().get(0).getCategory();
-        ListColumnVector listColumnVector = (ListColumnVector) 
_rowBatch.cols[i];
-        int rowId = listColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((listColumnVector.noNulls || !listColumnVector.isNull[rowId])) {
-          int offset = (int) listColumnVector.offsets[rowId];
-          int length = (int) listColumnVector.lengths[rowId];
-          List<Object> values = new ArrayList<>(length);
-          for (int j = 0; j < length; j++) {
-            Object value = extractSingleValue(field, listColumnVector.child, 
offset + j, childCategory);
-            // NOTE: Only keep non-null values
-            // TODO: Revisit
-            if (value != null) {
-              values.add(value);
-            }
-          }
-          if (!values.isEmpty()) {
-            reuse.putValue(field, values.toArray());
-          } else {
-            // NOTE: Treat empty list as null
-            // TODO: Revisit
-            reuse.putValue(field, null);
-          }
-        } else {
-          reuse.putValue(field, null);
+      reuse.putValue(field, extractValue(field, _rowBatch.cols[i], fieldType, 
_nextRowId));
+    }
+
+    if (++_nextRowId == _rowBatch.size) {
+      _hasNext = _orcRecordReader.nextBatch(_rowBatch);
+      _nextRowId = 0;
+    }
+    return reuse;
+  }
+
+  /**
+   * Extracts the values for a given column vector.
+   *
+   * @param field name of the field being extracted
+   * @param columnVector contains values of the field and its sub-types
+   * @param fieldType information about the field such as the category 
(STRUCT, LIST, MAP, INT, etc)
+   * @param rowId the ID of the row value being extracted
+   * @return extracted row value from the column
+   */
+  @Nullable
+  private Object extractValue(String field, ColumnVector columnVector, 
TypeDescription fieldType, int rowId) {
+    TypeDescription.Category category = fieldType.getCategory();
+
+    if (category == TypeDescription.Category.STRUCT) {

Review comment:
       You can still use the method you added such as `isInstanceOfRecord` 
here. So the in the method:
   ```
     protected boolean isInstanceOfRecord(Object value) {
       return (TypeDescription.Category) value == 
TypeDescription.Category.STRUCT;
     }
   ```

##########
File path: 
pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
##########
@@ -175,68 +186,102 @@ public GenericRow next(GenericRow reuse)
       }
       String field = _orcFields.get(i);
       TypeDescription fieldType = _orcFieldTypes.get(i);
-      TypeDescription.Category category = fieldType.getCategory();
-      if (category == TypeDescription.Category.LIST) {
-        // Multi-value field, extract to Object[]
-        TypeDescription.Category childCategory = 
fieldType.getChildren().get(0).getCategory();
-        ListColumnVector listColumnVector = (ListColumnVector) 
_rowBatch.cols[i];
-        int rowId = listColumnVector.isRepeating ? 0 : _nextRowId;
-        if ((listColumnVector.noNulls || !listColumnVector.isNull[rowId])) {
-          int offset = (int) listColumnVector.offsets[rowId];
-          int length = (int) listColumnVector.lengths[rowId];
-          List<Object> values = new ArrayList<>(length);
-          for (int j = 0; j < length; j++) {
-            Object value = extractSingleValue(field, listColumnVector.child, 
offset + j, childCategory);
-            // NOTE: Only keep non-null values
-            // TODO: Revisit
-            if (value != null) {
-              values.add(value);
-            }
-          }
-          if (!values.isEmpty()) {
-            reuse.putValue(field, values.toArray());
-          } else {
-            // NOTE: Treat empty list as null
-            // TODO: Revisit
-            reuse.putValue(field, null);
-          }
-        } else {
-          reuse.putValue(field, null);
+      reuse.putValue(field, extractValue(field, _rowBatch.cols[i], fieldType, 
_nextRowId));
+    }
+
+    if (++_nextRowId == _rowBatch.size) {
+      _hasNext = _orcRecordReader.nextBatch(_rowBatch);
+      _nextRowId = 0;
+    }
+    return reuse;
+  }
+
+  /**
+   * Extracts the values for a given column vector.
+   *
+   * @param field name of the field being extracted
+   * @param columnVector contains values of the field and its sub-types
+   * @param fieldType information about the field such as the category 
(STRUCT, LIST, MAP, INT, etc)
+   * @param rowId the ID of the row value being extracted
+   * @return extracted row value from the column
+   */
+  @Nullable
+  private Object extractValue(String field, ColumnVector columnVector, 
TypeDescription fieldType, int rowId) {
+    TypeDescription.Category category = fieldType.getCategory();
+
+    if (category == TypeDescription.Category.STRUCT) {

Review comment:
       And pls adjust the sequence to make it consistent in all extractors 
(first check collection, then map, then record, and finally single value).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to