This is an automated email from the ASF dual-hosted git repository.

oxsean pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new a37bc2c368 feat: Enhance Server-Sent Events support (#15387)
a37bc2c368 is described below

commit a37bc2c36823a1ab1cc49888524c161b6cd6e643
Author: Sean Yang <[email protected]>
AuthorDate: Thu May 15 21:37:43 2025 +0800

    feat: Enhance Server-Sent Events support (#15387)
---
 .../springboot/demo/servlet/GreeterService.java    |   3 +
 .../demo/servlet/GreeterServiceImpl.java           |  22 ++
 .../support/jaxrs/FallbackArgumentResolver.java    |   3 +
 .../support/spring/FallbackArgumentResolver.java   |   3 +
 .../dubbo/remoting/http12/HttpConstants.java       |   5 -
 .../remoting/http12/message/ServerSentEvent.java   | 227 +++++++++++++++++++++
 .../http12/message/ServerSentEventEncoder.java     | 108 ++++++++++
 .../rpc/protocol/tri/ReflectionPackableMethod.java |   2 +-
 .../DefaultHttp11ServerTransportListener.java      |   1 -
 .../h12/http1}/Http1SseServerChannelObserver.java  |  27 +--
 .../h12/http2/Http2SseServerChannelObserver.java   |  24 +--
 .../tri/rest/mapping/meta/ParameterMeta.java       |   5 +
 .../support/basic/FallbackArgumentResolver.java    |   4 +
 .../dubbo/rpc/protocol/tri/test/TestResponse.java  |  11 +-
 14 files changed, 401 insertions(+), 44 deletions(-)

diff --git 
a/dubbo-demo/dubbo-demo-spring-boot/dubbo-demo-spring-boot-servlet/src/main/java/org/apache/dubbo/springboot/demo/servlet/GreeterService.java
 
b/dubbo-demo/dubbo-demo-spring-boot/dubbo-demo-spring-boot-servlet/src/main/java/org/apache/dubbo/springboot/demo/servlet/GreeterService.java
index 1c3345ee0b..96248f07c2 100644
--- 
a/dubbo-demo/dubbo-demo-spring-boot/dubbo-demo-spring-boot-servlet/src/main/java/org/apache/dubbo/springboot/demo/servlet/GreeterService.java
+++ 
b/dubbo-demo/dubbo-demo-spring-boot/dubbo-demo-spring-boot-servlet/src/main/java/org/apache/dubbo/springboot/demo/servlet/GreeterService.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.springboot.demo.servlet;
 
 import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.remoting.http12.message.ServerSentEvent;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -39,6 +40,8 @@ public interface GreeterService {
 
     void sayHelloServerStreamNoParameter(StreamObserver<HelloReply> 
responseObserver);
 
+    void sayHelloServerStreamSSE(StreamObserver<ServerSentEvent<HelloReply>> 
responseObserver);
+
     /**
      * Sends greetings with bi streaming
      */
diff --git 
a/dubbo-demo/dubbo-demo-spring-boot/dubbo-demo-spring-boot-servlet/src/main/java/org/apache/dubbo/springboot/demo/servlet/GreeterServiceImpl.java
 
b/dubbo-demo/dubbo-demo-spring-boot/dubbo-demo-spring-boot-servlet/src/main/java/org/apache/dubbo/springboot/demo/servlet/GreeterServiceImpl.java
index 8626e9aa83..264d6fd6da 100644
--- 
a/dubbo-demo/dubbo-demo-spring-boot/dubbo-demo-spring-boot-servlet/src/main/java/org/apache/dubbo/springboot/demo/servlet/GreeterServiceImpl.java
+++ 
b/dubbo-demo/dubbo-demo-spring-boot/dubbo-demo-spring-boot-servlet/src/main/java/org/apache/dubbo/springboot/demo/servlet/GreeterServiceImpl.java
@@ -18,7 +18,9 @@ package org.apache.dubbo.springboot.demo.servlet;
 
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.config.annotation.DubboService;
+import org.apache.dubbo.remoting.http12.message.ServerSentEvent;
 
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 
 import org.slf4j.Logger;
@@ -63,6 +65,26 @@ public class GreeterServiceImpl implements GreeterService {
         responseObserver.onCompleted();
     }
 
+    @Override
+    public void 
sayHelloServerStreamSSE(StreamObserver<ServerSentEvent<HelloReply>> 
responseObserver) {
+        LOGGER.info("Received sayHelloServerStreamSSE request");
+        responseObserver.onNext(ServerSentEvent.<HelloReply>builder()
+                .retry(Duration.ofSeconds(20))
+                .build());
+        responseObserver.onNext(ServerSentEvent.<HelloReply>builder()
+                .event("say")
+                .comment("hello world")
+                .build());
+        for (int i = 1; i < 6; i++) {
+            LOGGER.info("sayHelloServerStreamSSE onNext:  {} times", i);
+            responseObserver.onNext(ServerSentEvent.<HelloReply>builder()
+                    .data(toReply("Hello " + ' ' + i + " times"))
+                    .build());
+        }
+        LOGGER.info("sayHelloServerStreamSSE onCompleted");
+        responseObserver.onCompleted();
+    }
+
     @Override
     public StreamObserver<HelloRequest> 
sayHelloBiStream(StreamObserver<HelloReply> responseObserver) {
         LOGGER.info("Received sayHelloBiStream request");
diff --git 
a/dubbo-plugin/dubbo-rest-jaxrs/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/jaxrs/FallbackArgumentResolver.java
 
b/dubbo-plugin/dubbo-rest-jaxrs/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/jaxrs/FallbackArgumentResolver.java
index 11d88ddabe..ff6243c225 100644
--- 
a/dubbo-plugin/dubbo-rest-jaxrs/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/jaxrs/FallbackArgumentResolver.java
+++ 
b/dubbo-plugin/dubbo-rest-jaxrs/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/jaxrs/FallbackArgumentResolver.java
@@ -51,6 +51,9 @@ public class FallbackArgumentResolver extends 
AbstractArgumentResolver {
         if (value != null) {
             return value;
         }
+        if (meta.parameter().isStream()) {
+            return null;
+        }
         if (meta.parameter().isSimple()) {
             return request.parameter(meta.name());
         }
diff --git 
a/dubbo-plugin/dubbo-rest-spring/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/spring/FallbackArgumentResolver.java
 
b/dubbo-plugin/dubbo-rest-spring/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/spring/FallbackArgumentResolver.java
index e9e2806bea..2f698435fb 100644
--- 
a/dubbo-plugin/dubbo-rest-spring/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/spring/FallbackArgumentResolver.java
+++ 
b/dubbo-plugin/dubbo-rest-spring/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/spring/FallbackArgumentResolver.java
@@ -42,6 +42,9 @@ public class FallbackArgumentResolver extends 
AbstractArgumentResolver {
     @Override
     protected Object resolveValue(NamedValueMeta meta, HttpRequest request, 
HttpResponse response) {
         ParameterMeta parameter = meta.parameter();
+        if (parameter.isStream()) {
+            return null;
+        }
         if (parameter.isSimple()) {
             return request.parameter(meta.name());
         }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpConstants.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpConstants.java
index ef6995b857..a746bbc2f2 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpConstants.java
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpConstants.java
@@ -16,8 +16,6 @@
  */
 package org.apache.dubbo.remoting.http12;
 
-import java.nio.charset.StandardCharsets;
-
 public final class HttpConstants {
 
     public static final String TRAILERS = "trailers";
@@ -33,8 +31,5 @@ public final class HttpConstants {
     public static final String HTTPS = "https";
     public static final String HTTP = "http";
 
-    public static final byte[] SERVER_SENT_EVENT_DATA_PREFIX_BYTES = 
"data:".getBytes(StandardCharsets.US_ASCII);
-    public static final byte[] SERVER_SENT_EVENT_LF_BYTES = 
"\n\n".getBytes(StandardCharsets.US_ASCII);
-
     private HttpConstants() {}
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEvent.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEvent.java
new file mode 100644
index 0000000000..1bd7bd5868
--- /dev/null
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEvent.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http12.message;
+
+import java.time.Duration;
+
+/**
+ * Represents a Server-Sent Event according to the HTML specification.
+ * <p>
+ * Server-Sent Events (SSE) is a server push technology enabling a client to 
receive automatic updates from a server via HTTP connection.
+ * The server can send new data to the client at any time by pushing messages, 
without the need to reestablish the connection.
+ * <p>
+ * This class encapsulates the structure of a Server-Sent Event, which may 
include:
+ * <ul>
+ *   <li>An event ID</li>
+ *   <li>An event type</li>
+ *   <li>A retry interval</li>
+ *   <li>A comment</li>
+ *   <li>Data payload</li>
+ * </ul>
+ * <p>
+ * Use the {@link #builder()} method to create instances of this class.
+ *
+ * @param <T> the type of data that this event contains
+ * @see <a href="https://html.spec.whatwg.org/multipage/server-sent-events.html">Server-Sent Events</a>
+ */
+public final class ServerSentEvent<T> {
+
+    /**
+     * The event ID that can be used for tracking or resuming event streams.
+     */
+    private final String id;
+
+    /**
+     * The event type or name that identifies the type of event.
+     */
+    private final String event;
+
+    /**
+     * The reconnection time in milliseconds that the client should wait 
before reconnecting
+     * after a connection is closed.
+     */
+    private final Duration retry;
+
+    /**
+     * A comment that will be ignored by event-processing clients but can be 
useful for debugging.
+     */
+    private final String comment;
+
+    /**
+     * The data payload of this event.
+     */
+    private final T data;
+
+    /**
+     * Constructs a new ServerSentEvent with the specified properties.
+     * <p>
+     * It's recommended to use the {@link #builder()} method instead of this 
constructor directly.
+     *
+     * @param id      the event ID, can be null
+     * @param event   the event type, can be null
+     * @param retry   the reconnection time, can be null
+     * @param comment the comment, can be null
+     * @param data    the data payload, can be null
+     */
+    public ServerSentEvent(String id, String event, Duration retry, String 
comment, T data) {
+        this.id = id;
+        this.event = event;
+        this.retry = retry;
+        this.comment = comment;
+        this.data = data;
+    }
+
+    /**
+     * Returns the event ID.
+     *
+     * @return the event ID, may be null
+     */
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * Returns the event type.
+     *
+     * @return the event type, may be null
+     */
+    public String getEvent() {
+        return event;
+    }
+
+    /**
+     * Returns the reconnection time that clients should wait before 
reconnecting.
+     *
+     * @return the reconnection time as a Duration, may be null
+     */
+    public Duration getRetry() {
+        return retry;
+    }
+
+    /**
+     * Returns the comment associated with this event.
+     *
+     * @return the comment, may be null
+     */
+    public String getComment() {
+        return comment;
+    }
+
+    /**
+     * Returns the data payload of this event.
+     *
+     * @return the data payload, may be null
+     */
+    public T getData() {
+        return data;
+    }
+
+    @Override
+    public String toString() {
+        return "ServerSentEvent{id='" + id + '\'' + ", event='" + event + '\'' + ", retry=" + retry + ", comment='"
+                + comment + '\'' + ", data=" + data + '}';
+    }
+
+    /**
+     * Creates a new {@link Builder} instance.
+     *
+     * @param <T> the type of data that the event will contain
+     * @return a new builder
+     */
+    public static <T> Builder<T> builder() {
+        return new Builder<>();
+    }
+
+    /**
+     * Builder for {@link ServerSentEvent}.
+     *
+     * @param <T> the type of data that the event will contain
+     */
+    public static final class Builder<T> {
+        private String id;
+        private String event;
+        private Duration retry;
+        private String comment;
+        private T data;
+
+        private Builder() {}
+
+        /**
+         * Sets the id of the event.
+         *
+         * @param id the id
+         * @return this builder
+         */
+        public Builder<T> id(String id) {
+            this.id = id;
+            return this;
+        }
+
+        /**
+         * Sets the event type.
+         *
+         * @param event the event type
+         * @return this builder
+         */
+        public Builder<T> event(String event) {
+            this.event = event;
+            return this;
+        }
+
+        /**
+         * Sets the retry duration.
+         *
+         * @param retry the retry duration
+         * @return this builder
+         */
+        public Builder<T> retry(Duration retry) {
+            this.retry = retry;
+            return this;
+        }
+
+        /**
+         * Sets the comment.
+         *
+         * @param comment the comment
+         * @return this builder
+         */
+        public Builder<T> comment(String comment) {
+            this.comment = comment;
+            return this;
+        }
+
+        /**
+         * Sets the data.
+         *
+         * @param data the data
+         * @return this builder
+         */
+        public Builder<T> data(T data) {
+            this.data = data;
+            return this;
+        }
+
+        /**
+         * Builds a new {@link ServerSentEvent} with the configured properties.
+         *
+         * @return the built event
+         */
+        public ServerSentEvent<T> build() {
+            return new ServerSentEvent<>(id, event, retry, comment, data);
+        }
+    }
+}
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoder.java
 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoder.java
new file mode 100644
index 0000000000..7338271df6
--- /dev/null
+++ 
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http12.message;
+
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.http12.exception.EncodeException;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.List;
+
+/**
+ * Encode the data according to the Server-Sent Events specification.
+ * <p>
+ * The formatted string follows the text/event-stream format as defined in the 
HTML specification.
+ * Each field is formatted as a line with the field name, followed by a colon, 
followed by the field value,
+ * and ending with a newline character.
+ *
+ * @see <a href="https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation">Event stream interpretation</a>
+ */
+public final class ServerSentEventEncoder implements HttpMessageEncoder {
+
+    private final HttpMessageEncoder httpMessageEncoder;
+
+    public ServerSentEventEncoder(HttpMessageEncoder httpMessageEncoder) {
+        this.httpMessageEncoder = httpMessageEncoder;
+    }
+
+    @Override
+    public void encode(OutputStream outputStream, Object data, Charset 
charset) throws EncodeException {
+        StringBuilder sb = new StringBuilder(256);
+
+        if (data instanceof ServerSentEvent) {
+            ServerSentEvent<?> event = (ServerSentEvent<?>) data;
+            if (event.getId() != null) {
+                appendField(sb, "id", event.getId());
+            }
+            if (event.getEvent() != null) {
+                appendField(sb, "event", event.getEvent());
+            }
+            if (event.getRetry() != null) {
+                appendField(sb, "retry", event.getRetry().toMillis());
+            }
+            if (event.getComment() != null) {
+                sb.append(':')
+                        .append(StringUtils.replace(event.getComment(), "\n", 
"\n:"))
+                        .append('\n');
+            }
+            if (event.getData() != null) {
+                encodeData(sb, event.getData(), charset);
+            }
+        } else {
+            encodeData(sb, data, charset);
+        }
+
+        sb.append('\n');
+
+        try {
+            outputStream.write(sb.toString().getBytes(charset));
+        } catch (Exception e) {
+            throw new EncodeException("Error encoding ServerSentEvent", e);
+        }
+    }
+
+    private void encodeData(StringBuilder sb, Object data, Charset charset) {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(256);
+        httpMessageEncoder.encode(bos, data, charset);
+        String dataStr = new String(bos.toByteArray(), charset);
+        List<String> lines = StringUtils.splitToList(dataStr, '\n');
+        for (int i = 0, size = lines.size(); i < size; i++) {
+            appendField(sb, "data", lines.get(i));
+        }
+    }
+
+    private static void appendField(StringBuilder sb, String name, Object 
value) {
+        sb.append(name).append(':').append(value).append('\n');
+    }
+
+    @Override
+    public String contentType() {
+        return httpMessageEncoder.contentType();
+    }
+
+    @Override
+    public MediaType mediaType() {
+        return httpMessageEncoder.mediaType();
+    }
+
+    @Override
+    public boolean supports(String mediaType) {
+        return httpMessageEncoder.supports(mediaType);
+    }
+}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
index 4fd388d336..ae19f2a464 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
@@ -115,7 +115,7 @@ public class ReflectionPackableMethod implements 
PackableMethod {
         return new ReflectionPackableMethod(methodDescriptor, url, 
serializeName, allSerialize);
     }
 
-    static boolean isStreamType(Class<?> type) {
+    public static boolean isStreamType(Class<?> type) {
         return StreamObserver.class.isAssignableFrom(type) || 
GRPC_STREAM_CLASS.equalsIgnoreCase(type.getName());
     }
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
index 5e03be4c40..8aae457476 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
@@ -25,7 +25,6 @@ import org.apache.dubbo.remoting.http12.HttpInputMessage;
 import org.apache.dubbo.remoting.http12.RequestMetadata;
 import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver;
 import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListener;
-import org.apache.dubbo.remoting.http12.h1.Http1SseServerChannelObserver;
 import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
 import org.apache.dubbo.remoting.http12.message.MediaType;
 import org.apache.dubbo.remoting.http12.message.codec.JsonCodec;
diff --git 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1SseServerChannelObserver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
similarity index 67%
rename from 
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1SseServerChannelObserver.java
rename to 
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
index 486af06e2a..867bc0ff7e 100644
--- 
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1SseServerChannelObserver.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
@@ -14,15 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.remoting.http12.h1;
+package org.apache.dubbo.rpc.protocol.tri.h12.http1;
 
 import org.apache.dubbo.remoting.http12.HttpChannel;
 import org.apache.dubbo.remoting.http12.HttpConstants;
 import org.apache.dubbo.remoting.http12.HttpHeaderNames;
 import org.apache.dubbo.remoting.http12.HttpMetadata;
-import org.apache.dubbo.remoting.http12.HttpOutputMessage;
-
-import java.io.IOException;
+import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver;
+import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder;
+import org.apache.dubbo.remoting.http12.message.ServerSentEventEncoder;
 
 public class Http1SseServerChannelObserver extends Http1ServerChannelObserver {
 
@@ -30,24 +30,15 @@ public class Http1SseServerChannelObserver extends 
Http1ServerChannelObserver {
         super(httpChannel);
     }
 
+    @Override
+    public void setResponseEncoder(HttpMessageEncoder responseEncoder) {
+        super.setResponseEncoder(new ServerSentEventEncoder(responseEncoder));
+    }
+
     @Override
     protected HttpMetadata encodeHttpMetadata(boolean endStream) {
         return super.encodeHttpMetadata(endStream)
                 .header(HttpHeaderNames.TRANSFER_ENCODING.getKey(), 
HttpConstants.CHUNKED)
                 .header(HttpHeaderNames.CACHE_CONTROL.getKey(), 
HttpConstants.NO_CACHE);
     }
-
-    @Override
-    protected void preOutputMessage(HttpOutputMessage message) throws 
IOException {
-        HttpOutputMessage prefixMessage = getHttpChannel().newOutputMessage();
-        
prefixMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_DATA_PREFIX_BYTES);
-        getHttpChannel().writeMessage(prefixMessage);
-    }
-
-    @Override
-    protected void postOutputMessage(HttpOutputMessage message) throws 
IOException {
-        HttpOutputMessage lfMessage = getHttpChannel().newOutputMessage();
-        lfMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_LF_BYTES);
-        getHttpChannel().writeMessage(lfMessage);
-    }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
index b36dc5e353..807d74da52 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
@@ -19,12 +19,11 @@ package org.apache.dubbo.rpc.protocol.tri.h12.http2;
 import org.apache.dubbo.remoting.http12.HttpConstants;
 import org.apache.dubbo.remoting.http12.HttpHeaderNames;
 import org.apache.dubbo.remoting.http12.HttpMetadata;
-import org.apache.dubbo.remoting.http12.HttpOutputMessage;
 import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
+import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder;
+import org.apache.dubbo.remoting.http12.message.ServerSentEventEncoder;
 import org.apache.dubbo.rpc.model.FrameworkModel;
 
-import java.io.IOException;
-
 public final class Http2SseServerChannelObserver extends 
Http2StreamServerChannelObserver {
 
     public Http2SseServerChannelObserver(FrameworkModel frameworkModel, 
H2StreamChannel h2StreamChannel) {
@@ -32,22 +31,13 @@ public final class Http2SseServerChannelObserver extends 
Http2StreamServerChanne
     }
 
     @Override
-    protected HttpMetadata encodeHttpMetadata(boolean endStream) {
-        return super.encodeHttpMetadata(endStream)
-                .header(HttpHeaderNames.CACHE_CONTROL.getKey(), 
HttpConstants.NO_CACHE);
-    }
-
-    @Override
-    protected void preOutputMessage(HttpOutputMessage message) throws 
IOException {
-        HttpOutputMessage prefixMessage = getHttpChannel().newOutputMessage();
-        
prefixMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_DATA_PREFIX_BYTES);
-        getHttpChannel().writeMessage(prefixMessage);
+    public void setResponseEncoder(HttpMessageEncoder responseEncoder) {
+        super.setResponseEncoder(new ServerSentEventEncoder(responseEncoder));
     }
 
     @Override
-    protected void postOutputMessage(HttpOutputMessage message) throws 
IOException {
-        HttpOutputMessage lfMessage = getHttpChannel().newOutputMessage();
-        lfMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_LF_BYTES);
-        getHttpChannel().writeMessage(lfMessage);
+    protected HttpMetadata encodeHttpMetadata(boolean endStream) {
+        return super.encodeHttpMetadata(endStream)
+                .header(HttpHeaderNames.CACHE_CONTROL.getKey(), 
HttpConstants.NO_CACHE);
     }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/meta/ParameterMeta.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/meta/ParameterMeta.java
index d9a46ef296..1b9a34503c 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/meta/ParameterMeta.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/meta/ParameterMeta.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.rpc.protocol.tri.rest.mapping.meta;
 
 import org.apache.dubbo.remoting.http12.HttpRequest;
 import org.apache.dubbo.remoting.http12.HttpResponse;
+import org.apache.dubbo.rpc.protocol.tri.ReflectionPackableMethod;
 import org.apache.dubbo.rpc.protocol.tri.rest.Messages;
 import org.apache.dubbo.rpc.protocol.tri.rest.RestException;
 import org.apache.dubbo.rpc.protocol.tri.rest.util.RestToolKit;
@@ -79,6 +80,10 @@ public abstract class ParameterMeta extends 
AnnotationSupport {
         return simple;
     }
 
+    public final boolean isStream() {
+        return ReflectionPackableMethod.isStreamType(getType());
+    }
+
     public final Class<?> getActualType() {
         Class<?> type = actualType;
         if (type == null) {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/basic/FallbackArgumentResolver.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/basic/FallbackArgumentResolver.java
index 82caf9dfd5..1e42dfe223 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/basic/FallbackArgumentResolver.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/basic/FallbackArgumentResolver.java
@@ -111,6 +111,10 @@ public class FallbackArgumentResolver extends 
AbstractArgumentResolver {
             }
         }
 
+        if (meta.parameter().isStream()) {
+            return null;
+        }
+
         if (single) {
             if (Map.class.isAssignableFrom(meta.type())) {
                 return RequestUtils.getParametersMap(request);
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestResponse.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestResponse.java
index 44470d9e41..3e22f5125f 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestResponse.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestResponse.java
@@ -30,6 +30,8 @@ import java.util.List;
 
 import io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 @SuppressWarnings("unchecked")
 public class TestResponse {
 
@@ -84,10 +86,15 @@ public class TestResponse {
             bodies = new ArrayList<>(oss.size());
             boolean isTextEvent = 
MediaType.TEXT_EVENT_STREAM.getName().equals(getContentType());
             for (int i = 0, size = oss.size(); i < size; i++) {
-                if (isTextEvent && i % 3 != 1) {
+                ByteArrayOutputStream bos = (ByteArrayOutputStream) oss.get(i);
+                if (isTextEvent) {
+                    String data = new String(bos.toByteArray(), UTF_8);
+                    if (data.startsWith("data:")) {
+                        String body = data.substring(5, data.length() - 2);
+                        bodies.add((T) decoder.decode(new 
ByteArrayInputStream(body.getBytes(UTF_8)), type));
+                    }
                     continue;
                 }
-                ByteArrayOutputStream bos = (ByteArrayOutputStream) oss.get(i);
                 if (bos.size() == 0) {
                     bodies.add(null);
                 } else {

Reply via email to