This is an automated email from the ASF dual-hosted git repository. jiafengzheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 706baa0 [Feature] Refactoring DorisSouce based on FLIP-27 (#24) 706baa0 is described below commit 706baa0348224059c3754a78d0a957e0258811ca Author: wudi <676366...@qq.com> AuthorDate: Thu Apr 21 10:40:47 2022 +0800 [Feature] Refactoring DorisSouce based on FLIP-27 (#24) * Refactoring DorisSouce based on FLIP-27 --- .../apache/doris/flink/cfg/DorisReadOptions.java | 17 ++- .../DorisDeserializationSchema.java | 8 ++ .../RowDataDeserializationSchema.java | 50 ++++++++ .../SimpleListDeserializationSchema.java | 7 +- .../converter/DorisRowConverter.java | 128 +++++++++++++++++++++ .../doris/flink/rest/PartitionDefinition.java | 14 +-- .../org/apache/doris/flink/rest/RestService.java | 2 +- .../apache/doris/flink/serialization/RowBatch.java | 5 +- .../doris/flink/sink/writer/DorisStreamLoad.java | 14 +-- .../org/apache/doris/flink/source/DorisSource.java | 123 ++++++++++++++++++++ .../doris/flink/source/DorisSourceBuilder.java | 68 +++++++++++ .../flink/source/assigners/DorisSplitAssigner.java | 53 +++++++++ .../source/assigners/SimpleSplitAssigner.java | 58 ++++++++++ .../source/enumerator/DorisSourceEnumerator.java | 97 ++++++++++++++++ .../source/enumerator/PendingSplitsCheckpoint.java | 50 ++++++++ .../PendingSplitsCheckpointSerializer.java | 124 ++++++++++++++++++++ .../flink/source/reader/DorisRecordEmitter.java | 62 ++++++++++ .../flink/source/reader/DorisSourceReader.java | 69 +++++++++++ .../source/reader/DorisSourceSplitReader.java | 103 +++++++++++++++++ .../doris/flink/source/split/DorisSourceSplit.java | 73 ++++++++++++ .../source/split/DorisSourceSplitSerializer.java | 110 ++++++++++++++++++ .../split/DorisSourceSplitState.java} | 17 ++- .../flink/source/split/DorisSplitRecords.java | 81 +++++++++++++ .../flink/table/DorisDynamicTableFactory.java | 15 ++- .../doris/flink/table/DorisDynamicTableSource.java | 48 +++++--- .../doris/flink/table/DorisRowDataInputFormat.java | 22 ++-- 
.../RowDataDeserializationSchemaTest.java | 65 +++++++++++ .../convert/DorisRowConverterTest.java | 62 ++++++++++ .../org/apache/doris/flink/sink/OptionUtils.java | 27 +++-- .../doris/flink/source/DorisSourceExampleTest.java | 48 ++++++++ .../PendingSplitsCheckpointSerializerTest.java | 50 ++++++++ .../flink/source/reader/DorisSourceReaderTest.java | 69 +++++++++++ .../flink/source/reader/TestingReaderContext.java | 98 ++++++++++++++++ .../split/DorisSourceSplitSerializerTest.java | 44 +++++++ .../flink/source/split/DorisSplitRecordsTest.java} | 23 ++-- .../flink/table/DorisDynamicTableSourceTest.java | 86 ++++++++++++++ .../org/apache/doris/flink/utils/FactoryMocks.java | 46 ++++++++ 37 files changed, 1953 insertions(+), 83 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java index 0beb18c..aead8a8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java @@ -37,10 +37,11 @@ public class DorisReadOptions implements Serializable { private Long execMemLimit; private Integer deserializeQueueSize; private Boolean deserializeArrowAsync; + private boolean useOldApi; public DorisReadOptions(String readFields, String filterQuery, Integer requestTabletSize, Integer requestConnectTimeoutMs, Integer requestReadTimeoutMs, Integer requestQueryTimeoutS, Integer requestRetries, Integer requestBatchSize, Long execMemLimit, - Integer deserializeQueueSize, Boolean deserializeArrowAsync) { + Integer deserializeQueueSize, Boolean deserializeArrowAsync, boolean useOldApi) { this.readFields = readFields; this.filterQuery = filterQuery; this.requestTabletSize = requestTabletSize; @@ -52,6 +53,7 @@ public class DorisReadOptions implements Serializable { this.execMemLimit = execMemLimit; this.deserializeQueueSize = 
deserializeQueueSize; this.deserializeArrowAsync = deserializeArrowAsync; + this.useOldApi = useOldApi; } public String getReadFields() { @@ -98,12 +100,15 @@ public class DorisReadOptions implements Serializable { return deserializeArrowAsync; } + public boolean getUseOldApi() { + return useOldApi; + } public static Builder builder() { return new Builder(); } - public static DorisReadOptions defaults(){ + public static DorisReadOptions defaults() { return DorisReadOptions.builder().build(); } @@ -123,6 +128,7 @@ public class DorisReadOptions implements Serializable { private Long execMemLimit; private Integer deserializeQueueSize; private Boolean deserializeArrowAsync; + private Boolean useOldApi = false; public Builder setReadFields(String readFields) { @@ -180,8 +186,13 @@ public class DorisReadOptions implements Serializable { return this; } + public Builder setUseOldApi(boolean useOldApi) { + this.useOldApi = useOldApi; + return this; + } + public DorisReadOptions build() { - return new DorisReadOptions(readFields, filterQuery, requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs, requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit, deserializeQueueSize, deserializeArrowAsync); + return new DorisReadOptions(readFields, filterQuery, requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs, requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit, deserializeQueueSize, deserializeArrowAsync, useOldApi); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java index 2aaec99..608fa5c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java @@ -17,8 +17,16 @@ package 
org.apache.doris.flink.deserialization; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.util.Collector; import java.io.Serializable; +import java.util.List; +/** + * The deserialization schema describes how to turn the doris list record into data types + * (Java/Scala objects) that are processed by Flink. + **/ public interface DorisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + + void deserialize(List<?> record, Collector<T> out) throws Exception; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java new file mode 100644 index 0000000..f9c65b3 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchema.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+package org.apache.doris.flink.deserialization; + +import org.apache.doris.flink.deserialization.converter.DorisRowConverter; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; +import java.util.List; + + +/** + * A simple implementation of {@link DorisDeserializationSchema} which converts the received + * list record into {@link GenericRowData}. + */ +public class RowDataDeserializationSchema implements DorisDeserializationSchema<RowData> { + + private final DorisRowConverter rowConverter; + + public RowDataDeserializationSchema(RowType rowType) { + this.rowConverter = new DorisRowConverter(rowType); + } + + @Override + public TypeInformation<RowData> getProducedType() { + return TypeInformation.of(RowData.class); + } + + @Override + public void deserialize(List<?> record, Collector<RowData> out) throws Exception { + RowData row = rowConverter.convert(record); + out.collect(row); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java index d9ec6e5..43e68ef 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java @@ -19,10 +19,10 @@ package org.apache.doris.flink.deserialization; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Collector; import java.util.List; - public class SimpleListDeserializationSchema implements DorisDeserializationSchema<List<?>> { @Override @@ -30,4 +30,9 @@ public class 
SimpleListDeserializationSchema implements DorisDeserializationSche return TypeInformation.of(new TypeHint<List<?>>() { }); } + + @Override + public void deserialize(List<?> record, Collector<List<?>> out) throws Exception { + out.collect(record); + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java new file mode 100644 index 0000000..b84fcdb --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+package org.apache.doris.flink.deserialization.converter; + +import org.apache.doris.flink.serialization.RowBatch; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +public class DorisRowConverter implements Serializable { + + private static final long serialVersionUID = 1L; + private final DeserializationConverter[] deserializationConverters; + + public DorisRowConverter(RowType rowType) { + checkNotNull(rowType); + this.deserializationConverters = new DeserializationConverter[rowType.getFieldCount()]; + for (int i = 0; i < rowType.getFieldCount(); i++) { + deserializationConverters[i] = createNullableConverter(rowType.getTypeAt(i)); + } + } + + /** + * Convert data retrieved from {@link RowBatch} to {@link RowData}. + * + * @param record from rowBatch + */ + public GenericRowData convert(List record){ + GenericRowData rowData = new GenericRowData(record.size()); + for (int i = 0; i < record.size(); i++) { + rowData.setField(i, deserializationConverters[i].deserialize(record.get(i))); + } + return rowData; + } + + + /** + * Create a nullable runtime {@link DeserializationConverter} from given {@link + * LogicalType}. 
+ */ + protected DeserializationConverter createNullableConverter(LogicalType type) { + return wrapIntoNullableInternalConverter(createConverter(type)); + } + + protected DeserializationConverter wrapIntoNullableInternalConverter( + DeserializationConverter deserializationConverter) { + return val -> { + if (val == null) { + return null; + } else { + return deserializationConverter.deserialize(val); + } + }; + } + + /** Runtime converter to convert doris field to {@link RowData} type object. */ + @FunctionalInterface + interface DeserializationConverter extends Serializable { + /** + * Convert a doris field object of {@link RowBatch } to the data structure object. + * + * @param field + */ + Object deserialize(Object field); + } + + protected DeserializationConverter createConverter(LogicalType type) { + switch (type.getTypeRoot()) { + case NULL: + return val -> null; + case BOOLEAN: + case FLOAT: + case DOUBLE: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY_TIME: + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + return val -> val; + case DECIMAL: + final int precision = ((DecimalType) type).getPrecision(); + final int scale = ((DecimalType) type).getScale(); + return val -> DecimalData.fromBigDecimal((BigDecimal) val, precision, scale); + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case DATE: + case CHAR: + case VARCHAR: + return val -> StringData.fromString((String) val); + case TIME_WITHOUT_TIME_ZONE: + case BINARY: + case VARBINARY: + case ARRAY: + case ROW: + case MAP: + case MULTISET: + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java index 8a66f76..e753d66 100644 --- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java @@ -17,8 +17,6 @@ package org.apache.doris.flink.rest; -import org.apache.doris.flink.cfg.DorisOptions; - import java.io.Serializable; import java.util.Collections; import java.util.HashSet; @@ -35,16 +33,10 @@ public class PartitionDefinition implements Serializable, Comparable<PartitionDe private final String beAddress; private final Set<Long> tabletIds; private final String queryPlan; - private final String serializedSettings; public PartitionDefinition(String database, String table, - DorisOptions settings, String beAddress, Set<Long> tabletIds, String queryPlan) + String beAddress, Set<Long> tabletIds, String queryPlan) throws IllegalArgumentException { - if (settings != null) { - this.serializedSettings = settings.save(); - } else { - this.serializedSettings = null; - } this.database = database; this.table = table; this.beAddress = beAddress; @@ -72,7 +64,6 @@ public class PartitionDefinition implements Serializable, Comparable<PartitionDe return queryPlan; } - @Override public int compareTo(PartitionDefinition o) { int cmp = database.compareTo(o.database); @@ -123,8 +114,7 @@ public class PartitionDefinition implements Serializable, Comparable<PartitionDe Objects.equals(table, that.table) && Objects.equals(beAddress, that.beAddress) && Objects.equals(tabletIds, that.tabletIds) && - Objects.equals(queryPlan, that.queryPlan) && - Objects.equals(serializedSettings, that.serializedSettings); + Objects.equals(queryPlan, that.queryPlan) ; } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java index 6ac84bc..734bfdb 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java +++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -675,7 +675,7 @@ public class RestService implements Serializable { first, Math.min(beInfo.getValue().size(), first + tabletsSize))); first = first + tabletsSize; PartitionDefinition partitionDefinition = - new PartitionDefinition(database, table, options, + new PartitionDefinition(database, table, beInfo.getKey(), partitionTablets, opaquedQueryPlan); logger.debug("Generate one PartitionDefinition '{}'.", partitionDefinition); partitions.add(partitionDefinition); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java index d235aa9..3be6f87 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java @@ -36,6 +36,8 @@ import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.rest.models.Schema; import org.apache.doris.thrift.TScanBatchResult; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -44,9 +46,6 @@ import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * row batch data container. 
*/ diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 7849559..b468cc8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -17,10 +17,6 @@ package org.apache.doris.flink.sink.writer; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.util.concurrent.ExecutorThreadFactory; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.doris.flink.cfg.DorisExecutionOptions; @@ -31,7 +27,9 @@ import org.apache.doris.flink.exception.StreamLoadException; import org.apache.doris.flink.rest.models.RespContent; import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.ResponseUtil; - +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.InputStreamEntity; import org.apache.http.impl.client.CloseableHttpClient; @@ -41,7 +39,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; -import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -53,11 +50,8 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import static org.apache.doris.flink.sink.LoadStatus.FAIL; -import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN; import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST; -import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; -import static 
org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; -import static org.apache.doris.flink.sink.writer.LoadConstants.JSON; +import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java new file mode 100644 index 0000000..e2b6d41 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+package org.apache.doris.flink.source; + +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.deserialization.DorisDeserializationSchema; +import org.apache.doris.flink.rest.PartitionDefinition; +import org.apache.doris.flink.rest.RestService; +import org.apache.doris.flink.source.assigners.DorisSplitAssigner; +import org.apache.doris.flink.source.assigners.SimpleSplitAssigner; +import org.apache.doris.flink.source.enumerator.DorisSourceEnumerator; +import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint; +import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpointSerializer; +import org.apache.doris.flink.source.reader.DorisRecordEmitter; +import org.apache.doris.flink.source.reader.DorisSourceReader; +import org.apache.doris.flink.source.split.DorisSourceSplit; +import org.apache.doris.flink.source.split.DorisSourceSplitSerializer; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * DorisSource based on FLIP-27 which is a BOUNDED stream. 
+ **/ +public class DorisSource<OUT> implements Source<OUT, DorisSourceSplit, PendingSplitsCheckpoint>, + ResultTypeQueryable<OUT> { + + private static final Logger LOG = LoggerFactory.getLogger(DorisSource.class); + + private final DorisOptions options; + private final DorisReadOptions readOptions; + + // Boundedness + private final Boundedness boundedness; + private final DorisDeserializationSchema<OUT> deserializer; + + public DorisSource(DorisOptions options, + DorisReadOptions readOptions, + Boundedness boundedness, + DorisDeserializationSchema<OUT> deserializer) { + this.options = options; + this.readOptions = readOptions; + this.boundedness = boundedness; + this.deserializer = deserializer; + } + + @Override + public Boundedness getBoundedness() { + return this.boundedness; + } + + @Override + public SourceReader<OUT, DorisSourceSplit> createReader(SourceReaderContext readerContext) throws Exception { + return new DorisSourceReader<>( + options, + readOptions, + new DorisRecordEmitter<>(deserializer), + readerContext, + readerContext.getConfiguration() + ); + } + + @Override + public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> createEnumerator(SplitEnumeratorContext<DorisSourceSplit> context) throws Exception { + List<DorisSourceSplit> dorisSourceSplits = new ArrayList<>(); + List<PartitionDefinition> partitions = RestService.findPartitions(options, readOptions, LOG); + partitions.forEach(m -> dorisSourceSplits.add(new DorisSourceSplit(m))); + DorisSplitAssigner splitAssigner = new SimpleSplitAssigner(dorisSourceSplits); + + return new DorisSourceEnumerator(context, splitAssigner); + } + + @Override + public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> restoreEnumerator( + SplitEnumeratorContext<DorisSourceSplit> context, + PendingSplitsCheckpoint checkpoint) throws Exception { + Collection<DorisSourceSplit> splits = checkpoint.getSplits(); + DorisSplitAssigner splitAssigner = new SimpleSplitAssigner(splits); + return new 
DorisSourceEnumerator(context, splitAssigner); + } + + @Override + public SimpleVersionedSerializer<DorisSourceSplit> getSplitSerializer() { + return DorisSourceSplitSerializer.INSTANCE; + } + + @Override + public SimpleVersionedSerializer<PendingSplitsCheckpoint> getEnumeratorCheckpointSerializer() { + return new PendingSplitsCheckpointSerializer(getSplitSerializer()); + } + + @Override + public TypeInformation<OUT> getProducedType() { + return deserializer.getProducedType(); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java new file mode 100644 index 0000000..8d96c08 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+package org.apache.doris.flink.source; + +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.deserialization.DorisDeserializationSchema; +import org.apache.flink.api.connector.source.Boundedness; + +/** + * The builder class for {@link DorisSource} to make it easier for the users to construct a {@link + * DorisSource}. + **/ +public class DorisSourceBuilder<OUT> { + + private DorisOptions options; + private DorisReadOptions readOptions; + + // Boundedness + private Boundedness boundedness; + private DorisDeserializationSchema<OUT> deserializer; + + DorisSourceBuilder() { + boundedness = Boundedness.BOUNDED; + } + + public static <OUT> DorisSourceBuilder<OUT> builder() { + return new DorisSourceBuilder(); + } + + public DorisSourceBuilder<OUT> setDorisOptions(DorisOptions options) { + this.options = options; + return this; + } + + public DorisSourceBuilder<OUT> setDorisReadOptions(DorisReadOptions readOptions) { + this.readOptions = readOptions; + return this; + } + + public DorisSourceBuilder<OUT> setBoundedness(Boundedness boundedness) { + this.boundedness = boundedness; + return this; + } + + public DorisSourceBuilder<OUT> setDeserializer(DorisDeserializationSchema<OUT> deserializer) { + this.deserializer = deserializer; + return this; + } + + public DorisSource<OUT> build() { + return new DorisSource<>(options, readOptions, boundedness, deserializer); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/DorisSplitAssigner.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/DorisSplitAssigner.java new file mode 100644 index 0000000..bf541b3 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/DorisSplitAssigner.java @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.source.assigners; + +import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint; +import org.apache.doris.flink.source.split.DorisSourceSplit; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Optional; + +/** + * The {@code DorisSplitAssigner} is responsible for deciding what split should be processed. It + * determines split processing order. + */ +public interface DorisSplitAssigner { + + /** + * Gets the next split. + * + * <p>When this method returns an empty {@code Optional}, then the set of splits is assumed to + * be done and the source will finish once the readers finished their current splits. + */ + Optional<DorisSourceSplit> getNext(@Nullable String hostname); + + /** + * Adds a set of splits to this assigner. This happens for example when some split processing + * failed and the splits need to be re-added, or when new splits got discovered. + */ + void addSplits(Collection<DorisSourceSplit> splits); + + /** + * Creates a snapshot of the state of this split assigner, to be stored in a checkpoint. + * + * @param checkpointId The ID of the checkpoint for which the snapshot is created. + * @return an object containing the state of the split enumerator. 
+ */ + PendingSplitsCheckpoint snapshotState(long checkpointId); +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java new file mode 100644 index 0000000..eef17f2 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.source.assigners; + +import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint; +import org.apache.doris.flink.source.split.DorisSourceSplit; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Optional; + +/** + * The {@code SimpleSplitAssigner} hands out splits in a random order. 
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.source.assigners;

import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint;
import org.apache.doris.flink.source.split.DorisSourceSplit;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;

/**
 * The {@code SimpleSplitAssigner} hands out splits in no particular order (last-in,
 * first-out over the backing list).
 *
 * <p>Not thread-safe; the enumerator invokes it from a single thread.
 */
public class SimpleSplitAssigner implements DorisSplitAssigner {

    /** Pending, not-yet-assigned splits. */
    private final ArrayList<DorisSourceSplit> splits;

    public SimpleSplitAssigner(Collection<DorisSourceSplit> splits) {
        this.splits = new ArrayList<>(splits);
    }

    @Override
    public Optional<DorisSourceSplit> getNext(@Nullable String hostname) {
        // hostname is ignored: this assigner makes no locality guarantees.
        final int size = splits.size();
        return size == 0 ? Optional.empty() : Optional.of(splits.remove(size - 1));
    }

    @Override
    public void addSplits(Collection<DorisSourceSplit> splits) {
        // BUG FIX: the parameter shadows the field, so `splits.addAll(splits)` added the
        // incoming collection to itself and silently dropped re-added splits. Add to the
        // backing field instead.
        this.splits.addAll(splits);
    }

    @Override
    public PendingSplitsCheckpoint snapshotState(long checkpointId) {
        return new PendingSplitsCheckpoint(splits);
    }

    @Override
    public String toString() {
        return "SimpleSplitAssigner " + splits;
    }
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.source.enumerator;

import org.apache.doris.flink.source.DorisSource;
import org.apache.doris.flink.source.assigners.DorisSplitAssigner;
import org.apache.doris.flink.source.split.DorisSourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A {@link SplitEnumerator} implementation for bounded / batch {@link DorisSource} input.
 *
 * <p>All splits are computed up-front and handed to readers on request via the configured
 * {@link DorisSplitAssigner}; once the assigner runs dry the source is finished.
 */
public class DorisSourceEnumerator
        implements SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> {

    private static final Logger LOG = LoggerFactory.getLogger(DorisSourceEnumerator.class);

    private final SplitEnumeratorContext<DorisSourceSplit> context;

    private final DorisSplitAssigner splitAssigner;

    public DorisSourceEnumerator(SplitEnumeratorContext<DorisSourceSplit> context,
                                 DorisSplitAssigner splitAssigner) {
        this.context = context;
        this.splitAssigner = checkNotNull(splitAssigner);
    }

    @Override
    public void start() {
        // no resources to start
    }

    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String hostname) {
        if (!context.registeredReaders().containsKey(subtaskId)) {
            // The reader failed between sending the request and now; drop the stale request.
            return;
        }

        final Optional<DorisSourceSplit> next = splitAssigner.getNext(hostname);
        if (!next.isPresent()) {
            // Nothing left to hand out: tell the reader it can finish.
            context.signalNoMoreSplits(subtaskId);
            LOG.info("No more splits available for subtask {}", subtaskId);
            return;
        }

        final DorisSourceSplit split = next.get();
        context.assignSplit(split, subtaskId);
        LOG.info("Assigned split to subtask {} : {}", subtaskId, split);
    }

    @Override
    public void addSplitsBack(List<DorisSourceSplit> splits, int subtaskId) {
        // Splits come back when a reader fails before finishing them; re-queue for others.
        LOG.debug("Doris Source Enumerator adds splits back: {}", splits);
        splitAssigner.addSplits(splits);
    }

    @Override
    public void addReader(int subtaskId) {
        // Readers pull work via handleSplitRequest, so registration needs no action here.
    }

    @Override
    public PendingSplitsCheckpoint snapshotState(long checkpointId) throws Exception {
        return splitAssigner.snapshotState(checkpointId);
    }

    @Override
    public void close() throws IOException {
        // no resources to close
    }
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.source.enumerator;

import org.apache.doris.flink.source.split.DorisSourceSplit;

import javax.annotation.Nullable;
import java.util.Collection;

/**
 * A checkpoint of the enumerator state: the splits that are pending and not yet assigned to
 * any reader.
 */
public class PendingSplitsCheckpoint {

    /** The pending splits captured by this checkpoint. */
    private final Collection<DorisSourceSplit> pendingSplits;

    /**
     * Cached byte representation from the last serialization step, so repeated serialization
     * of the same checkpoint object is paid for only once. Written by
     * {@link PendingSplitsCheckpointSerializer} (hence package-private).
     */
    @Nullable
    byte[] serializedFormCache;

    public PendingSplitsCheckpoint(Collection<DorisSourceSplit> splits) {
        this.pendingSplits = splits;
    }

    public Collection<DorisSourceSplit> getSplits() {
        return pendingSplits;
    }
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.source.enumerator;

import org.apache.doris.flink.source.split.DorisSourceSplit;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Collection;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A serializer for the {@link PendingSplitsCheckpoint}.
 *
 * <p>Wire format (little-endian): magic number, split-serializer version, split count,
 * then for each split its length followed by its bytes.
 */
public class PendingSplitsCheckpointSerializer
        implements SimpleVersionedSerializer<PendingSplitsCheckpoint> {

    private static final int VERSION = 1;

    private static final int VERSION_1_MAGIC_NUMBER = 0xDEADBEEF;

    /** Delegate used for the individual splits inside the checkpoint. */
    private final SimpleVersionedSerializer<DorisSourceSplit> splitSerializer;

    public PendingSplitsCheckpointSerializer(SimpleVersionedSerializer<DorisSourceSplit> splitSerializer) {
        this.splitSerializer = checkNotNull(splitSerializer);
    }

    @Override
    public int getVersion() {
        return VERSION;
    }

    @Override
    public byte[] serialize(PendingSplitsCheckpoint checkpoint) throws IOException {
        // optimization: the checkpoint lazily caches its own serialized form
        if (checkpoint.serializedFormCache != null) {
            return checkpoint.serializedFormCache;
        }

        Collection<DorisSourceSplit> splits = checkpoint.getSplits();
        final ArrayList<byte[]> serializedSplits = new ArrayList<>(splits.size());

        // FIX: the header is three ints (magic, split-serializer version, split count),
        // i.e. 12 bytes — the original comment claimed "four ints".
        int totalLen = 12;

        for (DorisSourceSplit split : splits) {
            final byte[] serSplit = splitSerializer.serialize(split);
            serializedSplits.add(serSplit);
            totalLen += serSplit.length + 4; // 4 bytes for the length field
        }

        final byte[] result = new byte[totalLen];
        final ByteBuffer byteBuffer = ByteBuffer.wrap(result).order(ByteOrder.LITTLE_ENDIAN);
        byteBuffer.putInt(VERSION_1_MAGIC_NUMBER);
        byteBuffer.putInt(splitSerializer.getVersion());
        byteBuffer.putInt(serializedSplits.size());

        for (byte[] splitBytes : serializedSplits) {
            byteBuffer.putInt(splitBytes.length);
            byteBuffer.put(splitBytes);
        }

        assert byteBuffer.remaining() == 0;

        // optimization: cache the serialized form, so we avoid the byte work during repeated
        // serialization
        checkpoint.serializedFormCache = result;

        return result;
    }

    @Override
    public PendingSplitsCheckpoint deserialize(int version, byte[] serialized) throws IOException {
        if (version == 1) {
            return deserialize(serialized);
        }
        throw new IOException("Unknown version: " + version);
    }

    private PendingSplitsCheckpoint deserialize(byte[] serialized) throws IOException {
        final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);

        final int magic = bb.getInt();
        if (magic != VERSION_1_MAGIC_NUMBER) {
            throw new IOException(
                    String.format(
                            "Invalid magic number for PendingSplitsCheckpoint. "
                                    + "Expected: %X , found %X",
                            VERSION_1_MAGIC_NUMBER, magic));
        }

        final int splitSerializerVersion = bb.getInt();
        final int numSplits = bb.getInt();

        final ArrayList<DorisSourceSplit> splits = new ArrayList<>(numSplits);

        for (int remaining = numSplits; remaining > 0; remaining--) {
            final byte[] bytes = new byte[bb.getInt()];
            bb.get(bytes);
            final DorisSourceSplit split = splitSerializer.deserialize(splitSerializerVersion, bytes);
            splits.add(split);
        }
        return new PendingSplitsCheckpoint(splits);
    }
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.source.reader;

import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
import org.apache.doris.flink.source.split.DorisSourceSplitState;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * The {@link RecordEmitter} implementation for {@link DorisSourceReader}: deserializes each
 * raw record (a {@code List} of column values) and forwards the result to the source output.
 **/
public class DorisRecordEmitter<T>
        implements RecordEmitter<List, T, DorisSourceSplitState> {

    private final DorisDeserializationSchema<T> deserializationSchema;

    /** Reused adapter that bridges the deserializer's Collector to the SourceOutput. */
    private final OutputCollector<T> collector;

    public DorisRecordEmitter(DorisDeserializationSchema<T> dorisDeserializationSchema) {
        this.deserializationSchema = dorisDeserializationSchema;
        this.collector = new OutputCollector<>();
    }

    @Override
    public void emitRecord(List value, SourceOutput<T> output, DorisSourceSplitState splitState) throws Exception {
        // Point the shared collector at the current output before deserializing.
        collector.output = output;
        deserializationSchema.deserialize(value, collector);
    }

    /** Collector that simply relays every record into the wrapped {@link SourceOutput}. */
    private static class OutputCollector<T> implements Collector<T> {
        private SourceOutput<T> output;

        @Override
        public void collect(T record) {
            output.collect(record);
        }

        @Override
        public void close() {
            // nothing to release
        }
    }
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.source.reader;

import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.source.split.DorisSourceSplit;
import org.apache.doris.flink.source.split.DorisSourceSplitState;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;

import java.util.List;
import java.util.Map;

/**
 * A {@link SourceReader} that reads records from {@link DorisSourceSplit}s, delegating the
 * actual fetching to a {@link DorisSourceSplitReader} on a single fetcher thread.
 **/
public class DorisSourceReader<T>
        extends SingleThreadMultiplexSourceReaderBase<List, T, DorisSourceSplit, DorisSourceSplitState> {

    public DorisSourceReader(DorisOptions options,
                             DorisReadOptions readOptions,
                             RecordEmitter<List, T, DorisSourceSplitState> recordEmitter,
                             SourceReaderContext context,
                             Configuration config) {
        // The supplier creates one split reader per fetcher thread.
        super(() -> new DorisSourceSplitReader(options, readOptions), recordEmitter, config, context);
    }

    @Override
    public void start() {
        // Request a split only when none were restored from a checkpoint.
        if (getNumberOfCurrentlyAssignedSplits() == 0) {
            context.sendSplitRequest();
        }
    }

    @Override
    protected void onSplitFinished(Map<String, DorisSourceSplitState> finishedSplitIds) {
        // One-split-at-a-time model: ask the enumerator for the next split when done.
        context.sendSplitRequest();
    }

    @Override
    protected DorisSourceSplitState initializedState(DorisSourceSplit split) {
        return new DorisSourceSplitState(split);
    }

    @Override
    protected DorisSourceSplit toSplitType(String splitId, DorisSourceSplitState splitState) {
        return splitState.toDorisSourceSplit();
    }
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.source.reader;

import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.datastream.ScalaValueReader;
import org.apache.doris.flink.source.split.DorisSourceSplit;
import org.apache.doris.flink.source.split.DorisSplitRecords;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/**
 * The {@link SplitReader} implementation for the Doris source: processes one split at a time,
 * pulling record batches through a {@link ScalaValueReader}.
 **/
public class DorisSourceSplitReader
        implements SplitReader<List, DorisSourceSplit> {

    private static final Logger LOG = LoggerFactory.getLogger(DorisSourceSplitReader.class);

    /** Splits assigned to this reader but not yet started. */
    private final Queue<DorisSourceSplit> pendingSplits;
    private final DorisOptions options;
    private final DorisReadOptions readOptions;
    /** Reader for the split currently in progress; null when no split is open. */
    private ScalaValueReader valueReader;
    private String currentSplitId;

    public DorisSourceSplitReader(DorisOptions options, DorisReadOptions readOptions) {
        this.options = options;
        this.readOptions = readOptions;
        this.pendingSplits = new ArrayDeque<>();
    }

    @Override
    public RecordsWithSplitIds<List> fetch() throws IOException {
        ensureCurrentSplit();
        return valueReader.hasNext()
                ? DorisSplitRecords.forRecords(currentSplitId, valueReader)
                : finishSplit();
    }

    /** Opens the next pending split unless one is already in progress. */
    private void ensureCurrentSplit() throws IOException {
        if (valueReader != null) {
            return;
        }
        final DorisSourceSplit split = pendingSplits.poll();
        if (split == null) {
            throw new IOException("Cannot fetch from another split - no split remaining");
        }
        currentSplitId = split.splitId();
        valueReader = new ScalaValueReader(split.getPartitionDefinition(), options, readOptions);
    }

    /** Closes the current split's reader and reports the split as finished. */
    private DorisSplitRecords finishSplit() {
        if (valueReader != null) {
            valueReader.close();
            valueReader = null;
        }
        final DorisSplitRecords finished = DorisSplitRecords.finishedSplit(currentSplitId);
        currentSplitId = null;
        return finished;
    }

    @Override
    public void handleSplitsChanges(SplitsChange<DorisSourceSplit> splitsChange) {
        LOG.debug("Handling split change {}", splitsChange);
        pendingSplits.addAll(splitsChange.splits());
    }

    @Override
    public void wakeUp() {
        // fetch() does not block indefinitely, so there is nothing to interrupt.
    }

    @Override
    public void close() throws Exception {
        if (valueReader != null) {
            valueReader.close();
        }
    }
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.source.split;

import org.apache.doris.flink.rest.PartitionDefinition;
import org.apache.flink.api.connector.source.SourceSplit;

import javax.annotation.Nullable;
import java.util.Objects;

/**
 * A {@link SourceSplit} that represents a {@link PartitionDefinition}.
 **/
public class DorisSourceSplit implements SourceSplit {

    private final PartitionDefinition partitionDefinition;

    /**
     * The splits are frequently serialized into checkpoints. Caching the byte representation
     * makes repeated serialization cheap. This field is used by
     * {@link DorisSourceSplitSerializer}.
     */
    @Nullable
    transient byte[] serializedFormCache;

    public DorisSourceSplit(PartitionDefinition partitionDefinition) {
        this.partitionDefinition = partitionDefinition;
    }

    @Override
    public String splitId() {
        // NOTE(review): the BE address is used as the split ID; if two partitions ever map to
        // the same backend, their IDs would collide — confirm partitions are one-per-BE here.
        return partitionDefinition.getBeAddress();
    }

    public PartitionDefinition getPartitionDefinition() {
        return partitionDefinition;
    }

    @Override
    public String toString() {
        return String.format("DorisSourceSplit: %s.%s,be=%s,tablets=%s",
                partitionDefinition.getDatabase(),
                partitionDefinition.getTable(),
                partitionDefinition.getBeAddress(),
                partitionDefinition.getTabletIds());
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DorisSourceSplit that = (DorisSourceSplit) o;

        return Objects.equals(partitionDefinition, that.partitionDefinition);
    }

    @Override
    public int hashCode() {
        // BUG FIX: equals() was overridden without hashCode(), breaking the Object contract
        // and any use of splits in hash-based collections. Hash what equals() compares.
        return Objects.hashCode(partitionDefinition);
    }
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.source.split;

import org.apache.doris.flink.rest.PartitionDefinition;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * A serializer for the {@link DorisSourceSplit}.
 *
 * <p>Wire format: database, table, BE address and query plan as UTF strings, with the tablet
 * IDs written as a length-prefixed array of longs in between.
 **/
public class DorisSourceSplitSerializer
        implements SimpleVersionedSerializer<DorisSourceSplit> {

    public static final DorisSourceSplitSerializer INSTANCE = new DorisSourceSplitSerializer();

    /** Per-thread reusable output buffer, to avoid re-allocating on every split. */
    private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
            ThreadLocal.withInitial(() -> new DataOutputSerializer(64));

    private static final int VERSION = 1;

    /** Writes a length-prefixed array of longs. */
    private static void writeLongArray(DataOutputView out, Long[] values) throws IOException {
        out.writeInt(values.length);
        for (Long value : values) {
            out.writeLong(value);
        }
    }

    /** Reads a length-prefixed array of longs, mirroring {@link #writeLongArray}. */
    private static Long[] readLongArray(DataInputView in) throws IOException {
        final int count = in.readInt();
        final Long[] result = new Long[count];
        for (int i = 0; i < count; i++) {
            result[i] = in.readLong();
        }
        return result;
    }

    @Override
    public int getVersion() {
        return VERSION;
    }

    @Override
    public byte[] serialize(DorisSourceSplit split) throws IOException {

        // optimization: the splits lazily cache their own serialized form
        if (split.serializedFormCache != null) {
            return split.serializedFormCache;
        }

        final DataOutputSerializer out = SERIALIZER_CACHE.get();
        final PartitionDefinition partition = split.getPartitionDefinition();
        out.writeUTF(partition.getDatabase());
        out.writeUTF(partition.getTable());
        out.writeUTF(partition.getBeAddress());
        writeLongArray(out, partition.getTabletIds().toArray(new Long[]{}));
        out.writeUTF(partition.getQueryPlan());

        final byte[] result = out.getCopyOfBuffer();
        out.clear();

        // optimization: cache the serialized form, so we avoid the byte work during repeated
        // serialization
        split.serializedFormCache = result;

        return result;
    }

    @Override
    public DorisSourceSplit deserialize(int version, byte[] serialized) throws IOException {
        if (version == 1) {
            return deserialize(serialized);
        }
        throw new IOException("Unknown version: " + version);
    }

    private DorisSourceSplit deserialize(byte[] serialized) throws IOException {
        final DataInputDeserializer in = new DataInputDeserializer(serialized);
        final String database = in.readUTF();
        final String table = in.readUTF();
        final String beAddress = in.readUTF();
        final Set<Long> tabletIds = new HashSet<>(Arrays.asList(readLongArray(in)));
        final String queryPlan = in.readUTF();
        final PartitionDefinition partition =
                new PartitionDefinition(database, table, beAddress, tabletIds, queryPlan);
        return new DorisSourceSplit(partition);
    }
}
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisDeserializationSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSourceSplitState.java @@ -14,11 +14,20 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package org.apache.doris.flink.deserialization; +package org.apache.doris.flink.source.split; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +/** + * State of the reader, essentially a mutable version of the {@link DorisSourceSplit}. + **/ +public class DorisSourceSplitState { -import java.io.Serializable; + private final DorisSourceSplit split; -public interface DorisDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + public DorisSourceSplitState(DorisSourceSplit split) { + this.split = split; + } + + public DorisSourceSplit toDorisSourceSplit() { + return split; + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java new file mode 100644 index 0000000..6f02446 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.source.split;

import org.apache.doris.flink.datastream.ScalaValueReader;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/**
 * An implementation of {@link RecordsWithSplitIds}.
 * This is essentially a slim wrapper around the {@link ScalaValueReader} that only adds
 * information about the current split, or finished splits
 */
public class DorisSplitRecords implements RecordsWithSplitIds<List> {

    private final Set<String> finishedSplits;

    /** Source of records for the current split; null for a "finished" marker batch. */
    private final ScalaValueReader reader;

    /** ID of the split whose records follow; consumed (nulled) by {@link #nextSplit()}. */
    private String splitId;

    public DorisSplitRecords(String splitId,
                             ScalaValueReader scalaValueReader,
                             Set<String> finishedSplits) {
        this.splitId = splitId;
        this.reader = scalaValueReader;
        this.finishedSplits = finishedSplits;
    }

    /** Creates a batch carrying the records of one split. */
    public static DorisSplitRecords forRecords(
            final String splitId, final ScalaValueReader valueReader) {
        return new DorisSplitRecords(splitId, valueReader, Collections.emptySet());
    }

    /** Creates an empty batch that only marks the given split as finished. */
    public static DorisSplitRecords finishedSplit(final String splitId) {
        return new DorisSplitRecords(null, null, Collections.singleton(splitId));
    }

    @Nullable
    @Override
    public String nextSplit() {
        // Hand out the split ID at most once, and only when records are actually available.
        final String current = splitId;
        splitId = null;
        final boolean hasRecords = reader != null && reader.hasNext();
        return hasRecords ? current : null;
    }

    @Nullable
    @Override
    public List nextRecordFromSplit() {
        // Null signals the end of this split's records to the caller.
        if (reader == null || !reader.hasNext()) {
            return null;
        }
        return reader.next();
    }

    @Override
    public Set<String> finishedSplits() {
        return finishedSplits;
    }
}
.withDescription("whether to enable the delete function"); + private static final ConfigOption<Boolean> SOURCE_USE_OLD_API = ConfigOptions + .key("source.use-old-api") + .booleanType() + .defaultValue(false) + .withDescription("Whether to read data using the new interface defined according to the FLIP-27 specification,default false"); + @Override public String factoryIdentifier() { return "doris"; // used for matching to `connector = '...'` @@ -199,6 +201,8 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory options.add(SINK_LABEL_PREFIX); options.add(SINK_BUFFER_SIZE); options.add(SINK_BUFFER_COUNT); + + options.add(SOURCE_USE_OLD_API); return options; } @@ -244,7 +248,8 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS)) .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS)) .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES)) - .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE)); + .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE)) + .setUseOldApi(readableConfig.get(SOURCE_USE_OLD_API)); return builder.build(); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index 689aa47..0af8ad5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -19,9 +19,12 @@ package org.apache.doris.flink.table; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.deserialization.RowDataDeserializationSchema; import org.apache.doris.flink.exception.DorisException; import 
org.apache.doris.flink.rest.PartitionDefinition; import org.apache.doris.flink.rest.RestService; +import org.apache.doris.flink.source.DorisSource; +import org.apache.doris.flink.source.DorisSourceBuilder; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.api.TableSchema; @@ -30,6 +33,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.InputFormatProvider; import org.apache.flink.table.connector.source.LookupTableSource; import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceProvider; import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.data.RowData; @@ -50,12 +54,14 @@ import java.util.List; */ public final class DorisDynamicTableSource implements ScanTableSource, LookupTableSource { + private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSource.class); private final DorisOptions options; private final DorisReadOptions readOptions; private TableSchema physicalSchema; - private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class); - public DorisDynamicTableSource(DorisOptions options, DorisReadOptions readOptions, TableSchema physicalSchema) { + public DorisDynamicTableSource(DorisOptions options, + DorisReadOptions readOptions, + TableSchema physicalSchema) { this.options = options; this.readOptions = readOptions; this.physicalSchema = physicalSchema; @@ -70,21 +76,31 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { - List<PartitionDefinition> dorisPartitions; - try { - dorisPartitions = 
RestService.findPartitions(options, readOptions, LOG); - } catch (DorisException e) { - throw new RuntimeException("Failed fetch doris partitions"); + if (readOptions.getUseOldApi()) { + List<PartitionDefinition> dorisPartitions; + try { + dorisPartitions = RestService.findPartitions(options, readOptions, LOG); + } catch (DorisException e) { + throw new RuntimeException("Failed fetch doris partitions"); + } + DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder() + .setFenodes(options.getFenodes()) + .setUsername(options.getUsername()) + .setPassword(options.getPassword()) + .setTableIdentifier(options.getTableIdentifier()) + .setPartitions(dorisPartitions) + .setReadOptions(readOptions) + .setRowType((RowType) physicalSchema.toRowDataType().getLogicalType()); + return InputFormatProvider.of(builder.build()); + } else { + //Read data using the interface of the FLIP-27 specification + DorisSource<RowData> build = DorisSourceBuilder.<RowData>builder() + .setDorisReadOptions(readOptions) + .setDorisOptions(options) + .setDeserializer(new RowDataDeserializationSchema((RowType) physicalSchema.toRowDataType().getLogicalType())) + .build(); + return SourceProvider.of(build); } - DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder() - .setFenodes(options.getFenodes()) - .setUsername(options.getUsername()) - .setPassword(options.getPassword()) - .setTableIdentifier(options.getTableIdentifier()) - .setPartitions(dorisPartitions) - .setReadOptions(readOptions) - .setRowType((RowType) physicalSchema.toRowDataType().getLogicalType()); - return InputFormatProvider.of(builder.build()); } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java index be1e13d..fbcaeea 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java +++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java @@ -19,6 +19,7 @@ package org.apache.doris.flink.table; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.datastream.ScalaValueReader; +import org.apache.doris.flink.deserialization.converter.DorisRowConverter; import org.apache.doris.flink.rest.PartitionDefinition; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.io.DefaultInputSplitAssigner; @@ -30,12 +31,13 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.table.data.DecimalData; -import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.math.BigDecimal; @@ -43,9 +45,6 @@ import java.sql.PreparedStatement; import java.util.ArrayList; import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * InputFormat for {@link DorisDynamicTableSource}. 
*/ @@ -63,13 +62,16 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable private ScalaValueReader scalaValueReader; private transient boolean hasNext; - private RowType rowType; + private final DorisRowConverter rowConverter; - public DorisRowDataInputFormat(DorisOptions options, List<PartitionDefinition> dorisPartitions, DorisReadOptions readOptions, RowType rowType) { + public DorisRowDataInputFormat(DorisOptions options, + List<PartitionDefinition> dorisPartitions, + DorisReadOptions readOptions, + RowType rowType) { this.options = options; this.dorisPartitions = dorisPartitions; this.readOptions = readOptions; - this.rowType = rowType; + this.rowConverter = new DorisRowConverter(rowType); } @Override @@ -146,11 +148,7 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable return null; } List next = (List) scalaValueReader.next(); - GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount()); - for (int i = 0; i < next.size() && i < rowType.getFieldCount(); i++) { - Object value = deserialize(rowType.getTypeAt(i), next.get(i)); - genericRowData.setField(i, value); - } + RowData genericRowData = rowConverter.convert(next); //update hasNext after we've read the record hasNext = scalaValueReader.hasNext(); return genericRowData; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchemaTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchemaTest.java new file mode 100644 index 0000000..cf22009 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/RowDataDeserializationSchemaTest.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.deserialization; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Collector; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.doris.flink.utils.FactoryMocks.PHYSICAL_TYPE; +import static org.junit.Assert.assertEquals; + +public class RowDataDeserializationSchemaTest { + + @Test + public void deserializeTest() throws Exception { + List<String> records = Arrays.asList("flink","doris"); + SimpleCollector collector = new SimpleCollector(); + RowDataDeserializationSchema deserializationSchema = new RowDataDeserializationSchema(PHYSICAL_TYPE); + for(String record : records){ + deserializationSchema.deserialize(Arrays.asList(record),collector); + } + + List<String> expected = + Arrays.asList( + "+I(flink)", + "+I(doris)"); + + List<String> actual = + collector.list.stream().map(Object::toString).collect(Collectors.toList()); + assertEquals(expected, actual); + } + + private static class SimpleCollector implements Collector<RowData> { + private List<RowData> list = new ArrayList<>(); + + @Override + public void collect(RowData record) { + list.add(record); + } + + @Override + public void close() { + // do nothing + } + } +} diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java new file mode 100644 index 0000000..3489399 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+package org.apache.doris.flink.deserialization.convert; + +import org.apache.doris.flink.deserialization.converter.DorisRowConverter; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.types.logical.RowType; +import org.junit.Assert; +import org.junit.Test; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; + +public class DorisRowConverterTest implements Serializable { + + @Test + public void testConvert(){ + ResolvedSchema SCHEMA = + ResolvedSchema.of( + Column.physical("f1", DataTypes.NULL()), + Column.physical("f2", DataTypes.BOOLEAN()), + Column.physical("f3", DataTypes.FLOAT()), + Column.physical("f4", DataTypes.DOUBLE()), + Column.physical("f5", DataTypes.INTERVAL(DataTypes.YEAR())), + Column.physical("f6", DataTypes.INTERVAL(DataTypes.DAY())), + Column.physical("f7", DataTypes.TINYINT()), + Column.physical("f8", DataTypes.SMALLINT()), + Column.physical("f9", DataTypes.INT()), + Column.physical("f10", DataTypes.BIGINT()), + Column.physical("f11", DataTypes.DECIMAL(10,2)), + Column.physical("f12", DataTypes.TIMESTAMP_WITH_TIME_ZONE()), + Column.physical("f13", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()), + Column.physical("f14", DataTypes.DATE()), + Column.physical("f15", DataTypes.CHAR(1)), + Column.physical("f16", DataTypes.VARCHAR(256))); + + DorisRowConverter converter = new DorisRowConverter((RowType) SCHEMA.toPhysicalRowDataType().getLogicalType()); + + List record = Arrays.asList(null,"true",1.2,1.2345,24,10,1,32,64,128, BigDecimal.valueOf(10.123),"2021-01-01 08:00:00","2021-01-01 08:00:00","2021-01-01","a","doris"); + GenericRowData rowData = converter.convert(record); + Assert.assertEquals("+I(null,true,1.2,1.2345,24,10,1,32,64,128,10.12,2021-01-01 08:00:00,2021-01-01 
08:00:00,2021-01-01,a,doris)",rowData.toString()); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/OptionUtils.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/OptionUtils.java index 5984d91..8e08071 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/OptionUtils.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/OptionUtils.java @@ -19,7 +19,10 @@ package org.apache.doris.flink.sink; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.rest.PartitionDefinition; +import java.util.Arrays; +import java.util.HashSet; import java.util.Properties; /** @@ -35,19 +38,20 @@ public class OptionUtils { DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder(); builder.setLabelPrefix("doris") .setStreamLoadProp(properties) - .setBufferSize(8*1024) + .setBufferSize(8 * 1024) .setBufferCount(3) .setDeletable(true) .setCheckInterval(100) .setMaxRetries(2); return builder.build(); } + public static DorisExecutionOptions buildExecutionOptional(Properties properties) { DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder(); builder.setLabelPrefix("doris") .setStreamLoadProp(properties) - .setBufferSize(8*1024) + .setBufferSize(8 * 1024) .setBufferCount(3) .setDeletable(true) .setCheckInterval(100) @@ -56,6 +60,10 @@ public class OptionUtils { } public static DorisReadOptions buildDorisReadOptions() { + return dorisReadOptionsBuilder().build(); + } + + public static DorisReadOptions.Builder dorisReadOptionsBuilder() { DorisReadOptions.Builder builder = DorisReadOptions.builder(); builder.setDeserializeArrowAsync(false) .setDeserializeQueueSize(64) @@ -66,15 +74,20 @@ public class OptionUtils { .setRequestReadTimeoutMs(10000) .setRequestRetries(3) .setRequestTabletSize(1024 * 1024); - return builder.build(); + 
return builder; } public static DorisOptions buildDorisOptions() { DorisOptions.Builder builder = DorisOptions.builder(); - builder.setFenodes("local:8040") - .setTableIdentifier("db_test.table_test") - .setUsername("u_test") - .setPassword("p_test"); + builder.setFenodes("127.0.0.1:8030") + .setTableIdentifier("db.table") + .setUsername("root") + .setPassword(""); return builder.build(); } + + public static PartitionDefinition buildPartitionDef() { + HashSet<Long> tabletIds = new HashSet<>(Arrays.asList(100L)); + return new PartitionDefinition("db", "table", "127.0.0.1:9060", tabletIds, ""); + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java new file mode 100644 index 0000000..aa5eb59 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+package org.apache.doris.flink.source; + +import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema; +import org.apache.doris.flink.sink.OptionUtils; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; +import org.junit.Test; + +import java.util.List; + +/** + * Example Tests for the {@link DorisSource}. + **/ +public class DorisSourceExampleTest { + + @Test + public void testBoundedDorisSource() throws Exception { + DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder() + .setDorisOptions(OptionUtils.buildDorisOptions()) + .setDorisReadOptions(OptionUtils.buildDorisReadOptions()) + .setDeserializer(new SimpleListDeserializationSchema()) + .build(); + + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris Source") + .addSink(new PrintSinkFunction<>()); + env.execute("Flink doris source test"); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java new file mode 100644 index 0000000..36cb4f9 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/enumerator/PendingSplitsCheckpointSerializerTest.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.source.enumerator; + +import org.apache.doris.flink.sink.OptionUtils; +import org.apache.doris.flink.source.split.DorisSourceSplit; +import org.apache.doris.flink.source.split.DorisSourceSplitSerializer; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +/** + * Unit tests for the {@link PendingSplitsCheckpointSerializer}. + */ +public class PendingSplitsCheckpointSerializerTest { + + private static void assertCheckpointsEqual( + final PendingSplitsCheckpoint expected, + final PendingSplitsCheckpoint actual) { + Assert.assertEquals(expected.getSplits(), actual.getSplits()); + } + + @Test + public void serializeSplit() throws Exception { + final DorisSourceSplit split = + new DorisSourceSplit(OptionUtils.buildPartitionDef()); + PendingSplitsCheckpoint checkpoint = new PendingSplitsCheckpoint(Arrays.asList(split)); + + final PendingSplitsCheckpointSerializer splitSerializer = new PendingSplitsCheckpointSerializer(DorisSourceSplitSerializer.INSTANCE); + byte[] serialized = splitSerializer.serialize(checkpoint); + PendingSplitsCheckpoint deserialize = splitSerializer.deserialize(splitSerializer.getVersion(), serialized); + + assertCheckpointsEqual(checkpoint, deserialize); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java new file mode 100644 index 0000000..a44b96d --- /dev/null +++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.source.reader; + +import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema; +import org.apache.doris.flink.sink.OptionUtils; +import org.apache.doris.flink.source.split.DorisSourceSplit; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; + +/** + * Unit tests for the {@link DorisSourceReader}. 
+ */ +public class DorisSourceReaderTest { + + private static DorisSourceReader createReader(TestingReaderContext context) { + return new DorisSourceReader<>( + OptionUtils.buildDorisOptions(), + OptionUtils.buildDorisReadOptions(), + new DorisRecordEmitter<>(new SimpleListDeserializationSchema()), + context, + context.getConfiguration() + ); + } + + private static DorisSourceSplit createTestDorisSplit() throws IOException { + return new DorisSourceSplit(OptionUtils.buildPartitionDef()); + } + + @Test + public void testRequestSplitWhenNoSplitRestored() throws Exception { + final TestingReaderContext context = new TestingReaderContext(); + final DorisSourceReader reader = createReader(context); + + reader.start(); + reader.close(); + assertEquals(1, context.getNumSplitRequests()); + } + + @Test + public void testNoSplitRequestWhenSplitRestored() throws Exception { + final TestingReaderContext context = new TestingReaderContext(); + final DorisSourceReader reader = createReader(context); + + reader.addSplits(Collections.singletonList(createTestDorisSplit())); + reader.start(); + reader.close(); + + assertEquals(0, context.getNumSplitRequests()); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/TestingReaderContext.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/TestingReaderContext.java new file mode 100644 index 0000000..253f2ea --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/TestingReaderContext.java @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.source.reader; + +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.SourceReaderMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.util.SimpleUserCodeClassLoader; +import org.apache.flink.util.UserCodeClassLoader; + +import java.util.ArrayList; +import java.util.List; + +/** + * A testing implementation of the {@link SourceReaderContext}. 
+ */ +public class TestingReaderContext implements SourceReaderContext { + + private final SourceReaderMetricGroup metrics; + + private final Configuration config; + + private final ArrayList<SourceEvent> sentEvents = new ArrayList<>(); + + private int numSplitRequests; + + public TestingReaderContext() { + this(new Configuration(), UnregisteredMetricsGroup.createSourceReaderMetricGroup()); + } + + public TestingReaderContext(Configuration config, SourceReaderMetricGroup metricGroup) { + this.config = config; + this.metrics = metricGroup; + } + + @Override + public SourceReaderMetricGroup metricGroup() { + return metrics; + } + + @Override + public Configuration getConfiguration() { + return config; + } + + @Override + public String getLocalHostName() { + return "localhost"; + } + + @Override + public int getIndexOfSubtask() { + return 0; + } + + @Override + public void sendSplitRequest() { + numSplitRequests++; + } + + @Override + public void sendSourceEventToCoordinator(SourceEvent sourceEvent) { + sentEvents.add(sourceEvent); + } + + @Override + public UserCodeClassLoader getUserCodeClassLoader() { + return SimpleUserCodeClassLoader.create(getClass().getClassLoader()); + } + + public int getNumSplitRequests() { + return numSplitRequests; + } + + public List<SourceEvent> getSentEvents() { + return new ArrayList<>(sentEvents); + } + + public void clearSentEvents() { + sentEvents.clear(); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java new file mode 100644 index 0000000..b116539 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSourceSplitSerializerTest.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.source.split; + +import org.apache.doris.flink.sink.OptionUtils; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Unit tests for the {@link DorisSourceSplitSerializer}. + */ +public class DorisSourceSplitSerializerTest { + + @Test + public void serializeSplit() throws Exception { + final DorisSourceSplit split = + new DorisSourceSplit(OptionUtils.buildPartitionDef()); + + DorisSourceSplit deSerialized = serializeAndDeserializeSplit(split); + assertEquals(split, deSerialized); + } + + private DorisSourceSplit serializeAndDeserializeSplit(DorisSourceSplit split) throws Exception { + final DorisSourceSplitSerializer splitSerializer = new DorisSourceSplitSerializer(); + byte[] serialized = splitSerializer.serialize(split); + return splitSerializer.deserialize(splitSerializer.getVersion(), serialized); + } + +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSplitRecordsTest.java similarity index 62% copy from flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java copy to 
flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSplitRecordsTest.java index d9ec6e5..4873cb2 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/split/DorisSplitRecordsTest.java @@ -14,20 +14,25 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package org.apache.doris.flink.deserialization; +package org.apache.doris.flink.source.split; +import org.junit.Test; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import java.util.Collections; -import java.util.List; +import static org.junit.Assert.assertEquals; +/** + * Unit tests for the {@link DorisSplitRecords} class. + */ +public class DorisSplitRecordsTest { -public class SimpleListDeserializationSchema implements DorisDeserializationSchema<List<?>> { + @Test + public void testEmptySplits() { + final String split = "empty"; + final DorisSplitRecords records = DorisSplitRecords.finishedSplit(split); - @Override - public TypeInformation<List<?>> getProducedType() { - return TypeInformation.of(new TypeHint<List<?>>() { - }); + assertEquals(Collections.singleton(split), records.finishedSplits()); } + } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java new file mode 100644 index 0000000..533d56b --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableSourceTest.java @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.table; + +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.sink.OptionUtils; +import org.apache.doris.flink.source.DorisSource; +import org.apache.doris.flink.source.enumerator.PendingSplitsCheckpoint; +import org.apache.doris.flink.source.split.DorisSourceSplit; +import org.apache.doris.flink.utils.FactoryMocks; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.connector.source.InputFormatProvider; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; + +public class DorisDynamicTableSourceTest { + + @Test + public void testDorisUseNewApi() { + DorisReadOptions.Builder builder = OptionUtils.dorisReadOptionsBuilder(); + builder.setUseOldApi(false); + final DorisDynamicTableSource actualDorisSource = new 
DorisDynamicTableSource(OptionUtils.buildDorisOptions(), builder.build(), TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA)); + ScanTableSource.ScanRuntimeProvider provider = + actualDorisSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + assertDorisSource(provider); + } + + @Test + public void testDorisUseNewApiDefault() { + final DorisDynamicTableSource actualDorisSource = new DorisDynamicTableSource(OptionUtils.buildDorisOptions(), OptionUtils.buildDorisReadOptions(), TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA)); + ScanTableSource.ScanRuntimeProvider provider = + actualDorisSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + assertDorisSource(provider); + } + + @Test + public void testDorisUseOldApi() { + DorisReadOptions.Builder builder = OptionUtils.dorisReadOptionsBuilder(); + builder.setUseOldApi(true); + final DorisDynamicTableSource actualDorisSource = new DorisDynamicTableSource(OptionUtils.buildDorisOptions(), builder.build(), TableSchema.fromResolvedSchema(FactoryMocks.SCHEMA)); + ScanTableSource.ScanRuntimeProvider provider = + actualDorisSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + assertDorisInputFormat(provider); + } + + + private void assertDorisInputFormat(ScanTableSource.ScanRuntimeProvider provider) { + assertThat(provider, instanceOf(InputFormatProvider.class)); + final InputFormatProvider inputFormatProvider = (InputFormatProvider) provider; + + InputFormat<RowData, DorisTableInputSplit> inputFormat = (InputFormat<RowData, DorisTableInputSplit>) inputFormatProvider.createInputFormat(); + assertThat(inputFormat, instanceOf(DorisRowDataInputFormat.class)); + } + + + private void assertDorisSource(ScanTableSource.ScanRuntimeProvider provider) { + assertThat(provider, instanceOf(SourceProvider.class)); + final SourceProvider sourceProvider = (SourceProvider) provider; + + Source<RowData, DorisSourceSplit, PendingSplitsCheckpoint> source = + (Source<RowData, DorisSourceSplit, 
PendingSplitsCheckpoint>) sourceProvider.createSource(); + assertThat(source, instanceOf(DorisSource.class)); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FactoryMocks.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FactoryMocks.java new file mode 100644 index 0000000..28bbd44 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/FactoryMocks.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.utils; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; + +/** Utilities for testing instances usually created by {@link FactoryUtil}. 
public final class FactoryMocks {

    /** Resolved schema shared by the table tests: (a STRING, b INT, c BOOLEAN). */
    public static final ResolvedSchema SCHEMA =
            ResolvedSchema.of(
                    Column.physical("a", DataTypes.STRING()),
                    Column.physical("b", DataTypes.INT()),
                    Column.physical("c", DataTypes.BOOLEAN()));

    /** Physical row data type derived from {@link #SCHEMA}. */
    public static final DataType PHYSICAL_DATA_TYPE = SCHEMA.toPhysicalRowDataType();

    /** Logical row type derived from {@link #PHYSICAL_DATA_TYPE}. */
    public static final RowType PHYSICAL_TYPE = (RowType) PHYSICAL_DATA_TYPE.getLogicalType();

    /** Identifier of the mock table: {@code default.default.t1}. */
    public static final ObjectIdentifier IDENTIFIER =
            ObjectIdentifier.of("default", "default", "t1");

    private FactoryMocks() {
        // no instantiation
    }
}