This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 3ec3fe8 [Feature] Support flink table lookup join (#61) 3ec3fe8 is described below commit 3ec3fe8bf1d1e7193fcb2687d57967ee4714b4ac Author: wudi <676366...@qq.com> AuthorDate: Thu Sep 15 14:08:41 2022 +0800 [Feature] Support flink table lookup join (#61) * add flink table lookup join Co-authored-by: wudi <> --- .../doris/flink/cfg/DorisExecutionOptions.java | 2 +- .../apache/doris/flink/cfg/DorisLookupOptions.java | 80 +++++++++++ .../org/apache/doris/flink/cfg/DorisOptions.java | 2 +- .../doris/flink/table/DorisConfigOptions.java | 30 +++- .../flink/table/DorisDynamicTableFactory.java | 16 +++ .../doris/flink/table/DorisDynamicTableSource.java | 34 ++++- .../flink/table/DorisRowDataLookupFunction.java | 160 +++++++++++++++++++++ .../table/DorisRowDataLookupFunctionTest.java | 155 ++++++++++++++++++++ 8 files changed, 471 insertions(+), 8 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 102a7ee..36c577a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -23,7 +23,7 @@ import java.io.Serializable; import java.util.Properties; /** - * JDBC sink batch options. + * Doris sink batch options. 
*/ public class DorisExecutionOptions implements Serializable { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java new file mode 100644 index 0000000..f0f7eb4 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.cfg; + +import java.io.Serializable; + +public class DorisLookupOptions implements Serializable { + + private final long cacheMaxSize; + private final long cacheExpireMs; + private final int maxRetryTimes; + + public DorisLookupOptions( + long cacheMaxSize, long cacheExpireMs, int maxRetryTimes) { + this.cacheMaxSize = cacheMaxSize; + this.cacheExpireMs = cacheExpireMs; + this.maxRetryTimes = maxRetryTimes; + } + + public long getCacheMaxSize() { + return cacheMaxSize; + } + + public long getCacheExpireMs() { + return cacheExpireMs; + } + + public int getMaxRetryTimes() { + return maxRetryTimes; + } + + public static Builder builder() { + return new Builder(); + } + + /** Builder of {@link DorisLookupOptions}. 
*/ + public static class Builder { + private long cacheMaxSize = -1L; + private long cacheExpireMs = -1L; + private int maxRetryTimes = 3; + + /** optional, lookup cache max size, over this value, the old data will be eliminated. */ + public Builder setCacheMaxSize(long cacheMaxSize) { + this.cacheMaxSize = cacheMaxSize; + return this; + } + + /** optional, lookup cache expire millis, over this time, the old data will expire. */ + public Builder setCacheExpireMs(long cacheExpireMs) { + this.cacheExpireMs = cacheExpireMs; + return this; + } + + /** optional, max retry times. */ + public Builder setMaxRetryTimes(int maxRetryTimes) { + this.maxRetryTimes = maxRetryTimes; + return this; + } + + public DorisLookupOptions build() { + return new DorisLookupOptions( + cacheMaxSize, cacheExpireMs, maxRetryTimes); + } + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java index 512d0ab..7d6963b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java @@ -84,7 +84,7 @@ public class DorisOptions extends DorisConnectionOptions { } /** - * required, JDBC DB url. + * required, Frontend Http Rest url. 
*/ public Builder setFenodes(String fenodes) { this.fenodes = fenodes; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java index 5b56342..02a4d22 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java @@ -20,6 +20,8 @@ package org.apache.doris.flink.table; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import java.time.Duration; + import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT; @@ -35,9 +37,9 @@ public class DorisConfigOptions { public static final String IDENTIFIER = "doris"; // common option public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris fe http address."); - public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the jdbc table name."); - public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the jdbc user name."); - public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the jdbc password."); + public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the doris table name."); + public static final ConfigOption<String> USERNAME = 
ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the doris user name."); + public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the doris password."); // source config options public static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions @@ -101,6 +103,28 @@ public class DorisConfigOptions { .defaultValue(false) .withDescription("Whether to read data using the new interface defined according to the FLIP-27 specification,default false"); + // Lookup options + public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = + ConfigOptions.key("lookup.cache.max-rows") + .longType() + .defaultValue(-1L) + .withDescription( + "The max number of rows of lookup cache, over this value, the oldest rows will " + + "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is " + + "specified."); + + public static final ConfigOption<Duration> LOOKUP_CACHE_TTL = + ConfigOptions.key("lookup.cache.ttl") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription("The cache time to live."); + + public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = + ConfigOptions.key("lookup.max-retries") + .intType() + .defaultValue(3) + .withDescription("The max retry times if lookup database failed."); + // sink config options public static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions .key("sink.enable-2pc") diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index 81ee23c..5a11605 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.table; import 
org.apache.doris.flink.cfg.DorisExecutionOptions; +import org.apache.doris.flink.cfg.DorisLookupOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.flink.configuration.ConfigOption; @@ -48,6 +49,9 @@ import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETR import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE; import static org.apache.doris.flink.table.DorisConfigOptions.FENODES; import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER; +import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_CACHE_MAX_ROWS; +import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_CACHE_TTL; +import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_MAX_RETRIES; import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_COUNT; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_SIZE; @@ -102,6 +106,9 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory options.add(DORIS_DESERIALIZE_QUEUE_SIZE); options.add(DORIS_BATCH_SIZE); options.add(DORIS_EXEC_MEM_LIMIT); + options.add(LOOKUP_CACHE_MAX_ROWS); + options.add(LOOKUP_CACHE_TTL); + options.add(LOOKUP_MAX_RETRIES); options.add(SINK_CHECK_INTERVAL); options.add(SINK_ENABLE_2PC); @@ -131,6 +138,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory return new DorisDynamicTableSource( getDorisOptions(helper.getOptions()), getDorisReadOptions(helper.getOptions()), + getDorisLookupOptions(helper.getOptions()), physicalSchema); } @@ -189,6 +197,14 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory return streamLoadProp; } + private DorisLookupOptions getDorisLookupOptions(ReadableConfig readableConfig){ + final DorisLookupOptions.Builder builder = 
DorisLookupOptions.builder(); + builder.setCacheExpireMs(readableConfig.get(LOOKUP_CACHE_TTL).toMillis()); + builder.setCacheMaxSize(readableConfig.get(LOOKUP_CACHE_MAX_ROWS)); + builder.setMaxRetryTimes(readableConfig.get(LOOKUP_MAX_RETRIES)); + return builder.build(); + } + @Override public DynamicTableSink createDynamicTableSink(Context context) { final FactoryUtil.TableFactoryHelper helper = diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index 20ce4f6..c5e47fa 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.table; +import org.apache.doris.flink.cfg.DorisLookupOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.deserialization.RowDataDeserializationSchema; @@ -34,9 +35,11 @@ import org.apache.flink.table.connector.source.InputFormatProvider; import org.apache.flink.table.connector.source.LookupTableSource; import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.SourceProvider; +import org.apache.flink.table.connector.source.TableFunctionProvider; import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,8 +61,19 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab private static final Logger LOG = 
LoggerFactory.getLogger(DorisDynamicTableSource.class); private final DorisOptions options; private final DorisReadOptions readOptions; + private DorisLookupOptions lookupOptions; private TableSchema physicalSchema; + public DorisDynamicTableSource(DorisOptions options, + DorisReadOptions readOptions, + DorisLookupOptions lookupOptions, + TableSchema physicalSchema) { + this.options = options; + this.lookupOptions = lookupOptions; + this.readOptions = readOptions; + this.physicalSchema = physicalSchema; + } + public DorisDynamicTableSource(DorisOptions options, DorisReadOptions readOptions, TableSchema physicalSchema) { @@ -109,13 +123,27 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab } @Override - public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) { - return null; + public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { + DataType physicalRowDataType = physicalSchema.toRowDataType(); + String[] keyNames = new String[context.getKeys().length]; + for (int i = 0; i < keyNames.length; i++) { + int[] innerKeyArr = context.getKeys()[i]; + keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]); + } + + return TableFunctionProvider.of( + new DorisRowDataLookupFunction( + options, + readOptions, + lookupOptions, + DataType.getFieldNames(physicalRowDataType).toArray(new String[0]), + DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]), + keyNames)); } @Override public DynamicTableSource copy() { - return new DorisDynamicTableSource(options, readOptions, physicalSchema); + return new DorisDynamicTableSource(options, readOptions, lookupOptions, physicalSchema); } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataLookupFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataLookupFunction.java new file mode 100644 index 0000000..d6e0edd --- /dev/null +++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataLookupFunction.java @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.table; + +import org.apache.doris.flink.cfg.DorisLookupOptions; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.deserialization.converter.DorisRowConverter; +import org.apache.doris.flink.exception.DorisException; +import org.apache.doris.flink.rest.PartitionDefinition; +import org.apache.doris.flink.rest.RestService; +import org.apache.doris.flink.source.reader.DorisValueReader; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.shaded.guava30.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.types.DataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import 
java.util.List; +import java.util.StringJoiner; +import java.util.concurrent.TimeUnit; + +public class DorisRowDataLookupFunction extends TableFunction<RowData> { + private static final Logger logger = LoggerFactory.getLogger(DorisRowDataLookupFunction.class); + + private final DorisOptions options; + private final DorisReadOptions readOptions; + private final String[] selectFields; + private final String[] conditionFields; + + private final long cacheMaxSize; + private final long cacheExpireMs; + private final int maxRetryTimes; + + private final DorisRowConverter rowConverter; + private transient Cache<RowData, List<RowData>> cache; + + public DorisRowDataLookupFunction(DorisOptions options, + DorisReadOptions readOptions, + DorisLookupOptions lookupOptions, + String[] selectFields, + DataType[] fieldTypes, + String[] conditionFields) { + this.options = options; + this.readOptions = readOptions; + this.selectFields = selectFields; + this.conditionFields = conditionFields; + this.cacheMaxSize = lookupOptions.getCacheMaxSize(); + this.cacheExpireMs = lookupOptions.getCacheExpireMs(); + this.maxRetryTimes = lookupOptions.getMaxRetryTimes(); + this.rowConverter = new DorisRowConverter(fieldTypes); + } + + @Override + public void open(FunctionContext context) throws Exception { + super.open(context); + this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 + ? null + : CacheBuilder.newBuilder() + .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS) + .maximumSize(cacheMaxSize) + .build(); + } + + /** + * This is a lookup method which is called by Flink framework in runtime. + * + * @param keys lookup keys + */ + public void eval(Object... 
keys) { + RowData keyRow = GenericRowData.of(keys); + if (cache != null) { + List<RowData> cachedRows = cache.getIfPresent(keyRow); + if (cachedRows != null) { + for (RowData cachedRow : cachedRows) { + collect(cachedRow); + } + return; + } + } + + List<PartitionDefinition> partitions = getPartitions(keys); + for (int retry = 0; retry <= maxRetryTimes; retry++) { + try { + ArrayList<RowData> rows = new ArrayList<>(); + for (PartitionDefinition part : partitions) { + try (DorisValueReader valueReader = new DorisValueReader(part, options, readOptions)) { + while (valueReader.hasNext()) { + List<?> record = valueReader.next(); + GenericRowData rowData = rowConverter.convertInternal(record); + rows.add(rowData); + collect(rowData); + } + } + } + if(cache != null){ + rows.trimToSize(); + cache.put(keyRow, rows); + } + break; + } catch (Exception ex) { + logger.error(String.format("Read Doris error, retry times = %d", retry), ex); + if (retry >= maxRetryTimes) { + throw new RuntimeException("Read Doris failed.", ex); + } + try { + Thread.sleep(1000 * retry); + } catch (InterruptedException e1) { + throw new RuntimeException(e1); + } + } + } + } + + private List<PartitionDefinition> getPartitions(Object... 
keys) { + readOptions.setReadFields((String.join(",", selectFields))); + StringJoiner filter = new StringJoiner(" AND "); + for (int i = 0; i < keys.length && i < conditionFields.length; i++) { + filter.add(String.format("%s = '%s'", conditionFields[i], keys[i])); + } + readOptions.setFilterQuery(filter.toString()); + try { + return RestService.findPartitions(options, readOptions, logger); + } catch (DorisException ex) { + logger.error("Failed fetch doris partitions"); + return new ArrayList<>(); + } + } + + @Override + public void close() throws Exception { + super.close(); + } + + @VisibleForTesting + public Cache<RowData, List<RowData>> getCache() { + return cache; + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataLookupFunctionTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataLookupFunctionTest.java new file mode 100644 index 0000000..cef9e10 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataLookupFunctionTest.java @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.table; + +import org.apache.doris.flink.cfg.DorisLookupOptions; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.flink.shaded.guava30.com.google.common.cache.Cache; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Collector; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; + +@Ignore +public class DorisRowDataLookupFunctionTest { + + private static final String TEST_FENODES = "127.0.0.1:8030"; + private static final String LOOKUP_TABLE = "test.t_lookup_table"; + + private static String[] fieldNames = new String[] {"id1", "id2", "c_string", "c_double"}; + private static DataType[] fieldDataTypes = + new DataType[] { + DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DOUBLE() + }; + private static String[] lookupKeys = new String[] {"id1", "id2"}; + + @Test + public void testEval() throws Exception { + + DorisLookupOptions lookupOptions = DorisLookupOptions.builder().build(); + DorisRowDataLookupFunction lookupFunction = buildRowDataLookupFunction(lookupOptions); + + ListOutputCollector collector = new ListOutputCollector(); + lookupFunction.setCollector(collector); + + lookupFunction.open(null); + + lookupFunction.eval(1, StringData.fromString("A")); + lookupFunction.eval(2, StringData.fromString("B")); + + List<String> result = + new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); + + List<String> expected = new ArrayList<>(); + expected.add("+I(1,A,zhangsanA,1.12)"); + 
expected.add("+I(1,A,zhangsanA-1,11.12)"); + expected.add("+I(2,B,zhangsanB,2.12)"); + Collections.sort(expected); + + assertEquals(expected, result); + } + + @Test + public void testEvalWithCache() throws Exception { + long cacheExpireMs = 10000; + DorisLookupOptions lookupOptions = + DorisLookupOptions.builder() + .setCacheExpireMs(cacheExpireMs) + .setCacheMaxSize(10) + .build(); + + DorisRowDataLookupFunction lookupFunction = buildRowDataLookupFunction(lookupOptions); + + ListOutputCollector collector = new ListOutputCollector(); + lookupFunction.setCollector(collector); + + lookupFunction.open(null); + + lookupFunction.eval(4, StringData.fromString("D")); + lookupFunction.eval(5, StringData.fromString("5")); + RowData keyRow = GenericRowData.of(4, StringData.fromString("D")); + RowData keyRowNoExist = GenericRowData.of(5, StringData.fromString("5")); + Cache<RowData, List<RowData>> cache = lookupFunction.getCache(); + // empty data should cache + assertEquals(cache.getIfPresent(keyRow), + Arrays.asList(GenericRowData.of( + 4, + StringData.fromString("D"), + StringData.fromString("zhangsanD"), + 4.12))); + assertEquals(cache.getIfPresent(keyRowNoExist), Collections.<RowData>emptyList()); + + //cache data expire + Thread.sleep(cacheExpireMs); + assert cache.getIfPresent(keyRow) == null; + } + + + private DorisRowDataLookupFunction buildRowDataLookupFunction(DorisLookupOptions lookupOptions) { + DorisOptions dorisOptions = DorisOptions.builder().setFenodes(TEST_FENODES) + .setTableIdentifier(LOOKUP_TABLE) + .setUsername("root") + .setPassword("") + .build(); + + DorisReadOptions readOptions = DorisReadOptions.builder().build(); + + DorisRowDataLookupFunction lookupFunction = + new DorisRowDataLookupFunction( + dorisOptions, + readOptions, + lookupOptions, + fieldNames, + fieldDataTypes, + lookupKeys); + + return lookupFunction; + } + + private static final class ListOutputCollector implements Collector<RowData> { + + private final List<RowData> output = new 
ArrayList<>(); + + @Override + public void collect(RowData row) { + this.output.add(row); + } + + @Override + public void close() {} + + public List<RowData> getOutputs() { + return output; + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org