This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new d41e035  [Bug] Fix log printing error  (#315)
d41e035 is described below

commit d41e03517aa463581d0701905d94c1dbb1897223
Author: wudi <676366...@qq.com>
AuthorDate: Tue Feb 6 14:01:20 2024 +0800

    [Bug] Fix log printing error  (#315)
---
 .../java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java  | 8 +++++---
 .../main/java/org/apache/doris/flink/sink/writer/DorisWriter.java | 4 +---
 .../org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java   | 6 +++---
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 604eb5c..5e2a697 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -85,6 +85,7 @@ public class DorisStreamLoad implements Serializable {
     private final CloseableHttpClient httpClient;
     private final ExecutorService executorService;
     private boolean loadBatchFirstRecord;
+    private volatile String currentLabel;
 
     public DorisStreamLoad(
             String hostPort,
@@ -246,9 +247,9 @@ public class DorisStreamLoad implements Serializable {
         throw new StreamLoadException("stream load error: " + response.getStatusLine().toString());
     }
 
-    public RespContent stopLoad(String label) throws IOException {
+    public RespContent stopLoad() throws IOException {
         recordStream.endInput();
-        LOG.info("table {} stream load stopped for {} on host {}", table, label, hostPort);
+        LOG.info("table {} stream load stopped for {} on host {}", table, currentLabel, hostPort);
         Preconditions.checkState(pendingLoadFuture != null);
         try {
             return handlePreCommitResponse(pendingLoadFuture.get());
@@ -268,6 +269,7 @@ public class DorisStreamLoad implements Serializable {
         HttpPutBuilder putBuilder = new HttpPutBuilder();
         recordStream.startInput(isResume);
         LOG.info("table {} stream load started for {} on host {}", table, label, hostPort);
+        this.currentLabel = label;
         try {
             InputStreamEntity entity = new InputStreamEntity(recordStream);
             putBuilder
@@ -284,7 +286,7 @@ public class DorisStreamLoad implements Serializable {
             pendingLoadFuture =
                     executorService.submit(
                             () -> {
-                                LOG.info("table {} start execute load", table);
+                                LOG.info("table {} start execute load for label {}", table, label);
                                 return httpClient.execute(putBuilder.build());
                             });
         } catch (Exception e) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index c6f8124..54facc7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -233,9 +233,7 @@ public class DorisWriter<IN>
                 continue;
             }
             DorisStreamLoad dorisStreamLoad = streamLoader.getValue();
-            LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier);
-            String currentLabel = labelGenerator.generateTableLabel(curCheckpointId);
-            RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);
+            RespContent respContent = dorisStreamLoad.stopLoad();
             // refresh metrics
             if (sinkMetricsMap.containsKey(tableIdentifier)) {
                 DorisWriteMetrics dorisWriteMetrics = sinkMetricsMap.get(tableIdentifier);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index fab2fcd..5ead3e8 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -120,7 +120,7 @@ public class TestDorisStreamLoad {
                         httpClient);
         dorisStreamLoad.startLoad("1", false);
         dorisStreamLoad.writeRecord(writeBuffer);
-        dorisStreamLoad.stopLoad("label");
+        dorisStreamLoad.stopLoad();
         byte[] buff = new byte[4];
         int n = dorisStreamLoad.getRecordStream().read(buff);
         dorisStreamLoad.getRecordStream().read(new byte[4]);
@@ -147,7 +147,7 @@ public class TestDorisStreamLoad {
         dorisStreamLoad.startLoad("1", false);
         dorisStreamLoad.writeRecord(writeBuffer);
         dorisStreamLoad.writeRecord(writeBuffer);
-        dorisStreamLoad.stopLoad("label");
+        dorisStreamLoad.stopLoad();
         byte[] buff = new byte[9];
         int n = dorisStreamLoad.getRecordStream().read(buff);
         int ret = dorisStreamLoad.getRecordStream().read(new byte[9]);
@@ -179,7 +179,7 @@ public class TestDorisStreamLoad {
         dorisStreamLoad.startLoad("1", false);
         dorisStreamLoad.writeRecord("{\"id\": 1}".getBytes(StandardCharsets.UTF_8));
         dorisStreamLoad.writeRecord("{\"id\": 2}".getBytes(StandardCharsets.UTF_8));
-        dorisStreamLoad.stopLoad("label");
+        dorisStreamLoad.stopLoad();
         byte[] buff = new byte[expectBuffer.length];
         int n = dorisStreamLoad.getRecordStream().read(buff);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to