This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 3d9fd1814 RATIS-2497. Pass server to the dummy watch request in
OrderedAsync (#1438)
3d9fd1814 is described below
commit 3d9fd1814299c45e36f3bf072c09c25020812626
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Apr 23 06:04:06 2026 -0700
RATIS-2497. Pass server to the dummy watch request in OrderedAsync (#1438)
---
.../org/apache/ratis/client/impl/AsyncImpl.java | 2 +-
.../org/apache/ratis/client/impl/OrderedAsync.java | 6 +-
.../apache/ratis/client/impl/RaftClientImpl.java | 9 +--
.../java/org/apache/ratis/protocol/Message.java | 5 +-
.../java/org/apache/ratis/util/MemoizedBase.java | 73 ++++++++++++++++++++++
.../apache/ratis/util/MemoizedCheckedSupplier.java | 31 ++-------
.../org/apache/ratis/util/MemoizedFunction.java | 69 ++++++++++++++++++++
.../org/apache/ratis/util/MemoizedSupplier.java | 32 ++--------
.../java/org/apache/ratis/util/StringUtils.java | 17 +++++
.../apache/ratis/server/impl/RaftServerImpl.java | 7 ++-
.../org/apache/ratis/LinearizableReadTests.java | 16 ++---
.../ratis/client/impl/RaftClientTestUtil.java | 3 +-
.../apache/ratis/server/impl/MiniRaftCluster.java | 4 ++
13 files changed, 204 insertions(+), 70 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
index 973b0db0c..01329fa7d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
@@ -40,7 +40,7 @@ class AsyncImpl implements AsyncRpcApi {
CompletableFuture<RaftClientReply> send(
RaftClientRequest.Type type, Message message, RaftPeerId server) {
return TraceClient.asyncSend(
- () -> client.getOrderedAsync().send(type, message, server), type,
server);
+ () -> client.getOrderedAsync(server).send(type, message, server),
type, server);
}
@Override
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index ecf4db3dc..791adc898 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -58,6 +58,8 @@ import java.util.function.LongFunction;
public final class OrderedAsync {
public static final Logger LOG = LoggerFactory.getLogger(OrderedAsync.class);
+ public static final Message DUMMY = Message.valueOf("DUMMY");
+
private enum BatchLogKey implements BatchLogger.Key {
SEND_REQUEST_EXCEPTION
}
@@ -116,12 +118,12 @@ public final class OrderedAsync {
}
}
- static OrderedAsync newInstance(RaftClientImpl client, RaftProperties
properties) {
+ static OrderedAsync newInstance(RaftClientImpl client, RaftPeerId server,
RaftProperties properties) {
final OrderedAsync ordered = new OrderedAsync(client, properties);
// send a dummy watch request to establish the connection
// TODO: this is a work around, it is better to fix the underlying RPC
implementation
if (RaftClientConfigKeys.Async.Experimental.sendDummyRequest(properties)) {
- ordered.send(RaftClientRequest.watchRequestType(), null, null);
+ ordered.send(RaftClientRequest.watchRequestType(), DUMMY, server);
}
return ordered;
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 27ae2e6ba..f24360f62 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -47,6 +47,7 @@ import org.apache.ratis.trace.TraceUtils;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedFunction;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
@@ -180,7 +181,7 @@ public final class RaftClientImpl implements RaftClient {
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
- private final Supplier<OrderedAsync> orderedAsync;
+ private final MemoizedFunction<RaftPeerId, OrderedAsync> orderedAsync;
private final Supplier<AsyncImpl> asyncApi;
private final Supplier<BlockingImpl> blockingApi;
private final Supplier<MessageStreamImpl> messageStreamApi;
@@ -209,7 +210,7 @@ public final class RaftClientImpl implements RaftClient {
clientRpc.addRaftPeers(group.getPeers());
this.clientRpc = clientRpc;
- this.orderedAsync = JavaUtils.memoize(() -> OrderedAsync.newInstance(this,
properties));
+ this.orderedAsync = MemoizedFunction.valueOf(server ->
OrderedAsync.newInstance(this, server, properties));
this.messageStreamApi = JavaUtils.memoize(() ->
MessageStreamImpl.newInstance(this, properties));
this.asyncApi = JavaUtils.memoize(() -> new AsyncImpl(this));
this.blockingApi = JavaUtils.memoize(() -> new BlockingImpl(this));
@@ -277,8 +278,8 @@ public final class RaftClientImpl implements RaftClient {
return scheduler;
}
- OrderedAsync getOrderedAsync() {
- return orderedAsync.get();
+ OrderedAsync getOrderedAsync(RaftPeerId server) {
+ return orderedAsync.apply(server);
}
RaftClientRequest newRaftClientRequest(
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
index e7ea97ca4..55fcd064d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
@@ -19,6 +19,7 @@ package org.apache.ratis.protocol;
import org.apache.ratis.thirdparty.com.google.protobuf.AbstractMessage;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.StringUtils;
@@ -47,11 +48,11 @@ public interface Message {
}
static Message valueOf(AbstractMessage abstractMessage) {
- return valueOf(abstractMessage.toByteString(), abstractMessage::toString);
+ return valueOf(abstractMessage.toByteString(), () ->
TextFormat.shortDebugString(abstractMessage));
}
static Message valueOf(ByteString bytes) {
- return valueOf(bytes, () -> "Message:" +
StringUtils.bytes2HexShortString(bytes));
+ return valueOf(bytes, () -> "Message:" +
StringUtils.bytes2ShortString(bytes));
}
static Message valueOf(String string) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedBase.java
b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedBase.java
new file mode 100644
index 000000000..1c78c05cf
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedBase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.CheckedSupplier;
+
+import java.util.Objects;
+
+/**
+ * This is the base class for the memoized subclass such as
+ * {@link MemoizedSupplier}, {@link MemoizedFunction}, {@link
MemoizedCheckedSupplier}, etc,
+ * The subclasses provide its own method to retrieve the value,
+ * such as {@link MemoizedSupplier#get()} and {@link
MemoizedFunction#apply(Object)}.
+ * The subclass method returns a value by invoking its initializer once at the
first call
+ * and then keeps returning the same value for the subsequent calls.
+ * <p>
+ * All the subclasses are thread safe.
+ *
+ * @param <RETURN> The value type.
+ * @param <THROW> The throwable type of the initializer.
+ */
+abstract class MemoizedBase<RETURN, THROW extends Throwable> {
+ @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
+ private volatile RETURN value = null;
+
+ final RETURN init(CheckedSupplier<RETURN, THROW> initializer) throws THROW {
+ final RETURN initialized = value;
+ if (initialized != null) {
+ return initialized;
+ }
+
+ synchronized (this) {
+ if (value == null) {
+ value = initializer.get();
+ Objects.requireNonNull(value, "initializer.get() returns null");
+ }
+ return value;
+ }
+ }
+
+ /** @return is the object initialized? */
+ public final boolean isInitialized() {
+ return value != null;
+ }
+
+ /**
+ * @return the value, which must be already initialized.
+ * @throws NullPointerException if the value is uninitialized.
+ */
+ public RETURN getInitializedValue() {
+ return Objects.requireNonNull(value, "Uninitialized: value == null");
+ }
+
+ @Override
+ public String toString() {
+ return value != null ? "Memoized:" + value : "Uninitialized";
+ }
+}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java
b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java
index 0e9ae44fa..8d4cf9cb8 100644
---
a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java
+++
b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -25,13 +25,14 @@ import java.util.Objects;
* A memoized supplier is a {@link CheckedSupplier}
* which gets a value by invoking its initializer once.
* and then keeps returning the same value as its supplied results.
- *
+ * <p>
* This class is thread safe.
*
* @param <RETURN> The return type of the supplier.
* @param <THROW> The throwable type of the supplier.
*/
public final class MemoizedCheckedSupplier<RETURN, THROW extends Throwable>
+ extends MemoizedBase<RETURN, THROW>
implements CheckedSupplier<RETURN, THROW> {
/**
* @param supplier to supply at most one non-null value.
@@ -45,9 +46,6 @@ public final class MemoizedCheckedSupplier<RETURN, THROW
extends Throwable>
private final CheckedSupplier<RETURN, THROW> initializer;
- @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
- private volatile RETURN value = null;
-
/**
* Create a memoized supplier.
* @param initializer to supply at most one non-null value.
@@ -60,16 +58,7 @@ public final class MemoizedCheckedSupplier<RETURN, THROW
extends Throwable>
/** @return the lazily initialized object. */
@Override
public RETURN get() throws THROW {
- RETURN v = value;
- if (v == null) {
- synchronized (this) {
- v = value;
- if (v == null) {
- v = value = Objects.requireNonNull(initializer.get(),
"initializer.get() returns null");
- }
- }
- }
- return v;
+ return init(initializer);
}
/**
@@ -77,16 +66,6 @@ public final class MemoizedCheckedSupplier<RETURN, THROW
extends Throwable>
* @throws NullPointerException if the object is uninitialized.
*/
public RETURN getUnchecked() {
- return Objects.requireNonNull(value, "value == null");
- }
-
- /** @return is the object initialized? */
- public boolean isInitialized() {
- return value != null;
- }
-
- @Override
- public String toString() {
- return isInitialized()? "Memoized:" + value: "UNINITIALIZED";
+ return getInitializedValue();
}
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedFunction.java
b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedFunction.java
new file mode 100644
index 000000000..13272f0f6
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedFunction.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * A memoized function is a {@link Function}
+ * which returns a value by invoking its initializer once
+ * and then keeps returning the same value as its result.
+ * <p>
+ * This class is similar to {@link MemoizedSupplier} except that
+ * the initializer takes a parameter.
+ * <p>
+ * This class is thread safe.
+ *
+ * @param <RETURN> The function result type.
+ */
+public final class MemoizedFunction<PARAMETER, RETURN>
+ extends MemoizedBase<RETURN, RuntimeException>
+ implements Function<PARAMETER, RETURN> {
+ /**
+ * @param function to supply at most one non-null value.
+ * @return a {@link MemoizedFunction} with the given function.
+ */
+ public static <P, R> MemoizedFunction<P, R> valueOf(Function<P, R> function)
{
+ return function instanceof MemoizedFunction ?
+ (MemoizedFunction<P, R>) function : new MemoizedFunction<>(function);
+ }
+
+ private final Function<PARAMETER, RETURN> initializer;
+
+ /**
+ * Create a memoized function.
+ * @param initializer to supply at most one non-null value.
+ */
+ private MemoizedFunction(Function<PARAMETER, RETURN> initializer) {
+ Objects.requireNonNull(initializer, "initializer == null");
+ this.initializer = initializer;
+ }
+
+ /**
+ * @param parameter for passing to the initializer.
+ * Since the returned function is memoized, the parameter
is only used at the first call.
+ * The parameter is ignored in the subsequent calls.
+ *
+ * @return the lazily initialized object.
+ */
+ @Override
+ public RETURN apply(PARAMETER parameter) {
+ return init(() -> initializer.apply(parameter));
+ }
+}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
index 5c2754a30..588f7ff79 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,12 +24,14 @@ import java.util.function.Supplier;
* A memoized supplier is a {@link Supplier}
* which gets a value by invoking its initializer once
* and then keeps returning the same value as its supplied results.
- *
+ * <p>
* This class is thread safe.
*
* @param <T> The supplier result type.
*/
-public final class MemoizedSupplier<T> implements Supplier<T> {
+public final class MemoizedSupplier<T>
+ extends MemoizedBase<T, RuntimeException>
+ implements Supplier<T> {
/**
* @param supplier to supply at most one non-null value.
* @return a {@link MemoizedSupplier} with the given supplier.
@@ -40,8 +42,6 @@ public final class MemoizedSupplier<T> implements Supplier<T>
{
}
private final Supplier<T> initializer;
- @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
- private volatile T value = null;
/**
* Create a memoized supplier.
@@ -55,26 +55,6 @@ public final class MemoizedSupplier<T> implements
Supplier<T> {
/** @return the lazily initialized object. */
@Override
public T get() {
- T v = value;
- if (v == null) {
- synchronized (this) {
- v = value;
- if (v == null) {
- v = value = Objects.requireNonNull(initializer.get(),
- "initializer.get() returns null");
- }
- }
- }
- return v;
- }
-
- /** @return is the object initialized? */
- public boolean isInitialized() {
- return value != null;
- }
-
- @Override
- public String toString() {
- return isInitialized()? "Memoized:" + get(): "UNINITIALIZED";
+ return init(initializer::get);
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
index 0f3266e65..50a256e5f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
@@ -81,6 +81,23 @@ public final class StringUtils {
return String.format(Locale.ENGLISH, format, objects);
}
+ public static String bytes2ShortString(ByteString bytes) {
+ return bytes.isValidUtf8() ? bytes2ShortUtf8String(bytes) :
StringUtils.bytes2HexShortString(bytes);
+ }
+
+ public static String bytes2ShortUtf8String(ByteString bytes) {
+ final String utf8 = bytes.toStringUtf8();
+ if (utf8.isEmpty()) {
+ return "<EMPTY_UTF8>";
+ }
+ final int length = utf8.length();
+ if (length <= 10) {
+ return utf8;
+ }
+ // return only the first 10 characters
+ return utf8.substring(0, 10) + "...(length=" + length + ")";
+ }
+
public static String bytes2HexShortString(ByteString bytes) {
final int size = bytes.size();
if (size == 0) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index a9c80d000..af7fd27a1 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -18,6 +18,7 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.client.impl.OrderedAsync;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.Timekeeper;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
@@ -1064,6 +1065,10 @@ class RaftServerImpl implements RaftServer.Division,
}
private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest
request) {
+ if
(OrderedAsync.DUMMY.getContent().equals(request.getMessage().getContent())) {
+ return
CompletableFuture.completedFuture(RaftClientReply.newBuilder().setRequest(request).build());
+ }
+
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
if (reply != null) {
return reply;
@@ -1078,7 +1083,7 @@ class RaftServerImpl implements RaftServer.Division,
private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest
request) {
final long minIndex = request.getType().getStaleRead().getMinIndex();
final long commitIndex = state.getLog().getLastCommittedIndex();
- LOG.debug("{}: minIndex={}, commitIndex={}", getMemberId(), minIndex,
commitIndex);
+ LOG.debug("{}: minIndex={}, commitIndex={} from {}", getMemberId(),
minIndex, commitIndex, request.getClientId());
if (commitIndex < minIndex) {
final StaleReadException e = new StaleReadException(
"Unable to serve stale-read due to server commit index = " +
commitIndex + " < min = " + minIndex);
diff --git
a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
index dd536508c..09781b546 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java
@@ -88,10 +88,9 @@ public abstract class LinearizableReadTests<CLUSTER extends
MiniRaftCluster>
RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE);
RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled());
RaftServerConfigKeys.Read.ReadIndex.setType(p, readIndexType());
- // Disable dummy request since currently the request is implemented as a
watch request
- // which can cause follower client to trigger failover to leader which
will cause the
- // all reads to be sent to the leader, making the follower read moot.
- RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(p, false);
+
+ // Enable dummy request so linearizable-read tests exercise the default
ordered-async bootstrap path.
+ RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(p, true);
}
@Test
@@ -151,13 +150,16 @@ public abstract class LinearizableReadTests<CLUSTER
extends MiniRaftCluster>
final int n = 100;
final List<Reply> f0Replies = new ArrayList<>(n);
final List<Reply> f1Replies = new ArrayList<>(n);
- try (RaftClient client = cluster.createClient(leaderId)) {
+ try (RaftClient client = cluster.createClient(leaderId);
+ RaftClient c0 = cluster.createClient(f0);
+ RaftClient c1 = cluster.createClient(f1);
+ ) {
for (int i = 0; i < n; i++) {
final int count = i + 1;
assertReplyExact(count, client.io().send(INCREMENT));
- f0Replies.add(new Reply(count, client.async().sendReadOnly(QUERY,
f0)));
- f1Replies.add(new Reply(count, client.async().sendReadOnly(QUERY,
f1)));
+ f0Replies.add(new Reply(count, c0.async().sendReadOnly(QUERY, f0)));
+ f1Replies.add(new Reply(count, c1.async().sendReadOnly(QUERY, f1)));
}
for (int i = 0; i < n; i++) {
diff --git
a/ratis-server/src/test/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
b/ratis-server/src/test/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
index d90b0cc53..886879a47 100644
---
a/ratis-server/src/test/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
+++
b/ratis-server/src/test/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
@@ -29,7 +29,8 @@ import org.apache.ratis.rpc.CallId;
/** Interface for testing raft client. */
public interface RaftClientTestUtil {
static void assertAsyncRequestSemaphore(RaftClient client, int
expectedAvailablePermits, int expectedQueueLength) {
- ((RaftClientImpl)
client).getOrderedAsync().assertRequestSemaphore(expectedAvailablePermits,
expectedQueueLength);
+ ((RaftClientImpl) client).getOrderedAsync(null)
+ .assertRequestSemaphore(expectedAvailablePermits, expectedQueueLength);
}
static ClientInvocationId getClientInvocationId(RaftClient client) {
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 87ffa8d19..825ae8f55 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -67,6 +67,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Timer;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -93,6 +94,8 @@ public abstract class MiniRaftCluster implements Closeable {
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
+ static final AtomicInteger CLIENT_ID = new AtomicInteger(0);
+
public abstract static class Factory<CLUSTER extends MiniRaftCluster> {
public interface Get<CLUSTER extends MiniRaftCluster> {
Supplier<RaftProperties> PROPERTIES =
JavaUtils.memoize(RaftProperties::new);
@@ -747,6 +750,7 @@ public abstract class MiniRaftCluster implements Closeable {
public RaftClient createClient(RaftPeerId leaderId, RaftGroup raftGroup,
RetryPolicy retryPolicy, RaftPeer primaryServer) {
RaftClient.Builder builder = RaftClient.newBuilder()
+ .setClientId(ClientId.valueOf(new UUID(0,
CLIENT_ID.incrementAndGet())))
.setRaftGroup(raftGroup)
.setLeaderId(leaderId)
.setProperties(properties)