This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new d65c0223 [Improve] support gz compress in streamload (#434)
d65c0223 is described below

commit d65c0223362dc841a1db25cc11b3c487db7c13e6
Author: wudi <676366...@qq.com>
AuthorDate: Mon Jul 22 15:49:26 2024 +0800

    [Improve] support gz compress in streamload (#434)
---
 .../flink/sink/batch/DorisBatchStreamLoad.java     |  9 +++++
 .../doris/flink/sink/writer/DorisStreamLoad.java   | 10 +++++
 .../doris/flink/sink/writer/LoadConstants.java     |  2 +
 .../apache/doris/flink/sink/DorisSinkITCase.java   | 43 ++++++++++++++++++++++
 4 files changed, 64 insertions(+)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 375e4335..c5614c31 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -34,6 +34,7 @@ import org.apache.doris.flink.sink.EscapeHandler;
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.HttpUtil;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
+import org.apache.http.client.entity.GzipCompressingEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -65,6 +66,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
 import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
 import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
+import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE;
+import static 
org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE_GZ;
 import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
 import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
 import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
@@ -98,6 +101,7 @@ public class DorisBatchStreamLoad implements Serializable {
     private HttpClientBuilder httpClientBuilder = new 
HttpUtil().getHttpClientBuilderForBatch();
     private BackendUtil backendUtil;
     private boolean enableGroupCommit;
+    private boolean enableGzCompress;
 
     public DorisBatchStreamLoad(
             DorisOptions dorisOptions,
@@ -128,6 +132,7 @@ public class DorisBatchStreamLoad implements Serializable {
                         && !loadProps
                                 .getProperty(GROUP_COMMIT)
                                 .equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
+        this.enableGzCompress = loadProps.getProperty(COMPRESS_TYPE, 
"").equals(COMPRESS_TYPE_GZ);
         this.executionOptions = executionOptions;
         this.flushQueue = new 
LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
         if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
@@ -285,6 +290,10 @@ public class DorisBatchStreamLoad implements Serializable {
                     .addHiddenColumns(executionOptions.getDeletable())
                     .addProperties(executionOptions.getStreamLoadProp());
 
+            if (enableGzCompress) {
+                putBuilder.setEntity(new GzipCompressingEntity(entity));
+            }
+
             Throwable resEx = new Throwable();
             int retry = 0;
             while (retry <= executionOptions.getMaxRetries()) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 4cbcb431..060bccb5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -33,6 +33,7 @@ import org.apache.doris.flink.rest.models.RespContent;
 import org.apache.doris.flink.sink.EscapeHandler;
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.ResponseUtil;
+import org.apache.http.client.entity.GzipCompressingEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.entity.InputStreamEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -56,6 +57,8 @@ import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST;
 import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
 import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
 import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
+import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE;
+import static 
org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE_GZ;
 import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
 import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
 import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
@@ -90,6 +93,7 @@ public class DorisStreamLoad implements Serializable {
     private boolean loadBatchFirstRecord;
     private volatile String currentLabel;
     private boolean enableGroupCommit;
+    private boolean enableGzCompress;
 
     public DorisStreamLoad(
             String hostPort,
@@ -137,6 +141,8 @@ public class DorisStreamLoad implements Serializable {
                         && !streamLoadProp
                                 .getProperty(GROUP_COMMIT)
                                 .equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
+        this.enableGzCompress =
+                streamLoadProp.getProperty(COMPRESS_TYPE, 
"").equals(COMPRESS_TYPE_GZ);
         loadBatchFirstRecord = true;
     }
 
@@ -319,6 +325,10 @@ public class DorisStreamLoad implements Serializable {
                 putBuilder.enable2PC();
             }
 
+            if (enableGzCompress) {
+                putBuilder.setEntity(new GzipCompressingEntity(entity));
+            }
+
             String executeMessage;
             if (enableGroupCommit) {
                 executeMessage = "table " + table + " start execute load with 
group commit";
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
index e8cd87e6..1e026977 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
@@ -33,4 +33,6 @@ public class LoadConstants {
     public static final String READ_JSON_BY_LINE = "read_json_by_line";
     public static final String GROUP_COMMIT = "group_commit";
     public static final String GROUP_COMMIT_OFF_MODE = "off_mode";
+    public static final String COMPRESS_TYPE = "compress_type";
+    public static final String COMPRESS_TYPE_GZ = "gz";
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index 91077ea2..aa3d00da 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -59,6 +59,7 @@ public class DorisSinkITCase extends DorisTestBase {
     static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl";
     static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS";
     static final String TABLE_GROUP_COMMIT = "tbl_group_commit";
+    static final String TABLE_GZ_FORMAT = "tbl_gz_format";
     static final String TABLE_CSV_JM = "tbl_csv_jm";
     static final String TABLE_CSV_TM = "tbl_csv_tm";
 
@@ -315,6 +316,48 @@ public class DorisSinkITCase extends DorisTestBase {
         checkResult(expected, query, 2);
     }
 
+    @Test
+    public void testTableGzFormat() throws Exception {
+        initializeTable(TABLE_GZ_FORMAT);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        String sinkDDL =
+                String.format(
+                        "CREATE TABLE doris_gz_format_sink ("
+                                + " name STRING,"
+                                + " age INT"
+                                + ") WITH ("
+                                + " 'connector' = 'doris',"
+                                + " 'fenodes' = '%s',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'sink.label-prefix' = '"
+                                + UUID.randomUUID()
+                                + "',"
+                                + " 'sink.properties.column_separator' = '\\x01',"
+                                + " 'sink.properties.line_delimiter' = '\\x02',"
+                                + " 'sink.properties.compress_type' = 'gz'"
+                                + ")",
+                        getFenodes(),
+                        DATABASE + "." + TABLE_GZ_FORMAT,
+                        USERNAME,
+                        PASSWORD);
+        tEnv.executeSql(sinkDDL);
+        // Block until the bounded INSERT job finishes rather than sleeping a fixed interval.
+        String insertSql =
+                "INSERT INTO doris_gz_format_sink SELECT 'doris',1 union all SELECT 'flink',2";
+        tEnv.executeSql(insertSql).await();
+
+        List<String> expected = Arrays.asList("doris,1", "flink,2");
+        String query =
+                String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_GZ_FORMAT);
+        checkResult(expected, query, 2);
+    }
+
     @Test
     public void testJobManagerFailoverSink() throws Exception {
         initializeFailoverTable(TABLE_CSV_JM);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to