JNSimba commented on code in PR #33:
URL: 
https://github.com/apache/incubator-doris-flink-connector/pull/33#discussion_r886640397


##########
flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java:
##########
@@ -114,6 +118,93 @@ public String parseDeleteSign(RowKind rowKind) {
         }
     }
 
+    /**
+     * Creates an accessor that reads the field at {@code fieldPos} from a {@link RowData},
+     * choosing the typed getter from the type root of {@code fieldType}.
+     *
+     * <p>DATE fields are read as an int epoch-day and converted to a {@code Date} through
+     * {@link LocalDate#ofEpochDay(long)}; every other supported type is returned as read from
+     * the matching {@code RowData} getter. When {@code fieldType} is nullable the returned
+     * accessor yields {@code null} for positions where {@code row.isNullAt(fieldPos)} is true.
+     *
+     * @param fieldType logical type of the field to read
+     * @param fieldPos position of the field within the row
+     * @return an accessor for the field at {@code fieldPos}
+     * @throws UnsupportedOperationException if the type root is TIMESTAMP_WITH_TIME_ZONE
+     * @throws IllegalArgumentException if the type root is NULL, SYMBOL, UNRESOLVED or unknown
+     */
+    private RowData.FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) {
+        final RowData.FieldGetter fieldGetter;
+        // ordered by type root definition
+        switch (fieldType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                fieldGetter = row -> row.getString(fieldPos);
+                break;
+            case BOOLEAN:
+                fieldGetter = row -> row.getBoolean(fieldPos);
+                break;
+            case BINARY:
+            case VARBINARY:
+                fieldGetter = row -> row.getBinary(fieldPos);
+                break;
+            case DECIMAL:
+                final int decimalPrecision = getPrecision(fieldType);
+                final int decimalScale = getScale(fieldType);
+                fieldGetter = row -> row.getDecimal(fieldPos, decimalPrecision, decimalScale);
+                break;
+            case TINYINT:
+                fieldGetter = row -> row.getByte(fieldPos);
+                break;
+            case SMALLINT:
+                fieldGetter = row -> row.getShort(fieldPos);
+                break;
+            case INTEGER:
+            case TIME_WITHOUT_TIME_ZONE:
+            case INTERVAL_YEAR_MONTH:
+                fieldGetter = row -> row.getInt(fieldPos);
+                break;
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                fieldGetter = row -> row.getLong(fieldPos);
+                break;
+            case FLOAT:
+                fieldGetter = row -> row.getFloat(fieldPos);
+                break;
+            case DOUBLE:
+                fieldGetter = row -> row.getDouble(fieldPos);
+                break;
+            case DATE:
+                // the row stores the date as an int epoch-day; convert it to a Date value
+                fieldGetter = row -> Date.valueOf(LocalDate.ofEpochDay(row.getInt(fieldPos)));
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final int timestampPrecision = getPrecision(fieldType);
+                fieldGetter = row -> row.getTimestamp(fieldPos, timestampPrecision);
+                break;
+            case TIMESTAMP_WITH_TIME_ZONE:
+                throw new UnsupportedOperationException(
+                        "Unsupported type for field " + fieldPos + ": " + fieldType);
+            case ARRAY:
+                fieldGetter = row -> row.getArray(fieldPos);
+                break;
+            case MULTISET:
+            case MAP:
+                fieldGetter = row -> row.getMap(fieldPos);
+                break;
+            case ROW:
+            case STRUCTURED_TYPE:
+                final int rowFieldCount = getFieldCount(fieldType);
+                fieldGetter = row -> row.getRow(fieldPos, rowFieldCount);
+                break;
+            case DISTINCT_TYPE:
+                // delegate to the source type; the null check below still follows the
+                // nullability of fieldType itself, so the result is not returned directly
+                fieldGetter =
+                        createFieldGetter(((DistinctType) fieldType).getSourceType(), fieldPos);
+                break;
+            case RAW:
+                fieldGetter = row -> row.getRawValue(fieldPos);
+                break;
+            case NULL:
+            case SYMBOL:
+            case UNRESOLVED:
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported type for field " + fieldPos + ": " + fieldType);
+        }
+        if (!fieldType.isNullable()) {
+            return fieldGetter;
+        }
+        // nullable field: check for null before invoking the typed getter
+        return row -> {
+            if (row.isNullAt(fieldPos)) {
+                return null;
+            }
+            return fieldGetter.getFieldOrNull(row);
+        };
+    }
+
     public static Builder builder() {

Review Comment:
   It is recommended to abstract the type conversion here and move it into 
`DorisRowConverter`. For reference, see `DeserializationConverter` or 
`org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to