This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 869396ba [Feature] Support insert overwrite (#544)
869396ba is described below

commit 869396bae945103e44dd592f781166ae634a9e2b
Author: wudi <676366...@qq.com>
AuthorDate: Mon Jan 20 14:21:30 2025 +0800

    [Feature] Support insert overwrite (#544)
---
 .../doris/flink/table/DorisDynamicTableSink.java   | 58 +++++++++++++++++++---
 .../apache/doris/flink/sink/DorisSinkITCase.java   | 56 +++++++++++++++++++++
 2 files changed, 108 insertions(+), 6 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index 17db7d2e..f43d49bc 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -21,18 +21,23 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider;
+import org.apache.doris.flink.exception.DorisSystemException;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.sink.DorisSink;
 import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.sql.Connection;
+import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Objects;
 import java.util.Properties;
@@ -46,13 +51,14 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_K
 import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
 
 /** DorisDynamicTableSink. */
-public class DorisDynamicTableSink implements DynamicTableSink {
+public class DorisDynamicTableSink implements DynamicTableSink, 
SupportsOverwrite {
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisDynamicTableSink.class);
     private final DorisOptions options;
     private final DorisReadOptions readOptions;
     private final DorisExecutionOptions executionOptions;
     private final TableSchema tableSchema;
     private final Integer sinkParallelism;
+    private boolean overwrite = false;
 
     public DorisDynamicTableSink(
             DorisOptions options,
@@ -115,13 +121,46 @@ public class DorisDynamicTableSink implements DynamicTableSink {
                 .setDorisReadOptions(readOptions)
                 .setDorisExecutionOptions(executionOptions)
                 .setSerializer(serializerBuilder.build());
-        return SinkV2Provider.of(dorisSinkBuilder.build(), sinkParallelism);
+        DorisSink<RowData> dorisSink = dorisSinkBuilder.build();
+
+        // for insert overwrite
+        if (overwrite) {
+            if (context.isBounded()) {
+                // execute jdbc query to truncate table
+                Preconditions.checkArgument(
+                        options.getJdbcUrl() != null, "jdbc-url is required 
for Overwrite mode.");
+                // todo: should be written to a temporary table first,
+                // and then use GlobalCommitter to perform the rename.
+                truncateTable();
+            } else {
+                throw new IllegalStateException("Streaming mode not support 
overwrite.");
+            }
+        }
+        return SinkV2Provider.of(dorisSink, sinkParallelism);
+    }
+
+    private void truncateTable() {
+        String truncateQuery = "TRUNCATE TABLE " + 
options.getTableIdentifier();
+        SimpleJdbcConnectionProvider jdbcConnectionProvider =
+                new SimpleJdbcConnectionProvider(options);
+        try (Connection connection = 
jdbcConnectionProvider.getOrEstablishConnection();
+                Statement statement = connection.createStatement()) {
+            LOG.info("Executing truncate query: {}", truncateQuery);
+            statement.execute(truncateQuery);
+        } catch (Exception e) {
+            LOG.error("Failed to execute truncate query: {}", truncateQuery, 
e);
+            throw new DorisSystemException(
+                    String.format("Failed to execute truncate query: %s", 
truncateQuery), e);
+        }
     }
 
     @Override
     public DynamicTableSink copy() {
-        return new DorisDynamicTableSink(
-                options, readOptions, executionOptions, tableSchema, 
sinkParallelism);
+        DorisDynamicTableSink sink =
+                new DorisDynamicTableSink(
+                        options, readOptions, executionOptions, tableSchema, 
sinkParallelism);
+        sink.overwrite = overwrite;
+        return sink;
     }
 
     @Override
@@ -142,11 +181,18 @@ public class DorisDynamicTableSink implements DynamicTableSink {
                 && Objects.equals(readOptions, that.readOptions)
                 && Objects.equals(executionOptions, that.executionOptions)
                 && Objects.equals(tableSchema, that.tableSchema)
-                && Objects.equals(sinkParallelism, that.sinkParallelism);
+                && Objects.equals(sinkParallelism, that.sinkParallelism)
+                && Objects.equals(overwrite, that.overwrite);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(options, readOptions, executionOptions, 
tableSchema, sinkParallelism);
+        return Objects.hash(
+                options, readOptions, executionOptions, tableSchema, 
sinkParallelism, overwrite);
+    }
+
+    @Override
+    public void applyOverwrite(boolean overwrite) {
+        this.overwrite = overwrite;
     }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index 80986ea3..dd74bb1c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.StringUtils;
@@ -53,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.api.common.JobStatus.FINISHED;
 import static org.apache.flink.api.common.JobStatus.RUNNING;
@@ -68,6 +70,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
     static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl";
     static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS";
     static final String TABLE_GROUP_COMMIT = "tbl_group_commit";
+    static final String TABLE_OVERWRITE = "tbl_overwrite";
     static final String TABLE_GZ_FORMAT = "tbl_gz_format";
     static final String TABLE_CSV_JM = "tbl_csv_jm";
     static final String TABLE_CSV_TM = "tbl_csv_tm";
@@ -556,6 +559,59 @@ public class DorisSinkITCase extends AbstractITCaseService {
         ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, 
query, 2);
     }
 
+    @Test
+    public void testTableOverwrite() throws Exception {
+        initializeTable(TABLE_OVERWRITE);
+        // mock data
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format(
+                        "INSERT INTO %s.%s values('history-data',12)", 
DATABASE, TABLE_OVERWRITE));
+
+        List<String> expected_his = Arrays.asList("history-data,12");
+        String query =
+                String.format("select name,age from %s.%s order by 1", 
DATABASE, TABLE_OVERWRITE);
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
expected_his, query, 2);
+
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        String sinkDDL =
+                String.format(
+                        "CREATE TABLE doris_overwrite_sink ("
+                                + " name STRING,"
+                                + " age INT"
+                                + ") WITH ("
+                                + " 'connector' = '"
+                                + DorisConfigOptions.IDENTIFIER
+                                + "',"
+                                + " 'fenodes' = '%s',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'jdbc-url' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'sink.label-prefix' = '"
+                                + UUID.randomUUID()
+                                + "'"
+                                + ")",
+                        getFenodes(),
+                        DATABASE + "." + TABLE_OVERWRITE,
+                        getDorisQueryUrl(),
+                        getDorisUsername(),
+                        getDorisPassword());
+        tEnv.executeSql(sinkDDL);
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "INSERT OVERWRITE doris_overwrite_sink SELECT 
'doris',1 union all  SELECT 'overwrite',2 union all  SELECT 'flink',3");
+
+        tableResult.await(25000, TimeUnit.MILLISECONDS);
+        List<String> expected = Arrays.asList("doris,1", "flink,3", 
"overwrite,2");
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, 
query, 2);
+    }
+
     private void initializeTable(String table) {
         ContainerUtils.executeSQLStatement(
                 getDorisQueryConnection(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to