This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 616c9b2 [improvement] fix: some successfully precommitted transactions
cannot be aborted, leading to LabelAlreadyExists (#362)
616c9b2 is described below
commit 616c9b210cfa0e366557c3ff18049d52e3f44b70
Author: wudi <[email protected]>
AuthorDate: Mon Apr 15 15:49:54 2024 +0800
[improvement] fix: some successfully precommitted transactions cannot be
aborted, leading to LabelAlreadyExists (#362)
---
.../exception/LabelAlreadyExistsException.java | 44 +++++++++++
.../doris/flink/rest/models/RespContent.java | 4 +
.../org/apache/doris/flink/sink/ResponseUtil.java | 8 ++
.../doris/flink/sink/batch/RecordWithMeta.java | 15 ++++
.../doris/flink/sink/writer/DorisStreamLoad.java | 78 ++++++++++++++++++--
.../doris/flink/sink/writer/DorisWriter.java | 86 ++++++++++++++++++----
.../doris/flink/sink/writer/LabelGenerator.java | 5 ++
.../apache/doris/flink/sink/TestResponseUtil.java | 21 ++++++
.../flink/sink/writer/TestDorisStreamLoad.java | 3 +-
9 files changed, 242 insertions(+), 22 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java
new file mode 100644
index 0000000..ea86c60
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/LabelAlreadyExistsException.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.exception;
+
+public class LabelAlreadyExistsException extends RuntimeException {
+ public LabelAlreadyExistsException() {
+ super();
+ }
+
+ public LabelAlreadyExistsException(String message) {
+ super(message);
+ }
+
+ public LabelAlreadyExistsException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public LabelAlreadyExistsException(Throwable cause) {
+ super(cause);
+ }
+
+ protected LabelAlreadyExistsException(
+ String message,
+ Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
index 94a1dc4..225fb5a 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
@@ -143,6 +143,10 @@ public class RespContent {
return commitAndPublishTimeMs;
}
+ public String getLabel() {
+ return label;
+ }
+
@Override
public String toString() {
ObjectMapper mapper = new ObjectMapper();
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java
index 9eb1541..c348e73 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/ResponseUtil.java
@@ -27,10 +27,18 @@ public class ResponseUtil {
Pattern.compile(
"transaction \\[(\\d+)\\] is already
\\b(COMMITTED|committed|VISIBLE|visible)\\b, not pre-committed.");
+ public static final Pattern ABORTTED_PATTERN =
+ Pattern.compile(
+ "transaction \\[(\\d+)\\] is already|transaction
\\[(\\d+)\\] not found");
+
public static boolean isCommitted(String msg) {
return COMMITTED_PATTERN.matcher(msg).find();
}
+ public static boolean isAborted(String msg) {
+ return ABORTTED_PATTERN.matcher(msg).find();
+ }
+
static final Pattern COPY_COMMITTED_PATTERN =
Pattern.compile("errCode = 2, detailMessage = No files can be
copied.*");
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java
index 936487d..e11da6a 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/RecordWithMeta.java
@@ -57,4 +57,19 @@ public class RecordWithMeta {
public String getTableIdentifier() {
return this.database + "." + this.table;
}
+
+ @Override
+ public String toString() {
+ return "RecordWithMeta{"
+ + "database='"
+ + database
+ + '\''
+ + ", table='"
+ + table
+ + '\''
+ + ", record='"
+ + record
+ + '\''
+ + '}';
+ }
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index d049a2b..cbfa8ba 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.sink.writer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -162,13 +163,18 @@ public class DorisStreamLoad implements Serializable {
*/
public void abortPreCommit(String labelPrefix, long chkID) throws
Exception {
long startChkID = chkID;
- LOG.info("abort for labelPrefix {}. start chkId {}.", labelPrefix,
chkID);
+ LOG.info(
+ "abort for labelPrefix {}, concat labelPrefix {}, start chkId
{}.",
+ labelPrefix,
+ labelGenerator.getConcatLabelPrefix(),
+ chkID);
while (true) {
try {
// TODO: According to label abort txn.
// Currently, it can only be aborted based on txnid, so we
must
// first request a streamload based on the label to get the
txnid.
String label = labelGenerator.generateTableLabel(startChkID);
+ LOG.info("start a check label {} to load.", label);
HttpPutBuilder builder = new HttpPutBuilder();
builder.setUrl(loadUrlStr)
.baseAuth(user, passwd)
@@ -210,7 +216,7 @@ public class DorisStreamLoad implements Serializable {
}
startChkID++;
} catch (Exception e) {
- LOG.warn("failed to stream load data", e);
+ LOG.warn("failed to abort labelPrefix {}", labelPrefix, e);
throw e;
}
}
@@ -320,9 +326,9 @@ public class DorisStreamLoad implements Serializable {
mapper.readValue(loadResult, new TypeReference<HashMap<String,
String>>() {});
if (!SUCCESS.equals(res.get("status"))) {
String msg = res.get("msg");
- if (msg != null && ResponseUtil.isCommitted(msg)) {
- throw new DorisException(
- "try abort committed transaction, " + "do you recover
from old savepoint?");
+ if (msg != null && ResponseUtil.isAborted(msg)) {
+ LOG.warn("Failed to abort transaction, {}", msg);
+ return;
}
LOG.error("Fail to abort transaction. txnId: {}, error: {}",
txnID, msg);
@@ -330,6 +336,68 @@ public class DorisStreamLoad implements Serializable {
}
}
+ public void abortTransactionByLabel(String label) throws Exception {
+ if (StringUtils.isNullOrWhitespaceOnly(label)) {
+ return;
+ }
+ HttpPutBuilder builder = new HttpPutBuilder();
+ builder.setUrl(abortUrlStr)
+ .baseAuth(user, passwd)
+ .addCommonHeader()
+ .setLabel(label)
+ .setEmptyEntity()
+ .abort();
+ CloseableHttpResponse response = httpClient.execute(builder.build());
+
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode != 200 || response.getEntity() == null) {
+ LOG.warn("abort transaction by label response: " +
response.getStatusLine().toString());
+ throw new DorisRuntimeException(
+ "Fail to abort transaction by label " + label + " with url
" + abortUrlStr);
+ }
+
+ ObjectMapper mapper = new ObjectMapper();
+ String loadResult = EntityUtils.toString(response.getEntity());
+ LOG.info("abort Result {}", loadResult);
+ Map<String, String> res =
+ mapper.readValue(loadResult, new TypeReference<HashMap<String,
String>>() {});
+ if (!SUCCESS.equals(res.get("status"))) {
+ String msg = res.get("msg");
+ if (msg != null && ResponseUtil.isCommitted(msg)) {
+ throw new DorisException(
+ "try abort committed transaction by label, "
+ + "do you recover from old savepoint?");
+ }
+
+ LOG.error("Fail to abort transaction by label. label: {}, error:
{}", label, msg);
+ throw new DorisException("Fail to abort transaction by label, " +
loadResult);
+ }
+ }
+
+ public void abortLabelExistTransaction(RespContent respContent) {
+ if (respContent == null || respContent.getMessage() == null) {
+ return;
+ }
+ try {
+ Matcher matcher =
LABEL_EXIST_PATTERN.matcher(respContent.getMessage());
+ if (matcher.find()) {
+ long txnId = Long.parseLong(matcher.group(2));
+ abortTransaction(txnId);
+ LOG.info(
+ "Finish to abort transaction {} for label already
exist {}",
+ txnId,
+ respContent.getLabel());
+ }
+ } catch (Exception ex) {
+ LOG.error(
+ "Failed abort transaction {} for label already exist",
respContent.getLabel());
+ }
+ }
+
+ public String getCurrentLabel() {
+ return currentLabel;
+ }
+
public void close() throws IOException {
if (null != httpClient) {
try {
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 54facc7..db6094e 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -28,11 +28,13 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.exception.LabelAlreadyExistsException;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpUtil;
+import org.apache.doris.flink.sink.LoadStatus;
import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
import org.slf4j.Logger;
@@ -72,7 +74,7 @@ public class DorisWriter<IN>
private final DorisOptions dorisOptions;
private final DorisReadOptions dorisReadOptions;
private final DorisExecutionOptions executionOptions;
- private final String labelPrefix;
+ private String labelPrefix;
private final int subtaskId;
private final int intervalTime;
private final DorisRecordSerializer<IN> serializer;
@@ -82,6 +84,7 @@ public class DorisWriter<IN>
private BackendUtil backendUtil;
private SinkWriterMetricGroup sinkMetricGroup;
private Map<String, DorisWriteMetrics> sinkMetricsMap = new
ConcurrentHashMap<>();
+ private volatile boolean multiTableLoad = false;
public DorisWriter(
Sink.InitContext initContext,
@@ -95,13 +98,17 @@ public class DorisWriter<IN>
.getRestoredCheckpointId()
.orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
this.curCheckpointId = lastCheckpointId + 1;
- LOG.info("restore checkpointId {}", lastCheckpointId);
+ LOG.info("restore from checkpointId {}", lastCheckpointId);
LOG.info("labelPrefix {}", executionOptions.getLabelPrefix());
this.labelPrefix = executionOptions.getLabelPrefix();
this.subtaskId = initContext.getSubtaskId();
this.scheduledExecutorService =
new ScheduledThreadPoolExecutor(1, new
ExecutorThreadFactory("stream-load-check"));
this.serializer = serializer;
+ if (StringUtils.isBlank(dorisOptions.getTableIdentifier())) {
+ this.multiTableLoad = true;
+ LOG.info("table.identifier is empty, multiple table writes.");
+ }
this.dorisOptions = dorisOptions;
this.dorisReadOptions = dorisReadOptions;
this.executionOptions = executionOptions;
@@ -135,6 +142,7 @@ public class DorisWriter<IN>
List<String> alreadyAborts = new ArrayList<>();
// abort label in state
for (DorisWriterState state : recoveredStates) {
+ LOG.info("try to abort txn from DorisWriterState {}",
state.toString());
// Todo: When the sink parallelism is reduced,
// the txn of the redundant task before aborting is also needed.
if (!state.getLabelPrefix().equals(labelPrefix)) {
@@ -177,7 +185,6 @@ public class DorisWriter<IN>
}
public void writeOneDorisRecord(DorisRecord record) throws IOException,
InterruptedException {
-
if (record == null || record.getRow() == null) {
// ddl or value is null
return;
@@ -221,9 +228,9 @@ public class DorisWriter<IN>
if (!globalLoading &&
loadingMap.values().stream().noneMatch(Boolean::booleanValue)) {
return Collections.emptyList();
}
+
// disable exception checker before stop load.
globalLoading = false;
-
// submit stream load http request
List<DorisCommittable> committableList = new ArrayList<>();
for (Map.Entry<String, DorisStreamLoad> streamLoader :
dorisStreamLoadMap.entrySet()) {
@@ -240,13 +247,21 @@ public class DorisWriter<IN>
dorisWriteMetrics.flush(respContent);
}
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
- String errMsg =
- String.format(
- "table %s stream load error: %s, see more in
%s",
- tableIdentifier,
- respContent.getMessage(),
- respContent.getErrorURL());
- throw new DorisRuntimeException(errMsg);
+ if (executionOptions.enabled2PC()
+ &&
LoadStatus.LABEL_ALREADY_EXIST.equals(respContent.getStatus())) {
+ LOG.info("try to abort {} cause Label Already Exists",
respContent.getLabel());
+ dorisStreamLoad.abortLabelExistTransaction(respContent);
+ throw new LabelAlreadyExistsException("Exist label abort
finished, retry");
+ } else {
+ String errMsg =
+ String.format(
+ "table %s stream load error: %s, see more
in %s",
+ tableIdentifier,
+ respContent.getMessage(),
+ respContent.getErrorURL());
+ LOG.error("Failed to load, {}", errMsg);
+ throw new DorisRuntimeException(errMsg);
+ }
}
if (executionOptions.enabled2PC()) {
long txnId = respContent.getTxnId();
@@ -255,11 +270,32 @@ public class DorisWriter<IN>
dorisStreamLoad.getHostPort(),
dorisStreamLoad.getDb(), txnId));
}
}
+
// clean loadingMap
loadingMap.clear();
return committableList;
}
+ private void abortPossibleSuccessfulTransaction() {
+ // In the case of multi-table writing, if a new table is added during
the period
+ // (there is no streamloader for this table in the previous Checkpoint
state),
+ // in the precommit phase, if some tables succeed and others fail,
+ // the txn of successful precommit cannot be aborted.
+ if (executionOptions.enabled2PC() && multiTableLoad) {
+ LOG.info("Try to abort may have successfully preCommitted label.");
+ for (Map.Entry<String, DorisStreamLoad> entry :
dorisStreamLoadMap.entrySet()) {
+ DorisStreamLoad abortLoader = entry.getValue();
+ try {
+
abortLoader.abortTransactionByLabel(abortLoader.getCurrentLabel());
+ } catch (Exception ex) {
+ LOG.warn(
+ "Skip abort transaction failed by label, reason is
{}.",
+ ex.getMessage());
+ }
+ }
+ }
+ }
+
@Override
public List<DorisWriterState> snapshotState(long checkpointId) throws
IOException {
List<DorisWriterState> writerStates = new ArrayList<>();
@@ -302,8 +338,12 @@ public class DorisWriter<IN>
/** Check the streamload http request regularly. */
private void checkDone() {
- for (Map.Entry<String, DorisStreamLoad> streamLoadMap :
dorisStreamLoadMap.entrySet()) {
- checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue());
+ // todo: When writing to multiple tables,
+ // the checkdone thread may cause problems. Disable it first.
+ if (!multiTableLoad) {
+ for (Map.Entry<String, DorisStreamLoad> streamLoadMap :
dorisStreamLoadMap.entrySet()) {
+ checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue());
+ }
}
}
@@ -347,16 +387,29 @@ public class DorisWriter<IN>
RespContent content =
dorisStreamLoad.handlePreCommitResponse(
dorisStreamLoad.getPendingLoadFuture().get());
- errorMsg = content.getMessage();
+ if (executionOptions.enabled2PC()
+ &&
LoadStatus.LABEL_ALREADY_EXIST.equals(content.getStatus())) {
+ LOG.info(
+ "try to abort {} cause Label Already
Exists",
+ content.getLabel());
+
dorisStreamLoad.abortLabelExistTransaction(content);
+ errorMsg = "Exist label abort finished, retry";
+ LOG.info(errorMsg);
+ return;
+ } else {
+ errorMsg = content.getMessage();
+ loadException = new StreamLoadException(errorMsg);
+ }
} catch (Exception e) {
errorMsg = e.getMessage();
+ loadException = new DorisRuntimeException(e);
}
- loadException = new StreamLoadException(errorMsg);
LOG.error(
"table {} stream load finished unexpectedly,
interrupt worker thread! {}",
tableIdentifier,
errorMsg);
+
// set the executor thread interrupted in case blocking in
write data.
executorThread.interrupt();
}
@@ -391,6 +444,9 @@ public class DorisWriter<IN>
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdownNow();
}
+ LOG.info("Try to abort txn before closing.");
+ abortPossibleSuccessfulTransaction();
+
if (dorisStreamLoadMap != null && !dorisStreamLoadMap.isEmpty()) {
for (DorisStreamLoad dorisStreamLoad :
dorisStreamLoadMap.values()) {
dorisStreamLoad.close();
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index 91eaf9e..5f3a8d6 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -69,4 +69,9 @@ public class LabelGenerator {
public String generateCopyBatchLabel(String table, long chkId, int
fileNum) {
return String.format("%s_%s_%s_%s_%s", labelPrefix, table, subtaskId,
chkId, fileNum);
}
+
+ public String getConcatLabelPrefix() {
+ String concatPrefix = String.format("%s_%s_%s", labelPrefix,
tableIdentifier, subtaskId);
+ return concatPrefix;
+ }
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestResponseUtil.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestResponseUtil.java
index 1225424..f8d36a7 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestResponseUtil.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestResponseUtil.java
@@ -44,4 +44,25 @@ public class TestResponseUtil {
Assert.assertFalse(ResponseUtil.isCommitted(commitMsg));
Assert.assertFalse(ResponseUtil.isCommitted(abortedMsg));
}
+
+ @Test
+ public void testIsAborted() {
+ String notFoundMsg = "errCode = 2, detailMessage = transaction [2] not
found";
+ String alreadyAbort =
+ "errCode = 2, detailMessage = transaction [2] is already
aborted. abort reason: User Abort";
+ String systemAlreadAbort =
+ "errCode = 2, detailMessage = transaction [2] is already
aborted. abort reason: timeout by txn manager";
+ String alreadCommit =
+ "errCode = 2, detailMessage = transaction [2] is already
COMMITTED, could not abort.";
+ String alreadVISIBLE =
+ "errCode = 2, detailMessage = transaction [2] is already
VISIBLE, could not abort.";
+ String errormsg =
+ "tCouldn't open transport for :0 (Could not resolve host for
client socket.";
+ Assert.assertTrue(ResponseUtil.isAborted(notFoundMsg));
+ Assert.assertTrue(ResponseUtil.isAborted(alreadyAbort));
+ Assert.assertTrue(ResponseUtil.isAborted(systemAlreadAbort));
+ Assert.assertTrue(ResponseUtil.isAborted(alreadCommit));
+ Assert.assertTrue(ResponseUtil.isAborted(alreadVISIBLE));
+ Assert.assertFalse(ResponseUtil.isAborted(errormsg));
+ }
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index 1352a26..94beae8 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -20,7 +20,6 @@ package org.apache.doris.flink.sink.writer;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.sink.HttpTestUtil;
import org.apache.doris.flink.sink.OptionUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -89,7 +88,7 @@ public class TestDorisStreamLoad {
dorisStreamLoad.abortTransaction(anyLong());
}
- @Test(expected = DorisException.class)
+ @Test(expected = Exception.class)
public void testAbortTransactionFailed() throws Exception {
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
CloseableHttpResponse abortFailedResponse =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]