This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new dcd395c RATIS-1406. Add an optional API to StateMachine.DataStream
for providing an Executor. (#503)
dcd395c is described below
commit dcd395c3b629cd2ae4092a3b04085ae986b00f60
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Sep 24 09:24:18 2021 +0800
RATIS-1406. Add an optional API to StateMachine.DataStream for providing an
Executor. (#503)
---
.../java/org/apache/ratis/netty/server/DataStreamManagement.java | 8 +++++++-
.../main/java/org/apache/ratis/statemachine/StateMachine.java | 9 +++++++++
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 415ac29..0daa72b 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -84,7 +84,7 @@ public class DataStreamManagement {
CompletableFuture<Long> write(ByteBuf buf, WriteOption[] options, Executor
executor) {
return composeAsync(writeFuture, executor,
- n -> streamFuture.thenApplyAsync(stream -> writeTo(buf, options,
stream), executor));
+ n -> streamFuture.thenCompose(stream -> writeToAsync(buf, options,
stream, executor)));
}
}
@@ -243,6 +243,12 @@ public class DataStreamManagement {
return composed;
}
+ static CompletableFuture<Long> writeToAsync(ByteBuf buf, WriteOption[]
options, DataStream stream,
+ Executor defaultExecutor) {
+ final Executor e =
Optional.ofNullable(stream.getExecutor()).orElse(defaultExecutor);
+ return CompletableFuture.supplyAsync(() -> writeTo(buf, options, stream),
e);
+ }
+
static long writeTo(ByteBuf buf, WriteOption[] options, DataStream stream) {
final DataChannel channel = stream.getDataChannel();
long byteWritten = 0;
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index c0a8916..772cc71 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -43,6 +43,7 @@ import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.function.Function;
/**
@@ -277,6 +278,14 @@ public interface StateMachine extends Closeable {
* @return a future for the cleanup task.
*/
CompletableFuture<?> cleanUp();
+
+ /**
+ * @return an {@link Executor} for executing the streaming tasks of this
stream.
+ * If the returned value is null, the default {@link Executor}
will be used.
+ */
+ default Executor getExecutor() {
+ return null;
+ }
}
/**