This is an automated email from the ASF dual-hosted git repository. jiafengzheng pushed a commit to branch branch-for-flink-before-1.13 in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
The following commit(s) were added to refs/heads/branch-for-flink-before-1.13 by this push: new 6229363 [Bug] fix flink schema and doris schema column not match (#29) 6229363 is described below commit 6229363bbf873840d009765f50fd275153399ac4 Author: wudi <676366...@qq.com> AuthorDate: Mon Apr 25 13:49:08 2022 +0800 [Bug] fix flink schema and doris schema column not match (#29) * fix flink schema and doris schema column not match --- .../src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java | 7 +++++++ .../java/org/apache/doris/flink/table/DorisDynamicTableSource.java | 7 +++++++ .../java/org/apache/doris/flink/table/DorisRowDataInputFormat.java | 3 +++ .../src/test/java/org/apache/doris/flink/DorisSourceExample.java | 1 + 4 files changed, 18 insertions(+) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java index 0beb18c..d21ccaa 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java @@ -98,6 +98,13 @@ public class DorisReadOptions implements Serializable { return deserializeArrowAsync; } + public void setReadFields(String readFields) { + this.readFields = readFields; + } + + public void setFilterQuery(String filterQuery) { + this.filterQuery = filterQuery; + } public static Builder builder() { return new Builder(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index 689aa47..37ee11b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -38,7 +38,9 @@ import org.apache.flink.table.types.logical.RowType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; /** * The {@link DorisDynamicTableSource} is used during planning. @@ -70,8 +72,13 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + readOptions.setReadFields(Arrays.stream(physicalSchema.getFieldNames()) + .map(item->String.format("`%s`", item.trim().replace("`", ""))) + .collect(Collectors.joining(", "))); + List<PartitionDefinition> dorisPartitions; try { + //request doris query plan dorisPartitions = RestService.findPartitions(options, readOptions, LOG); } catch (DorisException e) { throw new RuntimeException("Failed fetch doris partitions"); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java index eeb63ba..c353de3 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java @@ -158,6 +158,9 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable } private Object deserialize(LogicalType type, Object val) { + if(val == null){ + return null; + } switch (type.getTypeRoot()) { case DECIMAL: final DecimalType decimalType = ((DecimalType) type); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java index 35857dc..0abdf41 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java @@ -50,6 +50,7 @@ public class DorisSourceExample { " 'connector' = 'doris',\n" + " 'fenodes' = 'FE_IP:8030',\n" + " 'table.identifier' = 'db.table',\n" + + " 'doris.filter.query' = 'bigint_1=1',\n" + " 'username' = 'root',\n" + " 'password' = ''\n" + ")"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org