This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 63cbd725 [Bug] Fix retry of lost tablet when tablet read fails (#502) 63cbd725 is described below commit 63cbd72550f031ad652707f114a735ba0ea49e14 Author: wudi <676366...@qq.com> AuthorDate: Fri Oct 25 10:22:09 2024 +0800 [Bug] Fix retry of lost tablet when tablet read fails (#502) --- .../apache/doris/flink/backend/BackendClient.java | 24 ++++++++++++++++++++-- .../doris/flink/rest/PartitionDefinition.java | 16 +++++++++++++++ .../source/assigners/SimpleSplitAssigner.java | 6 +++--- .../flink/source/reader/DorisValueReader.java | 10 +++++++++ .../doris/flink/source/reader/ValueReader.java | 2 +- .../doris/flink/table/DorisTableInputSplit.java | 11 ++++++++++ 6 files changed, 63 insertions(+), 6 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java index 9881ec5f..be976eff 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java @@ -139,10 +139,11 @@ public class BackendClient { open(); } TException ex = null; + TScanOpenResult result = null; for (int attempt = 0; attempt < retries; ++attempt) { logger.debug("Attempt {} to openScanner {}.", attempt, routing); try { - TScanOpenResult result = client.openScanner(openParams); + result = client.openScanner(openParams); if (result == null) { logger.warn("Open scanner result from {} is null.", routing); continue; @@ -155,12 +156,28 @@ public class BackendClient { result.getStatus().getErrorMsgs()); continue; } + logger.info( + "OpenScanner success for Doris BE '{}' with contextId '{}' for tablets '{}'.", + routing, + result.getContextId(), + openParams.tablet_ids); return result; } catch (TException e) { logger.warn("Open scanner from {} failed.", routing, e); ex = e; } } + if (result != null && (TStatusCode.OK != (result.getStatus().getStatusCode()))) { + logger.error( + ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, + routing, + result.getStatus().getStatusCode(), + result.getStatus().getErrorMsgs()); + throw new DorisInternalException( + routing.toString(), + result.getStatus().getStatusCode(), + result.getStatus().getErrorMsgs()); + } logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing); throw new ConnectedFailedException(routing.toString(), ex); } @@ -244,7 +261,10 @@ public class BackendClient { logger.warn("Close scanner from {} failed.", routing, e); } } - logger.info("CloseScanner to Doris BE '{}' success.", routing); + logger.info( + "CloseScanner to Doris BE '{}' success for contextId {} ", + routing, + closeParams.getContextId()); close(); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java index dd57a991..9f2bd07d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java @@ -144,4 +144,20 @@ public class PartitionDefinition implements Serializable, Comparable<PartitionDe + '\'' + '}'; } + + public String toStringWithoutPlan() { + return "PartitionDefinition{" + + "database='" + + database + + '\'' + + ", table='" + + table + + '\'' + + ", beAddress='" + + beAddress + + '\'' + + ", tabletIds=" + + tabletIds + + '}'; + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java index 7948244c..57a56f53 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/assigners/SimpleSplitAssigner.java @@ -45,9 +45,9 @@ public class SimpleSplitAssigner implements DorisSplitAssigner { } @Override - public void addSplits(Collection<DorisSourceSplit> splits) { - LOG.info("Adding splits: {}", splits); - splits.addAll(splits); + public void addSplits(Collection<DorisSourceSplit> newSplits) { + LOG.info("Adding splits: {}", newSplits); + splits.addAll(newSplits); } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java index e55e1775..2db4f798 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java @@ -182,6 +182,11 @@ public class DorisValueReader extends ValueReader implements AutoCloseable { } catch (InterruptedException e) { throw new DorisRuntimeException(e); } + } else { + LOG.info( + "Async scan finished , tablets: {}, offset: {}", + partition.getTabletIds(), + offset); } } } finally { @@ -245,6 +250,11 @@ public class DorisValueReader extends ValueReader implements AutoCloseable { eos.set(nextResult.isEos()); if (!eos.get()) { rowBatch = new RowBatch(nextResult, schema).readArrow(); + } else { + LOG.info( + "Scan finished, tablets: {}, offset: {}", + partition.getTabletIds(), + offset); } } hasNext = !eos.get(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java index 9e453934..9e1d091e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/ValueReader.java @@ -34,7 +34,7 @@ public abstract class ValueReader { DorisReadOptions readOptions, Logger logger) throws DorisException { - logger.info("create reader for partition: {}", partition); + logger.info("create reader for partition: {}", partition.toStringWithoutPlan()); if (readOptions.getUseFlightSql()) { return new DorisFlightValueReader( partition, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java index 9620c575..9abe15f2 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java @@ -39,4 +39,15 @@ public class DorisTableInputSplit implements InputSplit, java.io.Serializable { public int getSplitNumber() { return splitNumber; } + + @Override + public String toString() { + return String.format( + "DorisTableInputSplit: %s.%s,id=%s,be=%s,tablets=%s", + partition.getDatabase(), + partition.getTable(), + splitNumber, + partition.getBeAddress(), + partition.getTabletIds()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org