This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 8465cfc  [fix](connector) fix flight sql read json type (#263)
8465cfc is described below

commit 8465cfc973efc820a8883ceeabd4a6cc322a78f7
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Mon Feb 10 10:45:49 2025 +0800

    [fix](connector) fix flight sql read json type (#263)
---
 .../doris/spark/client/read/DorisFlightSqlReader.java | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
index 7a2d34e..faa462b 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
@@ -72,12 +72,7 @@ public class DorisFlightSqlReader extends DorisReader {
                 throw new DorisException("init adbc connection failed", e);
             }
         }
-        String tableIdentifier = config.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER);
-        String[] arr = tableIdentifier.split("\\.");
-
-        Schema tableSchema = frontendClient.getTableSchema(arr[0], arr[1]);
-        this.schema = processDorisSchema(partition, tableSchema);
-        log.debug("origin flight sql read Schema: " + tableSchema + ", processed schema: " + schema);
+        this.schema = processDorisSchema(partition);
         this.arrowReader = executeQuery();
     }
 
@@ -153,7 +148,7 @@ public class DorisFlightSqlReader extends DorisReader {
         return String.format("SELECT %s FROM %s %s%s%s", columns, fullTableName, tablets, predicates, limit);
     }
 
-    protected Schema processDorisSchema(DorisReaderPartition partition, final Schema originSchema) throws Exception {
+    protected Schema processDorisSchema(DorisReaderPartition partition) throws Exception {
         Schema processedSchema = new Schema();
         Schema tableSchema = frontendClient.getTableSchema(partition.getDatabase(), partition.getTable());
         Map<String, Field> fieldTypeMap = tableSchema.getProperties().stream()
@@ -170,7 +165,12 @@ public class DorisFlightSqlReader extends DorisReader {
                     newFieldList.add(new Field(realColumn, TPrimitiveType.VARCHAR.name(), null, 0, 0, null));
                 }
             } else {
-                newFieldList.add(fieldTypeMap.get(readColumn.trim().replaceAll("`", "")));
+                String colName = readColumn.trim().replaceAll("`", "");
+                if ("JSON".equalsIgnoreCase(fieldTypeMap.get(colName).getType())) {
+                    newFieldList.add(new Field(colName, TPrimitiveType.JSONB.name(), null, 0, 0, null));
+                } else {
+                    newFieldList.add(fieldTypeMap.get(colName));
+                }
             }
         }
         processedSchema.setProperties(newFieldList);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org