This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new d41e035 [Bug] Fix log printing error (#315) d41e035 is described below commit d41e03517aa463581d0701905d94c1dbb1897223 Author: wudi <676366...@qq.com> AuthorDate: Tue Feb 6 14:01:20 2024 +0800 [Bug] Fix log printing error (#315) --- .../java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java | 8 +++++--- .../main/java/org/apache/doris/flink/sink/writer/DorisWriter.java | 4 +--- .../org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 604eb5c..5e2a697 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -85,6 +85,7 @@ public class DorisStreamLoad implements Serializable { private final CloseableHttpClient httpClient; private final ExecutorService executorService; private boolean loadBatchFirstRecord; + private volatile String currentLabel; public DorisStreamLoad( String hostPort, @@ -246,9 +247,9 @@ public class DorisStreamLoad implements Serializable { throw new StreamLoadException("stream load error: " + response.getStatusLine().toString()); } - public RespContent stopLoad(String label) throws IOException { + public RespContent stopLoad() throws IOException { recordStream.endInput(); - LOG.info("table {} stream load stopped for {} on host {}", table, label, hostPort); + LOG.info("table {} stream load stopped for {} on host {}", table, currentLabel, hostPort); Preconditions.checkState(pendingLoadFuture != null); try { return handlePreCommitResponse(pendingLoadFuture.get()); @@ -268,6 +269,7 @@ public class DorisStreamLoad implements Serializable { HttpPutBuilder putBuilder = new HttpPutBuilder(); recordStream.startInput(isResume); LOG.info("table {} stream load started for {} on host {}", table, label, hostPort); + this.currentLabel = label; try { InputStreamEntity entity = new InputStreamEntity(recordStream); putBuilder @@ -284,7 +286,7 @@ public class DorisStreamLoad implements Serializable { pendingLoadFuture = executorService.submit( () -> { - LOG.info("table {} start execute load", table); + LOG.info("table {} start execute load for label {}", table, label); return httpClient.execute(putBuilder.build()); }); } catch (Exception e) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index c6f8124..54facc7 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -233,9 +233,7 @@ public class DorisWriter<IN> continue; } DorisStreamLoad dorisStreamLoad = streamLoader.getValue(); - LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier); - String currentLabel = labelGenerator.generateTableLabel(curCheckpointId); - RespContent respContent = dorisStreamLoad.stopLoad(currentLabel); + RespContent respContent = dorisStreamLoad.stopLoad(); // refresh metrics if (sinkMetricsMap.containsKey(tableIdentifier)) { DorisWriteMetrics dorisWriteMetrics = sinkMetricsMap.get(tableIdentifier); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java index fab2fcd..5ead3e8 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java @@ -120,7 +120,7 @@ public class TestDorisStreamLoad { httpClient); dorisStreamLoad.startLoad("1", false); dorisStreamLoad.writeRecord(writeBuffer); - dorisStreamLoad.stopLoad("label"); + dorisStreamLoad.stopLoad(); byte[] buff = new byte[4]; int n = dorisStreamLoad.getRecordStream().read(buff); dorisStreamLoad.getRecordStream().read(new byte[4]); @@ -147,7 +147,7 @@ public class TestDorisStreamLoad { dorisStreamLoad.startLoad("1", false); dorisStreamLoad.writeRecord(writeBuffer); dorisStreamLoad.writeRecord(writeBuffer); - dorisStreamLoad.stopLoad("label"); + dorisStreamLoad.stopLoad(); byte[] buff = new byte[9]; int n = dorisStreamLoad.getRecordStream().read(buff); int ret = dorisStreamLoad.getRecordStream().read(new byte[9]); @@ -179,7 +179,7 @@ public class TestDorisStreamLoad { dorisStreamLoad.startLoad("1", false); dorisStreamLoad.writeRecord("{\"id\": 1}".getBytes(StandardCharsets.UTF_8)); dorisStreamLoad.writeRecord("{\"id\": 2}".getBytes(StandardCharsets.UTF_8)); - dorisStreamLoad.stopLoad("label"); + dorisStreamLoad.stopLoad(); byte[] buff = new byte[expectBuffer.length]; int n = dorisStreamLoad.getRecordStream().read(buff); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org