This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 863454270 RATIS-2294. Fix NettyClientRpc exception and timeout handling (#1264)
863454270 is described below
commit 863454270d16d0d934b3449c570357366c7ed407
Author: slfan1989 <[email protected]>
AuthorDate: Tue May 20 01:52:24 2025 +0800
RATIS-2294. Fix NettyClientRpc exception and timeout handling (#1264)
---
.../apache/ratis/netty/client/NettyClientRpc.java | 48 +++++++++++++++++++++-
.../test/java/org/apache/ratis/RaftAsyncTests.java | 20 +++++----
.../test/java/org/apache/ratis/RaftBasicTests.java | 2 +-
3 files changed, 61 insertions(+), 9 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
index 26ac41f7d..ef34caf17 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.netty.client;
+import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
import org.apache.ratis.conf.RaftProperties;
@@ -28,23 +29,40 @@ import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
+
+ public static final Logger LOG =
LoggerFactory.getLogger(NettyClientRpc.class);
+
+ private ClientId clientId;
+ private final TimeDuration requestTimeout;
+ private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+
public NettyClientRpc(ClientId clientId, RaftProperties properties) {
super(new NettyRpcProxy.PeerMap(clientId.toString(), properties));
+ this.clientId = clientId;
+ this.requestTimeout = RaftClientConfigKeys.Rpc.requestTimeout(properties);
}
@Override
public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest
request) {
final RaftPeerId serverId = request.getServerId();
+ long callId = request.getCallId();
try {
final NettyRpcProxy proxy = getProxies().getProxy(serverId);
final RaftNettyServerRequestProto serverRequestProto =
buildRequestProto(request);
- return proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
+ final CompletableFuture<RaftClientReply> replyFuture = new
CompletableFuture<>();
+
+ proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
if (request instanceof GroupListRequest) {
return
ClientProtoUtils.toGroupListReply(replyProto.getGroupListReply());
} else if (request instanceof GroupInfoRequest) {
@@ -52,7 +70,35 @@ public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
} else {
return
ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply());
}
+ }).whenComplete((reply, e) -> {
+ if (e == null) {
+ if (reply == null) {
+ e = new NullPointerException("Both reply==null && e==null");
+ }
+ if (e == null) {
+ e = reply.getNotLeaderException();
+ }
+ if (e == null) {
+ e = reply.getLeaderNotReadyException();
+ }
+ }
+
+ if (e != null) {
+ replyFuture.completeExceptionally(e);
+ } else {
+ replyFuture.complete(reply);
+ }
});
+
+ scheduler.onTimeout(requestTimeout, () -> {
+ if (!replyFuture.isDone()) {
+ final String s = clientId + "->" + serverId + " request #" +
+ callId + " timeout " + requestTimeout.getDuration();
+ replyFuture.completeExceptionally(new TimeoutIOException(s));
+ }
+ }, LOG, () -> "Timeout check for client request #" + callId);
+
+ return replyFuture;
} catch (Throwable e) {
return JavaUtils.completeExceptionally(e);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 3a760a806..a1c16df8f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -47,6 +47,7 @@ import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.PlatformUtils;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedRunnable;
@@ -83,6 +84,10 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
{
getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
SimpleStateMachine4Testing.class, StateMachine.class);
+ if (!PlatformUtils.LINUX) {
+ getProperties().setBoolean("raft.netty.server.use-epoll", false);
+ getProperties().setBoolean("raft.netty.client.use-epoll", false);
+ }
}
@Test
@@ -282,8 +287,8 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
void runTestStaleReadAsync(CLUSTER cluster) throws Exception {
final int numMessages = 10;
- try (RaftClient client = cluster.createClient()) {
- RaftTestUtil.waitForLeader(cluster);
+ RaftServer.Division division = waitForLeader(cluster);
+ try (RaftClient client = cluster.createClient(division.getId())) {
// submit some messages
final List<CompletableFuture<RaftClientReply>> futures = new
ArrayList<>();
@@ -304,6 +309,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
// Use a follower with the max commit index
final RaftClientReply lastWriteReply = replies.get(replies.size() - 1);
final RaftPeerId leader = lastWriteReply.getServerId();
+ Assert.assertEquals(leader, lastWriteReply.getServerId());
LOG.info("leader = " + leader);
final Collection<CommitInfoProto> commitInfos =
lastWriteReply.getCommitInfos();
LOG.info("commitInfos = " + commitInfos);
@@ -366,8 +372,8 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
void runTestWriteAsyncCustomReplicationLevel(CLUSTER cluster) throws
Exception {
final int numMessages = 20;
- try (RaftClient client = cluster.createClient()) {
- RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leader = waitForLeader(cluster).getId();
+ try (RaftClient client = cluster.createClient(leader)) {
// submit some messages
for (int i = 0; i < numMessages; i++) {
@@ -417,13 +423,13 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
LOG.info("Running testAppendEntriesTimeout");
final TimeDuration oldExpiryTime =
RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(),
TimeDuration.valueOf(20, TimeUnit.SECONDS));
- waitForLeader(cluster);
+ final RaftPeerId leader = waitForLeader(cluster).getId();
long time = System.currentTimeMillis();
long waitTime = 5000;
try (final RaftClient client = cluster.createClient()) {
// block append requests
cluster.getServerAliveStream()
- .filter(impl -> !impl.getInfo().isLeader())
+ .filter(impl -> !impl.getInfo().isLeader() &&
!impl.getPeer().getId().equals(leader))
.map(SimpleStateMachine4Testing::get)
.forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
@@ -433,7 +439,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
Assert.assertFalse(replyFuture.isDone());
// unblock append request.
cluster.getServerAliveStream()
- .filter(impl -> !impl.getInfo().isLeader())
+ .filter(impl -> !impl.getInfo().isLeader() &&
!impl.getPeer().getId().equals(leader))
.map(SimpleStateMachine4Testing::get)
.forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData);
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index afb189183..2ce1706cf 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -457,7 +457,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
static void runTestStateMachineMetrics(boolean async, MiniRaftCluster
cluster) throws Exception {
RaftServer.Division leader = waitForLeader(cluster);
- try (final RaftClient client = cluster.createClient()) {
+ try (final RaftClient client = cluster.createClient(leader.getId())) {
Gauge appliedIndexGauge = getStatemachineGaugeWithName(leader,
STATEMACHINE_APPLIED_INDEX_GAUGE);
Gauge smAppliedIndexGauge = getStatemachineGaugeWithName(leader,