This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new a355ca9 [refactor] sink api refactor by FLIP-191 (#213) a355ca9 is described below commit a355ca9675181aa6dcd53017d09852c18870f44c Author: wudi <676366...@qq.com> AuthorDate: Tue Oct 24 10:56:53 2023 +0800 [refactor] sink api refactor by FLIP-191 (#213) --- .../org/apache/doris/flink/sink/DorisSink.java | 41 +++++++------- .../doris/flink/sink/committer/DorisCommitter.java | 22 ++++---- .../doris/flink/sink/writer/DorisStreamLoad.java | 2 + .../doris/flink/sink/writer/DorisWriter.java | 27 ++++++--- .../doris/flink/table/DorisDynamicTableSink.java | 3 +- .../flink/sink/committer/MockCommitRequest.java | 64 ++++++++++++++++++++++ .../flink/sink/committer/TestDorisCommitter.java | 12 ++-- .../doris/flink/sink/writer/TestDorisWriter.java | 16 +++--- 8 files changed, 132 insertions(+), 55 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java index d1aee44..bc2d45c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java @@ -24,27 +24,28 @@ import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.sink.committer.DorisCommitter; import org.apache.doris.flink.sink.writer.DorisRecordSerializer; import org.apache.doris.flink.sink.writer.DorisWriter; -import org.apache.flink.api.connector.sink.Committer; import org.apache.doris.flink.sink.writer.DorisWriterState; import org.apache.doris.flink.sink.writer.DorisWriterStateSerializer; -import org.apache.flink.api.connector.sink.GlobalCommitter; -import org.apache.flink.api.connector.sink.Sink; -import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.StatefulSink; +import 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.List; -import java.util.Optional; +import java.util.Collection; +import java.util.Collections; /** * Load data into Doris based on 2PC. * see {@link DorisWriter} and {@link DorisCommitter}. * @param <IN> type of record. */ -public class DorisSink<IN> implements Sink<IN, DorisCommittable, DorisWriterState, DorisCommittable> { +public class DorisSink<IN> + implements StatefulSink<IN, DorisWriterState>, + TwoPhaseCommittingSink<IN, DorisCommittable>{ private static final Logger LOG = LoggerFactory.getLogger(DorisSink.class); private final DorisOptions dorisOptions; @@ -75,36 +76,32 @@ public class DorisSink<IN> implements Sink<IN, DorisCommittable, DorisWriterStat } @Override - public SinkWriter<IN, DorisCommittable, DorisWriterState> createWriter(InitContext initContext, List<DorisWriterState> state) throws IOException { - DorisWriter<IN> dorisWriter = new DorisWriter<IN>(initContext, state, serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions); - dorisWriter.initializeLoad(state); + public DorisWriter<IN> createWriter(InitContext initContext) throws IOException { + DorisWriter<IN> dorisWriter = new DorisWriter<>(initContext, Collections.emptyList(), serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions); return dorisWriter; } @Override - public Optional<SimpleVersionedSerializer<DorisWriterState>> getWriterStateSerializer() { - return Optional.of(new DorisWriterStateSerializer()); + public Committer<DorisCommittable> createCommitter() throws IOException { + return new DorisCommitter(dorisOptions, dorisReadOptions, dorisExecutionOptions.getMaxRetries()); } @Override - public Optional<Committer<DorisCommittable>> createCommitter() throws IOException { - return Optional.of(new 
DorisCommitter(dorisOptions, dorisReadOptions, dorisExecutionOptions.getMaxRetries())); + public DorisWriter<IN> restoreWriter(InitContext initContext, Collection<DorisWriterState> recoveredState) throws IOException { + DorisWriter<IN> dorisWriter = new DorisWriter<>(initContext, recoveredState, serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions); + return dorisWriter; } @Override - public Optional<GlobalCommitter<DorisCommittable, DorisCommittable>> createGlobalCommitter() throws IOException { - return Optional.empty(); + public SimpleVersionedSerializer<DorisWriterState> getWriterStateSerializer() { + return new DorisWriterStateSerializer(); } @Override - public Optional<SimpleVersionedSerializer<DorisCommittable>> getCommittableSerializer() { - return Optional.of(new DorisCommittableSerializer()); + public SimpleVersionedSerializer<DorisCommittable> getCommittableSerializer() { + return new DorisCommittableSerializer(); } - @Override - public Optional<SimpleVersionedSerializer<DorisCommittable>> getGlobalCommittableSerializer() { - return Optional.empty(); - } public static <IN> Builder<IN> builder() { return new Builder<>(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java index 0e19b0f..2a0fba0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java @@ -29,7 +29,7 @@ import org.apache.doris.flink.sink.DorisCommittable; import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.HttpUtil; import org.apache.doris.flink.sink.ResponseUtil; -import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink2.Committer; import org.apache.http.StatusLine; import 
org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPut; @@ -38,10 +38,10 @@ import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.io.IOException; -import java.util.Collections; +import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; @@ -49,7 +49,7 @@ import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; /** * The committer to commit transaction. */ -public class DorisCommitter implements Committer<DorisCommittable> { +public class DorisCommitter implements Committer<DorisCommittable>, Closeable { private static final Logger LOG = LoggerFactory.getLogger(DorisCommitter.class); private static final String commitPattern = "http://%s/api/%s/_stream_load_2pc"; private final CloseableHttpClient httpClient; @@ -75,11 +75,10 @@ public class DorisCommitter implements Committer<DorisCommittable> { } @Override - public List<DorisCommittable> commit(List<DorisCommittable> committableList) throws IOException { - for (DorisCommittable committable : committableList) { - commitTransaction(committable); + public void commit(Collection<CommitRequest<DorisCommittable>> requests) throws IOException, InterruptedException { + for (CommitRequest<DorisCommittable> request: requests) { + commitTransaction(request.getCommittable()); } - return Collections.emptyList(); } private void commitTransaction(DorisCommittable committable) throws IOException { @@ -133,9 +132,12 @@ public class DorisCommitter implements Committer<DorisCommittable> { } @Override - public void close() throws Exception { + public void close() { if (httpClient != null) { - httpClient.close(); + try { + httpClient.close(); + } catch (IOException e) { + } } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index cda3c05..4fd9abf 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -139,6 +139,8 @@ public class DorisStreamLoad implements Serializable { LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID); while (true) { try { + // TODO: According to label abort txn. Currently, it can only be aborted based on txnid, + // so we must first request a streamload based on the label to get the txnid. String label = labelGenerator.generateLabel(startChkID); HttpPutBuilder builder = new HttpPutBuilder(); builder.setUrl(loadUrlStr) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index 230bad5..295a0be 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.sink.writer; +import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; @@ -27,11 +28,10 @@ import org.apache.doris.flink.rest.models.RespContent; import org.apache.doris.flink.sink.BackendUtil; import org.apache.doris.flink.sink.DorisCommittable; import org.apache.doris.flink.sink.HttpUtil; - -import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.connector.sink.Sink; -import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.connector.sink2.Sink; +import 
org.apache.flink.api.connector.sink2.StatefulSink; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; import org.apache.flink.util.Preconditions; @@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -56,7 +57,8 @@ import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; * Doris Writer will load data to doris. * @param <IN> */ -public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWriterState> { +public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, DorisWriterState>, + TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, DorisCommittable> { private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.class); private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); private final long lastCheckpointId; @@ -78,7 +80,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr private String currentLabel; public DorisWriter(Sink.InitContext initContext, - List<DorisWriterState> state, + Collection<DorisWriterState> state, DorisRecordSerializer<IN> serializer, DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, @@ -100,9 +102,11 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr this.executionOptions = executionOptions; this.intervalTime = executionOptions.checkInterval(); this.loading = false; + + initializeLoad(state); } - public void initializeLoad(List<DorisWriterState> state) throws IOException { + public void initializeLoad(Collection<DorisWriterState> state) { this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? 
new BackendUtil( dorisOptions.getBenodes()) : new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG)); @@ -144,7 +148,13 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr } @Override - public List<DorisCommittable> prepareCommit(boolean flush) throws IOException { + public void flush(boolean flush) throws IOException, InterruptedException { + + } + + + @Override + public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException { if(!loading){ //There is no data during the entire checkpoint period return Collections.emptyList(); @@ -246,4 +256,5 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr dorisStreamLoad.close(); } } + } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java index 06f2bfb..66d0227 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java @@ -27,7 +27,6 @@ import org.apache.doris.flink.sink.writer.RowDataSerializer; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.flink.table.connector.sink.SinkProvider; import org.apache.flink.table.connector.sink.SinkV2Provider; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; @@ -104,7 +103,7 @@ public class DorisDynamicTableSink implements DynamicTableSink { .setDorisReadOptions(readOptions) .setDorisExecutionOptions(executionOptions) .setSerializer(serializerBuilder.build()); - return SinkProvider.of(dorisSinkBuilder.build(), sinkParallelism); + return SinkV2Provider.of(dorisSinkBuilder.build(), sinkParallelism); }else{ 
DorisBatchSink.Builder<RowData> dorisBatchSinkBuilder = DorisBatchSink.builder(); dorisBatchSinkBuilder.setDorisOptions(options) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/MockCommitRequest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/MockCommitRequest.java new file mode 100644 index 0000000..d879e81 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/MockCommitRequest.java @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.sink.committer; + +import org.apache.flink.api.connector.sink2.Committer; + +public class MockCommitRequest <CommT> implements Committer.CommitRequest<CommT>{ + + private final CommT committable; + + public MockCommitRequest(CommT committable) { + this.committable = committable; + } + + @Override + public CommT getCommittable() { + return committable; + } + + @Override + public int getNumberOfRetries() { + return 0; + } + + @Override + public void signalFailedWithKnownReason(Throwable throwable) { + + } + + @Override + public void signalFailedWithUnknownReason(Throwable throwable) { + + } + + @Override + public void retryLater() { + + } + + @Override + public void updateAndRetryLater(CommT commT) { + + } + + @Override + public void signalAlreadyCommitted() { + + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java index 7cc2a88..794f806 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java @@ -25,7 +25,6 @@ import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2; import org.apache.doris.flink.sink.DorisCommittable; import org.apache.doris.flink.sink.HttpEntityMock; import org.apache.doris.flink.sink.OptionUtils; - import org.apache.http.ProtocolVersion; import org.apache.http.StatusLine; import org.apache.http.client.methods.CloseableHttpResponse; @@ -34,16 +33,15 @@ import org.apache.http.message.BasicStatusLine; import org.junit.After; import org.junit.Before; import org.junit.Test; - +import org.mockito.MockedStatic; +import org.slf4j.Logger; import java.util.Collections; import static org.mockito.ArgumentMatchers.any; -import org.mockito.MockedStatic; import static org.mockito.Mockito.mock; import 
static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; -import org.slf4j.Logger; /** * Test for Doris Committer. @@ -83,7 +81,8 @@ public class TestDorisCommitter { "\"msg\": \"errCode = 2, detailMessage = transaction [2] is already visible, not pre-committed.\"\n" + "}"; this.entityMock.setValue(response); - dorisCommitter.commit(Collections.singletonList(dorisCommittable)); + final MockCommitRequest<DorisCommittable> request = new MockCommitRequest<>(dorisCommittable); + dorisCommitter.commit(Collections.singletonList(request)); } @@ -94,7 +93,8 @@ public class TestDorisCommitter { "\"msg\": \"errCode = 2, detailMessage = transaction [25] is already aborted. abort reason: User Abort\"\n" + "}"; this.entityMock.setValue(response); - dorisCommitter.commit(Collections.singletonList(dorisCommittable)); + final MockCommitRequest<DorisCommittable> request = new MockCommitRequest<>(dorisCommittable); + dorisCommitter.commit(Collections.singletonList(request)); } @After diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java index e988d6b..01e1559 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java @@ -23,7 +23,7 @@ import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.sink.DorisCommittable; import org.apache.doris.flink.sink.HttpTestUtil; import org.apache.doris.flink.sink.OptionUtils; -import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink2.Sink; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.impl.client.CloseableHttpClient; import org.junit.Assert; @@ -31,6 +31,7 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import 
java.util.Collection; import java.util.Collections; import java.util.List; import java.util.OptionalLong; @@ -67,12 +68,13 @@ public class TestDorisWriter { when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1)); DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions); dorisWriter.setDorisStreamLoad(dorisStreamLoad); - List<DorisCommittable> committableList = dorisWriter.prepareCommit(true); - + dorisWriter.write("doris,1",null); + Collection<DorisCommittable> committableList = dorisWriter.prepareCommit(); Assert.assertEquals(1, committableList.size()); - Assert.assertEquals("local:8040", committableList.get(0).getHostPort()); - Assert.assertEquals("db_test", committableList.get(0).getDb()); - Assert.assertEquals(2, committableList.get(0).getTxnID()); + DorisCommittable dorisCommittable = committableList.stream().findFirst().get(); + Assert.assertEquals("local:8040", dorisCommittable.getHostPort()); + Assert.assertEquals("test", dorisCommittable.getDb()); + Assert.assertEquals(2, dorisCommittable.getTxnID()); Assert.assertFalse(dorisWriter.isLoading()); } @@ -91,6 +93,6 @@ public class TestDorisWriter { Assert.assertEquals(1, writerStates.size()); Assert.assertEquals("doris", writerStates.get(0).getLabelPrefix()); - Assert.assertTrue(dorisWriter.isLoading()); + Assert.assertTrue(!dorisWriter.isLoading()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org