This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new ba21aabd [improve]improve group commit logic (#413) ba21aabd is described below commit ba21aabd0be664141e44581c5d16d97e9ff5467f Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Tue Jul 2 11:26:40 2024 +0800 [improve]improve group commit logic (#413) --- .../flink/sink/batch/DorisBatchStreamLoad.java | 14 +++++- .../doris/flink/sink/writer/DorisStreamLoad.java | 44 +++++++++++++++---- .../doris/flink/sink/writer/LoadConstants.java | 1 + .../apache/doris/flink/sink/DorisSinkITCase.java | 51 ++++++++++++++++++++++ 4 files changed, 99 insertions(+), 11 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index fbc6daa0..375e4335 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -68,6 +68,7 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW; import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT; +import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; @@ -122,7 +123,11 @@ public class DorisBatchStreamLoad implements Serializable { LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)) .getBytes(); } - this.enableGroupCommit = loadProps.containsKey(GROUP_COMMIT); + this.enableGroupCommit = + loadProps.containsKey(GROUP_COMMIT) + && !loadProps + .getProperty(GROUP_COMMIT) + .equalsIgnoreCase(GROUP_COMMIT_OFF_MODE); this.executionOptions = executionOptions; this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize()); if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) { @@ -283,7 +288,12 @@ public class DorisBatchStreamLoad implements Serializable { Throwable resEx = new Throwable(); int retry = 0; while (retry <= executionOptions.getMaxRetries()) { - LOG.info("stream load started for {} on host {}", label, hostPort); + if (enableGroupCommit) { + LOG.info("stream load started with group commit on host {}", hostPort); + } else { + LOG.info("stream load started for {} on host {}", label, hostPort); + } + try (CloseableHttpClient httpClient = httpClientBuilder.build()) { try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) { int statusCode = response.getStatusLine().getStatusCode(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 676de3df..4cbcb431 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -59,6 +59,7 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW; import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT; +import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT_OFF_MODE; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; @@ -131,7 +132,11 @@ public class DorisStreamLoad implements Serializable { LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)) .getBytes(); } - enableGroupCommit = streamLoadProp.containsKey(GROUP_COMMIT); + this.enableGroupCommit = + streamLoadProp.containsKey(GROUP_COMMIT) + && !streamLoadProp + .getProperty(GROUP_COMMIT) + .equalsIgnoreCase(GROUP_COMMIT_OFF_MODE); loadBatchFirstRecord = true; } @@ -263,7 +268,16 @@ public class DorisStreamLoad implements Serializable { public RespContent stopLoad() throws IOException { recordStream.endInput(); - LOG.info("table {} stream load stopped for {} on host {}", table, currentLabel, hostPort); + if (enableGroupCommit) { + LOG.info("table {} stream load stopped with group commit on host {}", table, hostPort); + } else { + LOG.info( + "table {} stream load stopped for {} on host {}", + table, + currentLabel, + hostPort); + } + Preconditions.checkState(pendingLoadFuture != null); try { return handlePreCommitResponse(pendingLoadFuture.get()); @@ -285,7 +299,11 @@ public class DorisStreamLoad implements Serializable { loadBatchFirstRecord = !isResume; HttpPutBuilder putBuilder = new HttpPutBuilder(); recordStream.startInput(isResume); - LOG.info("table {} stream load started for {} on host {}", table, label, hostPort); + if (enableGroupCommit) { + LOG.info("table {} stream load started with group commit on host {}", table, hostPort); + } else { + LOG.info("table {} stream load started for {} on host {}", table, label, hostPort); + } this.currentLabel = label; try { InputStreamEntity entity = new InputStreamEntity(recordStream); @@ -300,18 +318,26 @@ public class DorisStreamLoad implements Serializable { if (enable2PC) { putBuilder.enable2PC(); } - String finalLabel = label; + + String executeMessage; + if (enableGroupCommit) { + executeMessage = "table " + table + " start execute load with group commit"; + } else { + executeMessage = "table " + table + " start execute load for label " + label; + } pendingLoadFuture = executorService.submit( () -> { - LOG.info( - "table {} start execute load for label {}", - table, - finalLabel); + LOG.info(executeMessage); return httpClient.execute(putBuilder.build()); }); } catch (Exception e) { - String err = "failed to stream load data with label: " + label; + String err; + if (enableGroupCommit) { + err = "failed to stream load data with group commit"; + } else { + err = "failed to stream load data with label: " + label; + } LOG.warn(err, e); throw e; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java index 2a79b736..e8cd87e6 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java @@ -32,4 +32,5 @@ public class LoadConstants { public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__"; public static final String READ_JSON_BY_LINE = "read_json_by_line"; public static final String GROUP_COMMIT = "group_commit"; + public static final String GROUP_COMMIT_OFF_MODE = "off_mode"; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index 6fc2da7e..91077ea2 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -58,6 +58,7 @@ public class DorisSinkITCase extends DorisTestBase { static final String TABLE_JSON_TBL = "tbl_json_tbl"; static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl"; static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS"; + static final String TABLE_GROUP_COMMIT = "tbl_group_commit"; static final String TABLE_CSV_JM = "tbl_csv_jm"; static final String TABLE_CSV_TM = "tbl_csv_tm"; @@ -264,6 +265,56 @@ public class DorisSinkITCase extends DorisTestBase { checkResult(expected, query, 2); } + @Test + public void testTableGroupCommit() throws Exception { + initializeTable(TABLE_GROUP_COMMIT); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String sinkDDL = + String.format( + "CREATE TABLE doris_group_commit_sink (" + + " name STRING," + + " age INT" + + ") WITH (" + + " 'connector' = 'doris'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'sink.label-prefix' = '" + + UUID.randomUUID() + + "'," + + " 'sink.properties.column_separator' = '\\x01'," + + " 'sink.properties.line_delimiter' = '\\x02'," + + " 'sink.properties.group_commit' = 'sync_mode'," + + " 'sink.ignore.update-before' = 'false'," + + " 'sink.enable.batch-mode' = 'true'," + + " 'sink.enable-delete' = 'true'," + + " 'sink.flush.queue-size' = '2'," + + " 'sink.buffer-flush.max-rows' = '3'," + + " 'sink.buffer-flush.max-bytes' = '5000'," + + " 'sink.buffer-flush.interval' = '10s'" + + ")", + getFenodes(), + DATABASE + "." + TABLE_GROUP_COMMIT, + USERNAME, + PASSWORD); + tEnv.executeSql(sinkDDL); + tEnv.executeSql( + "INSERT INTO doris_group_commit_sink SELECT 'doris',1 union all SELECT 'group_commit',2 union all SELECT 'flink',3"); + + Thread.sleep(25000); + List<String> expected = Arrays.asList("doris,1", "flink,3", "group_commit,2"); + String query = + String.format( + "select name,age from %s.%s order by 1", DATABASE, TABLE_GROUP_COMMIT); + // + checkResult(expected, query, 2); + } + @Test public void testJobManagerFailoverSink() throws Exception { initializeFailoverTable(TABLE_CSV_JM); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org