This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new dcd395c  RATIS-1406. Add an optional API to StateMachine.DataStream for providing an Executor. (#503)
dcd395c is described below

commit dcd395c3b629cd2ae4092a3b04085ae986b00f60
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Sep 24 09:24:18 2021 +0800

    RATIS-1406. Add an optional API to StateMachine.DataStream for providing an Executor. (#503)
---
 .../java/org/apache/ratis/netty/server/DataStreamManagement.java | 8 +++++++-
 .../main/java/org/apache/ratis/statemachine/StateMachine.java    | 9 +++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 415ac29..0daa72b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -84,7 +84,7 @@ public class DataStreamManagement {
 
     CompletableFuture<Long> write(ByteBuf buf, WriteOption[] options, Executor executor) {
       return composeAsync(writeFuture, executor,
-          n -> streamFuture.thenApplyAsync(stream -> writeTo(buf, options, stream), executor));
+          n -> streamFuture.thenCompose(stream -> writeToAsync(buf, options, stream, executor)));
     }
   }
 
@@ -243,6 +243,12 @@ public class DataStreamManagement {
     return composed;
   }
 
+  static CompletableFuture<Long> writeToAsync(ByteBuf buf, WriteOption[] options, DataStream stream,
+      Executor defaultExecutor) {
+    final Executor e = Optional.ofNullable(stream.getExecutor()).orElse(defaultExecutor);
+    return CompletableFuture.supplyAsync(() -> writeTo(buf, options, stream), e);
+  }
+
   static long writeTo(ByteBuf buf, WriteOption[] options, DataStream stream) {
     final DataChannel channel = stream.getDataChannel();
     long byteWritten = 0;
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index c0a8916..772cc71 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -43,6 +43,7 @@ import java.io.IOException;
 import java.nio.channels.WritableByteChannel;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.function.Function;
 
 /**
@@ -277,6 +278,14 @@ public interface StateMachine extends Closeable {
      * @return a future for the cleanup task.
      */
     CompletableFuture<?> cleanUp();
+
+    /**
+     * @return an {@link Executor} for executing the streaming tasks of this stream.
+     *         If the returned value is null, the default {@link Executor} will be used.
+     */
+    default Executor getExecutor() {
+      return null;
+    }
   }
 
   /**

Reply via email to