This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 43a055a9 [improve] support group commit (#412)
43a055a9 is described below

commit 43a055a9e6b4c728912725976eedbecdfb8b270c
Author: wudi <676366...@qq.com>
AuthorDate: Mon Jul 1 10:31:15 2024 +0800

    [improve] support group commit (#412)
---
 .../java/org/apache/doris/flink/sink/HttpPutBuilder.java    |  4 +++-
 .../apache/doris/flink/sink/batch/DorisBatchStreamLoad.java |  6 ++++++
 .../org/apache/doris/flink/sink/writer/DorisStreamLoad.java | 12 +++++++++++-
 .../org/apache/doris/flink/sink/writer/LoadConstants.java   |  1 +
 .../apache/doris/flink/sink/copy/TestDorisCopyWriter.java   |  3 +++
 5 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
index 023cd31a..44f6c9fe 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
@@ -111,7 +111,9 @@ public class HttpPutBuilder {
     }
 
     public HttpPutBuilder setLabel(String label) {
-        header.put("label", label);
+        if (label != null) {
+            header.put("label", label);
+        }
         return this;
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index d9fba749..fbc6daa0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -67,6 +67,7 @@ import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
 import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
 import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
 import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
+import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
 
@@ -95,6 +96,7 @@ public class DorisBatchStreamLoad implements Serializable {
     private AtomicReference<Throwable> exception = new AtomicReference<>(null);
     private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilderForBatch();
     private BackendUtil backendUtil;
+    private boolean enableGroupCommit;
 
     public DorisBatchStreamLoad(
             DorisOptions dorisOptions,
@@ -120,6 +122,7 @@ public class DorisBatchStreamLoad implements Serializable {
                                             LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
                             .getBytes();
         }
+        this.enableGroupCommit = loadProps.containsKey(GROUP_COMMIT);
         this.executionOptions = executionOptions;
         this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
         if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
@@ -260,6 +263,9 @@ public class DorisBatchStreamLoad implements Serializable {
 
         /** execute stream load. */
         public void load(String label, BatchRecordBuffer buffer) throws IOException {
+            if (enableGroupCommit) {
+                label = null;
+            }
             refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
             ByteBuffer data = buffer.getData();
             ByteArrayEntity entity =
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 14e44dee..676de3df 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -58,6 +58,7 @@ import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
 import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
 import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
 import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
+import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
 
@@ -87,6 +88,7 @@ public class DorisStreamLoad implements Serializable {
     private final ExecutorService executorService;
     private boolean loadBatchFirstRecord;
     private volatile String currentLabel;
+    private boolean enableGroupCommit;
 
     public DorisStreamLoad(
             String hostPort,
@@ -129,6 +131,7 @@ public class DorisStreamLoad implements Serializable {
                                             LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
                             .getBytes();
         }
+        enableGroupCommit = streamLoadProp.containsKey(GROUP_COMMIT);
         loadBatchFirstRecord = true;
     }
 
@@ -276,6 +279,9 @@ public class DorisStreamLoad implements Serializable {
      * @throws IOException
      */
     public void startLoad(String label, boolean isResume) throws IOException {
+        if (enableGroupCommit) {
+            label = null;
+        }
         loadBatchFirstRecord = !isResume;
         HttpPutBuilder putBuilder = new HttpPutBuilder();
         recordStream.startInput(isResume);
@@ -294,10 +300,14 @@ public class DorisStreamLoad implements Serializable {
             if (enable2PC) {
                 putBuilder.enable2PC();
             }
+            String finalLabel = label;
             pendingLoadFuture =
                     executorService.submit(
                             () -> {
-                                LOG.info("table {} start execute load for label {}", table, label);
+                                LOG.info(
+                                        "table {} start execute load for label {}",
+                                        table,
+                                        finalLabel);
                                 return httpClient.execute(putBuilder.build());
                             });
         } catch (Exception e) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
index 2e5d29a5..2a79b736 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
@@ -31,4 +31,5 @@ public class LoadConstants {
     public static final String NULL_VALUE = "\\N";
     public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
     public static final String READ_JSON_BY_LINE = "read_json_by_line";
+    public static final String GROUP_COMMIT = "group_commit";
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java
index e31e9179..c36805ba 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java
@@ -32,7 +32,9 @@ import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.FixMethodOrder;
 import org.junit.Test;
+import org.junit.runners.MethodSorters;
 
 import java.util.Collection;
 import java.util.List;
@@ -41,6 +43,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class TestDorisCopyWriter {
 
     DorisOptions dorisOptions;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org