This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a355ca9  [refactor] sink api refactor by FLIP-191 (#213)
a355ca9 is described below

commit a355ca9675181aa6dcd53017d09852c18870f44c
Author: wudi <676366...@qq.com>
AuthorDate: Tue Oct 24 10:56:53 2023 +0800

    [refactor] sink api refactor by FLIP-191 (#213)
---
 .../org/apache/doris/flink/sink/DorisSink.java     | 41 +++++++-------
 .../doris/flink/sink/committer/DorisCommitter.java | 22 ++++----
 .../doris/flink/sink/writer/DorisStreamLoad.java   |  2 +
 .../doris/flink/sink/writer/DorisWriter.java       | 27 ++++++---
 .../doris/flink/table/DorisDynamicTableSink.java   |  3 +-
 .../flink/sink/committer/MockCommitRequest.java    | 64 ++++++++++++++++++++++
 .../flink/sink/committer/TestDorisCommitter.java   | 12 ++--
 .../doris/flink/sink/writer/TestDorisWriter.java   | 16 +++---
 8 files changed, 132 insertions(+), 55 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index d1aee44..bc2d45c 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -24,27 +24,28 @@ import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.sink.committer.DorisCommitter;
 import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
 import org.apache.doris.flink.sink.writer.DorisWriter;
-import org.apache.flink.api.connector.sink.Committer;
 import org.apache.doris.flink.sink.writer.DorisWriterState;
 import org.apache.doris.flink.sink.writer.DorisWriterStateSerializer;
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Optional;
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * Load data into Doris based on 2PC.
  * see {@link DorisWriter} and {@link DorisCommitter}.
  * @param <IN> type of record.
  */
-public class DorisSink<IN> implements Sink<IN, DorisCommittable, 
DorisWriterState, DorisCommittable> {
+public class DorisSink<IN>
+        implements StatefulSink<IN, DorisWriterState>,
+            TwoPhaseCommittingSink<IN, DorisCommittable>{
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisSink.class);
     private final DorisOptions dorisOptions;
@@ -75,36 +76,32 @@ public class DorisSink<IN> implements Sink<IN, 
DorisCommittable, DorisWriterStat
     }
 
     @Override
-    public SinkWriter<IN, DorisCommittable, DorisWriterState> 
createWriter(InitContext initContext, List<DorisWriterState> state) throws 
IOException {
-        DorisWriter<IN> dorisWriter = new DorisWriter<IN>(initContext, state, 
serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions);
-        dorisWriter.initializeLoad(state);
+    public DorisWriter<IN> createWriter(InitContext initContext) throws 
IOException {
+        DorisWriter<IN> dorisWriter = new DorisWriter<>(initContext, 
Collections.emptyList(), serializer, dorisOptions, dorisReadOptions, 
dorisExecutionOptions);
         return dorisWriter;
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<DorisWriterState>> 
getWriterStateSerializer() {
-        return Optional.of(new DorisWriterStateSerializer());
+    public Committer<DorisCommittable> createCommitter() throws IOException {
+        return new DorisCommitter(dorisOptions, dorisReadOptions, 
dorisExecutionOptions.getMaxRetries());
     }
 
     @Override
-    public Optional<Committer<DorisCommittable>> createCommitter() throws 
IOException {
-        return Optional.of(new DorisCommitter(dorisOptions, dorisReadOptions, 
dorisExecutionOptions.getMaxRetries()));
+    public DorisWriter<IN> restoreWriter(InitContext initContext, 
Collection<DorisWriterState> recoveredState) throws IOException {
+        DorisWriter<IN> dorisWriter = new DorisWriter<>(initContext, 
recoveredState, serializer, dorisOptions, dorisReadOptions, 
dorisExecutionOptions);
+        return dorisWriter;
     }
 
     @Override
-    public Optional<GlobalCommitter<DorisCommittable, DorisCommittable>> 
createGlobalCommitter() throws IOException {
-        return Optional.empty();
+    public SimpleVersionedSerializer<DorisWriterState> 
getWriterStateSerializer() {
+        return new DorisWriterStateSerializer();
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<DorisCommittable>> 
getCommittableSerializer() {
-        return Optional.of(new DorisCommittableSerializer());
+    public SimpleVersionedSerializer<DorisCommittable> 
getCommittableSerializer() {
+        return new DorisCommittableSerializer();
     }
 
-    @Override
-    public Optional<SimpleVersionedSerializer<DorisCommittable>> 
getGlobalCommittableSerializer() {
-        return Optional.empty();
-    }
 
     public static <IN> Builder<IN> builder() {
         return new Builder<>();
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index 0e19b0f..2a0fba0 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -29,7 +29,7 @@ import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.HttpUtil;
 import org.apache.doris.flink.sink.ResponseUtil;
-import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.http.StatusLine;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPut;
@@ -38,10 +38,10 @@ import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
@@ -49,7 +49,7 @@ import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
 /**
  * The committer to commit transaction.
  */
-public class DorisCommitter implements Committer<DorisCommittable> {
+public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisCommitter.class);
     private static final String commitPattern = 
"http://%s/api/%s/_stream_load_2pc";
     private final CloseableHttpClient httpClient;
@@ -75,11 +75,10 @@ public class DorisCommitter implements 
Committer<DorisCommittable> {
     }
 
     @Override
-    public List<DorisCommittable> commit(List<DorisCommittable> 
committableList) throws IOException {
-        for (DorisCommittable committable : committableList) {
-            commitTransaction(committable);
+    public void commit(Collection<CommitRequest<DorisCommittable>> requests) 
throws IOException, InterruptedException {
+        for (CommitRequest<DorisCommittable> request: requests) {
+            commitTransaction(request.getCommittable());
         }
-        return Collections.emptyList();
     }
 
     private void commitTransaction(DorisCommittable committable) throws 
IOException {
@@ -133,9 +132,12 @@ public class DorisCommitter implements 
Committer<DorisCommittable> {
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {
         if (httpClient != null) {
-            httpClient.close();
+            try {
+                httpClient.close();
+            } catch (IOException e) {
+            }
         }
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index cda3c05..4fd9abf 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -139,6 +139,8 @@ public class DorisStreamLoad implements Serializable {
         LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, 
chkID);
         while (true) {
             try {
+                // TODO: According to label abort txn. Currently, it can only 
be aborted based on txnid,
+                //  so we must first request a streamload based on the label 
to get the txnid.
                 String label = labelGenerator.generateLabel(startChkID);
                 HttpPutBuilder builder = new HttpPutBuilder();
                 builder.setUrl(loadUrlStr)
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 230bad5..295a0be 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.flink.sink.writer;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -27,11 +28,10 @@ import org.apache.doris.flink.rest.models.RespContent;
 import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpUtil;
-
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
 import org.apache.flink.util.Preconditions;
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -56,7 +57,8 @@ import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
  * Doris Writer will load data to doris.
  * @param <IN>
  */
-public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, 
DorisWriterState> {
+public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, 
DorisWriterState>,
+        TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, DorisCommittable> {
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisWriter.class);
     private static final List<String> DORIS_SUCCESS_STATUS = new 
ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
     private final long lastCheckpointId;
@@ -78,7 +80,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
     private String currentLabel;
 
     public DorisWriter(Sink.InitContext initContext,
-                       List<DorisWriterState> state,
+                       Collection<DorisWriterState> state,
                        DorisRecordSerializer<IN> serializer,
                        DorisOptions dorisOptions,
                        DorisReadOptions dorisReadOptions,
@@ -100,9 +102,11 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
         this.executionOptions = executionOptions;
         this.intervalTime = executionOptions.checkInterval();
         this.loading = false;
+
+        initializeLoad(state);
     }
 
-    public void initializeLoad(List<DorisWriterState> state) throws 
IOException {
+    public void initializeLoad(Collection<DorisWriterState> state) {
         this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? 
new BackendUtil(
                 dorisOptions.getBenodes())
                 : new BackendUtil(RestService.getBackendsV2(dorisOptions, 
dorisReadOptions, LOG));
@@ -144,7 +148,13 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
     }
 
     @Override
-    public List<DorisCommittable> prepareCommit(boolean flush) throws 
IOException {
+    public void flush(boolean flush) throws IOException, InterruptedException {
+
+    }
+
+
+    @Override
+    public Collection<DorisCommittable> prepareCommit() throws IOException, 
InterruptedException {
         if(!loading){
             //There is no data during the entire checkpoint period
             return Collections.emptyList();
@@ -246,4 +256,5 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
             dorisStreamLoad.close();
         }
     }
+
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index 06f2bfb..66d0227 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -27,7 +27,6 @@ import org.apache.doris.flink.sink.writer.RowDataSerializer;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkProvider;
 import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
@@ -104,7 +103,7 @@ public class DorisDynamicTableSink implements 
DynamicTableSink {
                     .setDorisReadOptions(readOptions)
                     .setDorisExecutionOptions(executionOptions)
                     .setSerializer(serializerBuilder.build());
-            return SinkProvider.of(dorisSinkBuilder.build(), sinkParallelism);
+            return SinkV2Provider.of(dorisSinkBuilder.build(), 
sinkParallelism);
         }else{
             DorisBatchSink.Builder<RowData> dorisBatchSinkBuilder = 
DorisBatchSink.builder();
             dorisBatchSinkBuilder.setDorisOptions(options)
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/MockCommitRequest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/MockCommitRequest.java
new file mode 100644
index 0000000..d879e81
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/MockCommitRequest.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.committer;
+
+import org.apache.flink.api.connector.sink2.Committer;
+
+public class MockCommitRequest <CommT> implements 
Committer.CommitRequest<CommT>{
+
+    private final CommT committable;
+
+    public MockCommitRequest(CommT committable) {
+        this.committable = committable;
+    }
+
+    @Override
+    public CommT getCommittable() {
+        return committable;
+    }
+
+    @Override
+    public int getNumberOfRetries() {
+        return 0;
+    }
+
+    @Override
+    public void signalFailedWithKnownReason(Throwable throwable) {
+
+    }
+
+    @Override
+    public void signalFailedWithUnknownReason(Throwable throwable) {
+
+    }
+
+    @Override
+    public void retryLater() {
+
+    }
+
+    @Override
+    public void updateAndRetryLater(CommT commT) {
+
+    }
+
+    @Override
+    public void signalAlreadyCommitted() {
+
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
index 7cc2a88..794f806 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java
@@ -25,7 +25,6 @@ import 
org.apache.doris.flink.rest.models.BackendV2.BackendRowV2;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpEntityMock;
 import org.apache.doris.flink.sink.OptionUtils;
-
 import org.apache.http.ProtocolVersion;
 import org.apache.http.StatusLine;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -34,16 +33,15 @@ import org.apache.http.message.BasicStatusLine;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
+import org.mockito.MockedStatic;
+import org.slf4j.Logger;
 
 import java.util.Collections;
 
 import static org.mockito.ArgumentMatchers.any;
-import org.mockito.MockedStatic;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
-import org.slf4j.Logger;
 
 /**
  * Test for Doris Committer.
@@ -83,7 +81,8 @@ public class TestDorisCommitter {
                 "\"msg\": \"errCode = 2, detailMessage = transaction [2] is 
already visible, not pre-committed.\"\n" +
                 "}";
         this.entityMock.setValue(response);
-        dorisCommitter.commit(Collections.singletonList(dorisCommittable));
+        final MockCommitRequest<DorisCommittable> request = new 
MockCommitRequest<>(dorisCommittable);
+        dorisCommitter.commit(Collections.singletonList(request));
 
     }
 
@@ -94,7 +93,8 @@ public class TestDorisCommitter {
                 "\"msg\": \"errCode = 2, detailMessage = transaction [25] is 
already aborted. abort reason: User Abort\"\n" +
                 "}";
         this.entityMock.setValue(response);
-        dorisCommitter.commit(Collections.singletonList(dorisCommittable));
+        final MockCommitRequest<DorisCommittable> request = new 
MockCommitRequest<>(dorisCommittable);
+        dorisCommitter.commit(Collections.singletonList(request));
     }
 
     @After
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
index e988d6b..01e1559 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
@@ -23,7 +23,7 @@ import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpTestUtil;
 import org.apache.doris.flink.sink.OptionUtils;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.junit.Assert;
@@ -31,6 +31,7 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.OptionalLong;
@@ -67,12 +68,13 @@ public class TestDorisWriter {
         
when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
         DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, 
Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, 
readOptions, executionOptions);
         dorisWriter.setDorisStreamLoad(dorisStreamLoad);
-        List<DorisCommittable> committableList = 
dorisWriter.prepareCommit(true);
-
+        dorisWriter.write("doris,1",null);
+        Collection<DorisCommittable> committableList = 
dorisWriter.prepareCommit();
         Assert.assertEquals(1, committableList.size());
-        Assert.assertEquals("local:8040", 
committableList.get(0).getHostPort());
-        Assert.assertEquals("db_test", committableList.get(0).getDb());
-        Assert.assertEquals(2, committableList.get(0).getTxnID());
+        DorisCommittable dorisCommittable = 
committableList.stream().findFirst().get();
+        Assert.assertEquals("local:8040", dorisCommittable.getHostPort());
+        Assert.assertEquals("test", dorisCommittable.getDb());
+        Assert.assertEquals(2, dorisCommittable.getTxnID());
         Assert.assertFalse(dorisWriter.isLoading());
     }
 
@@ -91,6 +93,6 @@ public class TestDorisWriter {
 
         Assert.assertEquals(1, writerStates.size());
         Assert.assertEquals("doris", writerStates.get(0).getLabelPrefix());
-        Assert.assertTrue(dorisWriter.isLoading());
+        Assert.assertTrue(!dorisWriter.isLoading());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to