This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new f2ea39a7fba branch-3.0: [improvement](info) Add F/L commitSeq and commitTs for get_lag #46769 (#47138) f2ea39a7fba is described below commit f2ea39a7fbadf9d355e57c87d2a8e3c85fa886ae Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Wed Feb 19 19:28:25 2025 +0800 branch-3.0: [improvement](info) Add F/L commitSeq and commitTs for get_lag #46769 (#47138) Cherry-picked from #46769 Co-authored-by: Uniqueyou <wangyix...@selectdb.com> --- .../org/apache/doris/binlog/BinlogLagInfo.java | 63 ++++++++++++++++++++++ .../org/apache/doris/binlog/BinlogManager.java | 3 +- .../java/org/apache/doris/binlog/BinlogUtils.java | 24 ++++++--- .../java/org/apache/doris/binlog/DBBinlog.java | 3 +- .../java/org/apache/doris/binlog/TableBinlog.java | 3 +- .../apache/doris/service/FrontendServiceImpl.java | 16 +++--- gensrc/thrift/FrontendService.thrift | 4 ++ 7 files changed, 100 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogLagInfo.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogLagInfo.java new file mode 100644 index 00000000000..83b4181fa2f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogLagInfo.java @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.binlog; + +public class BinlogLagInfo { + private long lag; + private long firstCommitSeq; + private long lastCommitSeq; + private long firstCommitTs; + private long lastCommitTs; + + public BinlogLagInfo(long lag, long firstCommitSeq, long lastCommitSeq, long firstCommitTs, long lastCommitTs) { + this.lag = lag; + this.firstCommitSeq = firstCommitSeq; + this.lastCommitSeq = lastCommitSeq; + this.firstCommitTs = firstCommitTs; + this.lastCommitTs = lastCommitTs; + } + + public BinlogLagInfo() { + lag = 0; + firstCommitSeq = 0; + lastCommitSeq = 0; + firstCommitTs = 0; + lastCommitTs = 0; + } + + public long getLag() { + return lag; + } + + public long getFirstCommitSeq() { + return firstCommitSeq; + } + + public long getLastCommitSeq() { + return lastCommitSeq; + } + + public long getFirstCommitTs() { + return firstCommitTs; + } + + public long getLastCommitTs() { + return lastCommitTs; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index b15e511e75c..af4d8d408e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -500,7 +500,7 @@ public class BinlogManager { } // get binlog by dbId, return first binlog.version > version - public Pair<TStatus, Long> getBinlogLag(long dbId, long tableId, long prevCommitSeq) { + public Pair<TStatus, BinlogLagInfo> getBinlogLag(long dbId, long tableId, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); lock.readLock().lock(); try { @@ -510,7 +510,6 @@ public class BinlogManager { LOG.warn("dbBinlog not found. dbId: {}", dbId); return Pair.of(status, null); } - return dbBinlog.getBinlogLag(tableId, prevCommitSeq); } finally { lock.readLock().unlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java index e51bc931759..1fa930cd4d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java @@ -61,12 +61,15 @@ public class BinlogUtils { } } - public static Pair<TStatus, Long> getBinlogLag(TreeSet<TBinlog> binlogs, long prevCommitSeq) { + public static Pair<TStatus, BinlogLagInfo> getBinlogLag(TreeSet<TBinlog> binlogs, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); TBinlog firstBinlog = binlogs.first(); + TBinlog lastBinlog = binlogs.last(); if (firstBinlog.getCommitSeq() > prevCommitSeq) { - return Pair.of(status, Long.valueOf(binlogs.size())); + BinlogLagInfo lagInfo = new BinlogLagInfo(binlogs.size(), firstBinlog.getCommitSeq(), + firstBinlog.getTimestamp(), lastBinlog.getCommitSeq(), lastBinlog.getTimestamp()); + return Pair.of(status, lagInfo); } // find first binlog whose commitSeq > commitSeq @@ -75,11 +78,20 @@ public class BinlogUtils { TBinlog binlog = binlogs.higher(guard); // all prevCommitSeq <= commitSeq - if (binlog == null) { - return Pair.of(status, 0L); - } else { - return Pair.of(status, Long.valueOf(binlogs.tailSet(binlog).size())); + long lag = 0; + long lastCommitSeq = 0; + long lastCommitTs = 0; + long firstCommitSeq = 0; + long firstCommitTs = 0; + if (binlog != null) { + lag = binlogs.tailSet(binlog).size(); + firstCommitSeq = binlog.getCommitSeq(); + firstCommitTs = binlog.getTimestamp(); + lastCommitSeq = lastBinlog.getCommitSeq(); + lastCommitTs = lastBinlog.getTimestamp(); } + return Pair.of(status, new BinlogLagInfo(lag, firstCommitSeq, lastCommitSeq, + firstCommitTs, lastCommitTs)); } public static TBinlog newDummyBinlog(long dbId, long tableId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index 2fbee550c91..09a06660b13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -266,7 +266,7 @@ public class DBBinlog { } } - public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) { + public Pair<TStatus, BinlogLagInfo> getBinlogLag(long tableId, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); lock.readLock().lock(); try { @@ -777,4 +777,5 @@ public class DBBinlog { } } } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index 718fce49ee1..c4b291bca52 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -123,7 +123,7 @@ public class TableBinlog { } } - public Pair<TStatus, Long> getBinlogLag(long prevCommitSeq) { + public Pair<TStatus, BinlogLagInfo> getBinlogLag(long prevCommitSeq) { lock.readLock().lock(); try { return BinlogUtils.getBinlogLag(binlogs, prevCommitSeq); @@ -400,4 +400,5 @@ public class TableBinlog { lock.readLock().unlock(); } } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 5f0b250f593..5ab79333385 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -29,6 +29,7 @@ import org.apache.doris.analysis.TableName; import org.apache.doris.analysis.TableRef; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.backup.Snapshot; +import org.apache.doris.binlog.BinlogLagInfo; import org.apache.doris.catalog.AutoIncrementGenerator; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; @@ -3373,16 +3374,19 @@ public class FrontendServiceImpl implements FrontendService.Iface { result.setStatus(new TStatus(TStatusCode.OK)); long prevCommitSeq = request.getPrevCommitSeq(); - Pair<TStatus, Long> statusLagPair = env.getBinlogManager().getBinlogLag(dbId, tableId, prevCommitSeq); - TStatus status = statusLagPair.first; + Pair<TStatus, BinlogLagInfo> binlogLagInfo = env.getBinlogManager().getBinlogLag(dbId, tableId, prevCommitSeq); + TStatus status = binlogLagInfo.first; if (status != null && status.getStatusCode() != TStatusCode.OK) { result.setStatus(status); } - Long binlogLag = statusLagPair.second; - if (binlogLag != null) { - result.setLag(binlogLag); + BinlogLagInfo lagInfo = binlogLagInfo.second; + if (lagInfo != null) { + result.setLag(lagInfo.getLag()); + result.setFirstCommitSeq(lagInfo.getFirstCommitSeq()); + result.setLastCommitSeq(lagInfo.getLastCommitSeq()); + result.setFirstBinlogTimestamp(lagInfo.getFirstCommitTs()); + result.setLastBinlogTimestamp(lagInfo.getLastCommitTs()); } - return result; } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index f27b8bff9a4..1f91a7db1d0 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1493,6 +1493,10 @@ struct TGetBinlogLagResult { 1: optional Status.TStatus status 2: optional i64 lag 3: optional Types.TNetworkAddress master_address + 4: optional i64 first_commit_seq + 5: optional i64 last_commit_seq + 6: optional i64 first_binlog_timestamp + 7: optional i64 last_binlog_timestamp } struct TUpdateFollowerStatsCacheRequest { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org