This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ec3fe8  [Feature] Support flink table lookup join (#61)
3ec3fe8 is described below

commit 3ec3fe8bf1d1e7193fcb2687d57967ee4714b4ac
Author: wudi <676366...@qq.com>
AuthorDate: Thu Sep 15 14:08:41 2022 +0800

    [Feature] Support flink table lookup join (#61)
    
    * add flink table lookup join
    Co-authored-by: wudi <>
---
 .../doris/flink/cfg/DorisExecutionOptions.java     |   2 +-
 .../apache/doris/flink/cfg/DorisLookupOptions.java |  80 +++++++++++
 .../org/apache/doris/flink/cfg/DorisOptions.java   |   2 +-
 .../doris/flink/table/DorisConfigOptions.java      |  30 +++-
 .../flink/table/DorisDynamicTableFactory.java      |  16 +++
 .../doris/flink/table/DorisDynamicTableSource.java |  34 ++++-
 .../flink/table/DorisRowDataLookupFunction.java    | 160 +++++++++++++++++++++
 .../table/DorisRowDataLookupFunctionTest.java      | 155 ++++++++++++++++++++
 8 files changed, 471 insertions(+), 8 deletions(-)
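
As context for the patch below: the feature is exercised from the Flink Table API / SQL layer. The following is a minimal usage sketch, not taken from the commit; the table names, columns, and the datagen probe side are hypothetical, and the FE address mirrors the one used in the test at the end of this patch. It shows how the new 'lookup.*' options and the FOR SYSTEM_TIME AS OF join path fit together.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DorisLookupJoinExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Doris dimension table served by this connector, with the lookup
            // options introduced in DorisConfigOptions by this commit.
            tEnv.executeSql(
                    "CREATE TABLE dim_city ("
                            + "  city_id INT,"
                            + "  city_name STRING"
                            + ") WITH ("
                            + "  'connector' = 'doris',"
                            + "  'fenodes' = '127.0.0.1:8030',"
                            + "  'table.identifier' = 'test.dim_city',"
                            + "  'username' = 'root',"
                            + "  'password' = '',"
                            + "  'lookup.cache.max-rows' = '1000',"
                            + "  'lookup.cache.ttl' = '10s',"
                            + "  'lookup.max-retries' = '3'"
                            + ")");

            // Probe side with a processing-time attribute; FOR SYSTEM_TIME AS OF
            // routes the join through the lookup path implemented by this patch.
            tEnv.executeSql(
                    "CREATE TABLE orders ("
                            + "  order_id INT,"
                            + "  city_id INT,"
                            + "  proc_time AS PROCTIME()"
                            + ") WITH ('connector' = 'datagen')");

            tEnv.executeSql(
                    "SELECT o.order_id, d.city_name "
                            + "FROM orders AS o "
                            + "JOIN dim_city FOR SYSTEM_TIME AS OF o.proc_time AS d "
                            + "ON o.city_id = d.city_id")
                    .print();
        }
    }

Note that 'lookup.cache.ttl' is read as a Duration and converted with toMillis() in DorisDynamicTableFactory, so '10s' reaches DorisLookupOptions as cacheExpireMs = 10000.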

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 102a7ee..36c577a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 import java.util.Properties;
 
 /**
- * JDBC sink batch options.
+ * Doris sink batch options.
  */
 public class DorisExecutionOptions implements Serializable {
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java
new file mode 100644
index 0000000..f0f7eb4
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.cfg;
+
+import java.io.Serializable;
+
+public class DorisLookupOptions implements Serializable {
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+
+    public DorisLookupOptions(
+            long cacheMaxSize, long cacheExpireMs, int maxRetryTimes) {
+        this.cacheMaxSize = cacheMaxSize;
+        this.cacheExpireMs = cacheExpireMs;
+        this.maxRetryTimes = maxRetryTimes;
+    }
+
+    public long getCacheMaxSize() {
+        return cacheMaxSize;
+    }
+
+    public long getCacheExpireMs() {
+        return cacheExpireMs;
+    }
+
+    public int getMaxRetryTimes() {
+        return maxRetryTimes;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Builder of {@link DorisLookupOptions}. */
+    public static class Builder {
+        private long cacheMaxSize = -1L;
+        private long cacheExpireMs = -1L;
+        private int maxRetryTimes = 3;
+
+    /** optional, lookup cache max size, over this value, the old data will be eliminated. */
+        public Builder setCacheMaxSize(long cacheMaxSize) {
+            this.cacheMaxSize = cacheMaxSize;
+            return this;
+        }
+
+    /** optional, lookup cache expire millis; after this time, the old data will expire. */
+        public Builder setCacheExpireMs(long cacheExpireMs) {
+            this.cacheExpireMs = cacheExpireMs;
+            return this;
+        }
+
+        /** optional, max retry times. */
+        public Builder setMaxRetryTimes(int maxRetryTimes) {
+            this.maxRetryTimes = maxRetryTimes;
+            return this;
+        }
+
+        public DorisLookupOptions build() {
+            return new DorisLookupOptions(
+                    cacheMaxSize, cacheExpireMs, maxRetryTimes);
+        }
+    }
+}
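
A short illustration of the builder above, not part of the commit and with illustrative values: per the open() logic in DorisRowDataLookupFunction later in this patch, the Guava cache is only created when both cacheMaxSize and cacheExpireMs are set to non-negative values.

    import org.apache.doris.flink.cfg.DorisLookupOptions;

    class LookupOptionsSketch {
        static DorisLookupOptions cachedOptions() {
            // Evict the oldest entries beyond 1000 key rows, expire entries
            // 10 seconds after write, retry failed Doris reads up to 3 times.
            return DorisLookupOptions.builder()
                    .setCacheMaxSize(1000)
                    .setCacheExpireMs(10_000L)
                    .setMaxRetryTimes(3)
                    .build();
        }

        static DorisLookupOptions uncachedOptions() {
            // With the -1 defaults, DorisRowDataLookupFunction leaves its cache
            // null and queries Doris on every eval() call.
            return DorisLookupOptions.builder().build();
        }
    }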
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index 512d0ab..7d6963b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -84,7 +84,7 @@ public class DorisOptions extends DorisConnectionOptions {
         }
 
         /**
-         * required, JDBC DB url.
+         * required, Frontend Http Rest url.
          */
         public Builder setFenodes(String fenodes) {
             this.fenodes = fenodes;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 5b56342..02a4d22 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -20,6 +20,8 @@ package org.apache.doris.flink.table;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
+import java.time.Duration;
+
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
@@ -35,9 +37,9 @@ public class DorisConfigOptions {
     public static final String IDENTIFIER = "doris";
     // common option
     public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris fe http address.");
-    public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the jdbc table name.");
-    public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the jdbc user name.");
-    public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the jdbc password.");
+    public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the doris table name.");
+    public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the doris user name.");
+    public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the doris password.");
 
     // source config options
     public static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
@@ -101,6 +103,28 @@ public class DorisConfigOptions {
             .defaultValue(false)
             .withDescription("Whether to read data using the new interface defined according to the FLIP-27 specification,default false");
 
+    // Lookup options
+    public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
+            ConfigOptions.key("lookup.cache.max-rows")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription(
+                            "The max number of rows of lookup cache, over this value, the oldest rows will "
+                                    + "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is "
+                                    + "specified.");
+
+    public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
+            ConfigOptions.key("lookup.cache.ttl")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription("The cache time to live.");
+
+    public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
+            ConfigOptions.key("lookup.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("The max retry times if lookup database failed.");
+
     // sink config options
     public static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions
             .key("sink.enable-2pc")
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 81ee23c..5a11605 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -17,6 +17,7 @@
 package org.apache.doris.flink.table;
 
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisLookupOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.flink.configuration.ConfigOption;
@@ -48,6 +49,9 @@ import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETR
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
 import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
 import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_CACHE_MAX_ROWS;
+import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_CACHE_TTL;
+import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_MAX_RETRIES;
 import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_COUNT;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_SIZE;
@@ -102,6 +106,9 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
         options.add(DORIS_BATCH_SIZE);
         options.add(DORIS_EXEC_MEM_LIMIT);
+        options.add(LOOKUP_CACHE_MAX_ROWS);
+        options.add(LOOKUP_CACHE_TTL);
+        options.add(LOOKUP_MAX_RETRIES);
 
         options.add(SINK_CHECK_INTERVAL);
         options.add(SINK_ENABLE_2PC);
@@ -131,6 +138,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         return new DorisDynamicTableSource(
                 getDorisOptions(helper.getOptions()),
                 getDorisReadOptions(helper.getOptions()),
+                getDorisLookupOptions(helper.getOptions()),
                 physicalSchema);
     }
 
@@ -189,6 +197,14 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         return streamLoadProp;
     }
 
+    private DorisLookupOptions getDorisLookupOptions(ReadableConfig readableConfig) {
+        final DorisLookupOptions.Builder builder = DorisLookupOptions.builder();
+        builder.setCacheExpireMs(readableConfig.get(LOOKUP_CACHE_TTL).toMillis());
+        builder.setCacheMaxSize(readableConfig.get(LOOKUP_CACHE_MAX_ROWS));
+        builder.setMaxRetryTimes(readableConfig.get(LOOKUP_MAX_RETRIES));
+        return builder.build();
+    }
+
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
         final FactoryUtil.TableFactoryHelper helper =
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 20ce4f6..c5e47fa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.flink.table;
 
+import org.apache.doris.flink.cfg.DorisLookupOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.deserialization.RowDataDeserializationSchema;
@@ -34,9 +35,11 @@ import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,8 +61,19 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab
     private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSource.class);
     private final DorisOptions options;
     private final DorisReadOptions readOptions;
+    private DorisLookupOptions lookupOptions;
     private TableSchema physicalSchema;
 
+    public DorisDynamicTableSource(DorisOptions options,
+                                   DorisReadOptions readOptions,
+                                   DorisLookupOptions lookupOptions,
+                                   TableSchema physicalSchema) {
+        this.options = options;
+        this.lookupOptions = lookupOptions;
+        this.readOptions = readOptions;
+        this.physicalSchema = physicalSchema;
+    }
+
     public DorisDynamicTableSource(DorisOptions options,
                                    DorisReadOptions readOptions,
                                    TableSchema physicalSchema) {
@@ -109,13 +123,27 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab
     }
 
     @Override
-    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
-        return null;
+    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+        DataType physicalRowDataType = physicalSchema.toRowDataType();
+        String[] keyNames = new String[context.getKeys().length];
+        for (int i = 0; i < keyNames.length; i++) {
+            int[] innerKeyArr = context.getKeys()[i];
+            keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
+        }
+
+        return TableFunctionProvider.of(
+                new DorisRowDataLookupFunction(
+                        options,
+                        readOptions,
+                        lookupOptions,
+                        DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
+                        DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
+                        keyNames));
     }
 
     @Override
     public DynamicTableSource copy() {
-        return new DorisDynamicTableSource(options, readOptions, physicalSchema);
+        return new DorisDynamicTableSource(options, readOptions, lookupOptions, physicalSchema);
     }
 
     @Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataLookupFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataLookupFunction.java
new file mode 100644
index 0000000..d6e0edd
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataLookupFunction.java
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.table;
+
+import org.apache.doris.flink.cfg.DorisLookupOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
+import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.rest.PartitionDefinition;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.source.reader.DorisValueReader;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.concurrent.TimeUnit;
+
+public class DorisRowDataLookupFunction extends TableFunction<RowData> {
+    private static final Logger logger = LoggerFactory.getLogger(DorisRowDataLookupFunction.class);
+
+    private final DorisOptions options;
+    private final DorisReadOptions readOptions;
+    private final String[] selectFields;
+    private final String[] conditionFields;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+
+    private final DorisRowConverter rowConverter;
+    private transient Cache<RowData, List<RowData>> cache;
+
+    public DorisRowDataLookupFunction(DorisOptions options,
+                                      DorisReadOptions readOptions,
+                                      DorisLookupOptions lookupOptions,
+                                      String[] selectFields,
+                                      DataType[] fieldTypes,
+                                      String[] conditionFields) {
+        this.options = options;
+        this.readOptions = readOptions;
+        this.selectFields = selectFields;
+        this.conditionFields = conditionFields;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+        this.rowConverter = new DorisRowConverter(fieldTypes);
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+        this.cache = cacheMaxSize == -1 || cacheExpireMs == -1
+                ? null
+                : CacheBuilder.newBuilder()
+                .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+                .maximumSize(cacheMaxSize)
+                .build();
+    }
+
+    /**
+     * This is a lookup method which is called by Flink framework in runtime.
+     *
+     * @param keys lookup keys
+     */
+    public void eval(Object... keys) {
+        RowData keyRow = GenericRowData.of(keys);
+        if (cache != null) {
+            List<RowData> cachedRows = cache.getIfPresent(keyRow);
+            if (cachedRows != null) {
+                for (RowData cachedRow : cachedRows) {
+                    collect(cachedRow);
+                }
+                return;
+            }
+        }
+
+        List<PartitionDefinition> partitions = getPartitions(keys);
+        for (int retry = 0; retry <= maxRetryTimes; retry++) {
+            try {
+                ArrayList<RowData> rows = new ArrayList<>();
+                for (PartitionDefinition part : partitions) {
+                    try (DorisValueReader valueReader = new DorisValueReader(part, options, readOptions)) {
+                        while (valueReader.hasNext()) {
+                            List<?> record = valueReader.next();
+                            GenericRowData rowData = rowConverter.convertInternal(record);
+                            rows.add(rowData);
+                            collect(rowData);
+                        }
+                    }
+                }
+                if (cache != null) {
+                    rows.trimToSize();
+                    cache.put(keyRow, rows);
+                }
+                break;
+            } catch (Exception ex) {
+                logger.error(String.format("Read Doris error, retry times = %d", retry), ex);
+                if (retry >= maxRetryTimes) {
+                    throw new RuntimeException("Read Doris failed.", ex);
+                }
+                try {
+                    Thread.sleep(1000 * retry);
+                } catch (InterruptedException e1) {
+                    throw new RuntimeException(e1);
+                }
+            }
+        }
+    }
+
+    private List<PartitionDefinition> getPartitions(Object... keys) {
+        readOptions.setReadFields((String.join(",", selectFields)));
+        StringJoiner filter = new StringJoiner(" AND ");
+        for (int i = 0; i < keys.length && i < conditionFields.length; i++) {
+            filter.add(String.format("%s = '%s'", conditionFields[i], keys[i]));
+        }
+        readOptions.setFilterQuery(filter.toString());
+        try {
+            return RestService.findPartitions(options, readOptions, logger);
+        } catch (DorisException ex) {
+            logger.error("Failed to fetch doris partitions", ex);
+            return new ArrayList<>();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+    }
+
+    @VisibleForTesting
+    public Cache<RowData, List<RowData>> getCache() {
+        return cache;
+    }
+}
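
To make the read path above concrete: for a function built with conditionFields {"id1", "id2"}, a call such as eval(1, "A") restricts the Doris scan with the predicate id1 = '1' AND id2 = 'A' before partitions are resolved via RestService. The following standalone sketch (not part of the commit) mirrors that predicate construction:

    import java.util.StringJoiner;

    class LookupFilterSketch {
        // Mirrors DorisRowDataLookupFunction#getPartitions: each lookup key is
        // equality-matched against its condition field and the terms are ANDed.
        static String buildFilter(String[] conditionFields, Object... keys) {
            StringJoiner filter = new StringJoiner(" AND ");
            for (int i = 0; i < keys.length && i < conditionFields.length; i++) {
                filter.add(String.format("%s = '%s'", conditionFields[i], keys[i]));
            }
            return filter.toString();
        }

        public static void main(String[] args) {
            // Prints: id1 = '1' AND id2 = 'A'
            System.out.println(buildFilter(new String[] {"id1", "id2"}, 1, "A"));
        }
    }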
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataLookupFunctionTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataLookupFunctionTest.java
new file mode 100644
index 0000000..cef9e10
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisRowDataLookupFunctionTest.java
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.table;
+
+import org.apache.doris.flink.cfg.DorisLookupOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Collector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+@Ignore
+public class DorisRowDataLookupFunctionTest {
+
+    private static final String TEST_FENODES = "127.0.0.1:8030";
+    private static final String LOOKUP_TABLE = "test.t_lookup_table";
+
+    private static String[] fieldNames = new String[] {"id1", "id2", "c_string", "c_double"};
+    private static DataType[] fieldDataTypes =
+            new DataType[] {
+                    DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DOUBLE()
+            };
+    private static String[] lookupKeys = new String[] {"id1", "id2"};
+
+    @Test
+    public void testEval() throws Exception {
+
+        DorisLookupOptions lookupOptions = DorisLookupOptions.builder().build();
+        DorisRowDataLookupFunction lookupFunction = buildRowDataLookupFunction(lookupOptions);
+
+        ListOutputCollector collector = new ListOutputCollector();
+        lookupFunction.setCollector(collector);
+
+        lookupFunction.open(null);
+
+        lookupFunction.eval(1, StringData.fromString("A"));
+        lookupFunction.eval(2, StringData.fromString("B"));
+
+        List<String> result =
+                new ArrayList<>(collector.getOutputs())
+                        .stream().map(RowData::toString).sorted().collect(Collectors.toList());
+
+        List<String> expected = new ArrayList<>();
+        expected.add("+I(1,A,zhangsanA,1.12)");
+        expected.add("+I(1,A,zhangsanA-1,11.12)");
+        expected.add("+I(2,B,zhangsanB,2.12)");
+        Collections.sort(expected);
+
+        assertEquals(expected, result);
+    }
+
+    @Test
+    public void testEvalWithCache() throws Exception {
+        long cacheExpireMs = 10000;
+        DorisLookupOptions lookupOptions =
+                DorisLookupOptions.builder()
+                        .setCacheExpireMs(cacheExpireMs)
+                        .setCacheMaxSize(10)
+                        .build();
+
+        DorisRowDataLookupFunction lookupFunction = buildRowDataLookupFunction(lookupOptions);
+
+        ListOutputCollector collector = new ListOutputCollector();
+        lookupFunction.setCollector(collector);
+
+        lookupFunction.open(null);
+
+        lookupFunction.eval(4, StringData.fromString("D"));
+        lookupFunction.eval(5, StringData.fromString("5"));
+        RowData keyRow = GenericRowData.of(4, StringData.fromString("D"));
+        RowData keyRowNoExist = GenericRowData.of(5, StringData.fromString("5"));
+        Cache<RowData, List<RowData>> cache = lookupFunction.getCache();
+        // both found rows and empty results should be cached
+        assertEquals(cache.getIfPresent(keyRow),
+                        Arrays.asList(GenericRowData.of(
+                                4,
+                                StringData.fromString("D"),
+                                StringData.fromString("zhangsanD"),
+                                4.12)));
+        assertEquals(cache.getIfPresent(keyRowNoExist), Collections.<RowData>emptyList());
+
+        // cache entries should expire after the configured TTL
+        Thread.sleep(cacheExpireMs);
+        assert cache.getIfPresent(keyRow) == null;
+    }
+
+
+    private DorisRowDataLookupFunction buildRowDataLookupFunction(DorisLookupOptions lookupOptions) {
+        DorisOptions dorisOptions = DorisOptions.builder().setFenodes(TEST_FENODES)
+                .setTableIdentifier(LOOKUP_TABLE)
+                .setUsername("root")
+                .setPassword("")
+                .build();
+
+        DorisReadOptions readOptions = DorisReadOptions.builder().build();
+
+        DorisRowDataLookupFunction lookupFunction =
+                new DorisRowDataLookupFunction(
+                        dorisOptions,
+                        readOptions,
+                        lookupOptions,
+                        fieldNames,
+                        fieldDataTypes,
+                        lookupKeys);
+
+        return lookupFunction;
+    }
+
+    private static final class ListOutputCollector implements Collector<RowData> {
+
+        private final List<RowData> output = new ArrayList<>();
+
+        @Override
+        public void collect(RowData row) {
+            this.output.add(row);
+        }
+
+        @Override
+        public void close() {}
+
+        public List<RowData> getOutputs() {
+            return output;
+        }
+    }
+
+}

