This is an automated email from the ASF dual-hosted git repository.
oxsean pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new a37bc2c368 feat: Enhance Server-Sent Events support (#15387)
a37bc2c368 is described below
commit a37bc2c36823a1ab1cc49888524c161b6cd6e643
Author: Sean Yang <[email protected]>
AuthorDate: Thu May 15 21:37:43 2025 +0800
feat: Enhance Server-Sent Events support (#15387)
---
.../springboot/demo/servlet/GreeterService.java | 3 +
.../demo/servlet/GreeterServiceImpl.java | 22 ++
.../support/jaxrs/FallbackArgumentResolver.java | 3 +
.../support/spring/FallbackArgumentResolver.java | 3 +
.../dubbo/remoting/http12/HttpConstants.java | 5 -
.../remoting/http12/message/ServerSentEvent.java | 227 +++++++++++++++++++++
.../http12/message/ServerSentEventEncoder.java | 108 ++++++++++
.../rpc/protocol/tri/ReflectionPackableMethod.java | 2 +-
.../DefaultHttp11ServerTransportListener.java | 1 -
.../h12/http1}/Http1SseServerChannelObserver.java | 27 +--
.../h12/http2/Http2SseServerChannelObserver.java | 24 +--
.../tri/rest/mapping/meta/ParameterMeta.java | 5 +
.../support/basic/FallbackArgumentResolver.java | 4 +
.../dubbo/rpc/protocol/tri/test/TestResponse.java | 11 +-
14 files changed, 401 insertions(+), 44 deletions(-)
diff --git
a/dubbo-demo/dubbo-demo-spring-boot/dubbo-demo-spring-boot-servlet/src/main/java/org/apache/dubbo/springboot/demo/servlet/GreeterService.java
b/dubbo-demo/dubbo-demo-spring-boot/dubbo-demo-spring-boot-servlet/src/main/java/org/apache/dubbo/springboot/demo/servlet/GreeterService.java
index 1c3345ee0b..96248f07c2 100644
---
a/dubbo-demo/dubbo-demo-spring-boot/dubbo-demo-spring-boot-servlet/src/main/java/org/apache/dubbo/springboot/demo/servlet/GreeterService.java
+++
b/dubbo-demo/dubbo-demo-spring-boot/dubbo-demo-spring-boot-servlet/src/main/java/org/apache/dubbo/springboot/demo/servlet/GreeterService.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.springboot.demo.servlet;
import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.remoting.http12.message.ServerSentEvent;
import java.util.concurrent.CompletableFuture;
@@ -39,6 +40,8 @@ public interface GreeterService {
void sayHelloServerStreamNoParameter(StreamObserver<HelloReply>
responseObserver);
+ void sayHelloServerStreamSSE(StreamObserver<ServerSentEvent<HelloReply>>
responseObserver);
+
/**
* Sends greetings with bi streaming
*/
diff --git
a/dubbo-demo/dubbo-demo-spring-boot/dubbo-demo-spring-boot-servlet/src/main/java/org/apache/dubbo/springboot/demo/servlet/GreeterServiceImpl.java
b/dubbo-demo/dubbo-demo-spring-boot/dubbo-demo-spring-boot-servlet/src/main/java/org/apache/dubbo/springboot/demo/servlet/GreeterServiceImpl.java
index 8626e9aa83..264d6fd6da 100644
---
a/dubbo-demo/dubbo-demo-spring-boot/dubbo-demo-spring-boot-servlet/src/main/java/org/apache/dubbo/springboot/demo/servlet/GreeterServiceImpl.java
+++
b/dubbo-demo/dubbo-demo-spring-boot/dubbo-demo-spring-boot-servlet/src/main/java/org/apache/dubbo/springboot/demo/servlet/GreeterServiceImpl.java
@@ -18,7 +18,9 @@ package org.apache.dubbo.springboot.demo.servlet;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.config.annotation.DubboService;
+import org.apache.dubbo.remoting.http12.message.ServerSentEvent;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
@@ -63,6 +65,26 @@ public class GreeterServiceImpl implements GreeterService {
responseObserver.onCompleted();
}
+    /**
+     * Server-streaming demo that answers with Server-Sent Events.
+     * <p>
+     * Emits one event that only carries a 20 second retry hint, one event named "say" with a
+     * comment, then five data events, and finally completes the stream.
+     *
+     * @param responseObserver the observer that receives the events
+     */
+    @Override
+    public void sayHelloServerStreamSSE(StreamObserver<ServerSentEvent<HelloReply>> responseObserver) {
+        LOGGER.info("Received sayHelloServerStreamSSE request");
+
+        // Event without payload: only the reconnection hint.
+        ServerSentEvent<HelloReply> retryHint = ServerSentEvent.<HelloReply>builder()
+                .retry(Duration.ofSeconds(20))
+                .build();
+        responseObserver.onNext(retryHint);
+
+        // Event without payload: a named event that carries a comment.
+        ServerSentEvent<HelloReply> namedComment = ServerSentEvent.<HelloReply>builder()
+                .event("say")
+                .comment("hello world")
+                .build();
+        responseObserver.onNext(namedComment);
+
+        for (int round = 1; round <= 5; round++) {
+            LOGGER.info("sayHelloServerStreamSSE onNext: {} times", round);
+            HelloReply reply = toReply("Hello " + ' ' + round + " times");
+            responseObserver.onNext(
+                    ServerSentEvent.<HelloReply>builder().data(reply).build());
+        }
+
+        LOGGER.info("sayHelloServerStreamSSE onCompleted");
+        responseObserver.onCompleted();
+    }
+
@Override
public StreamObserver<HelloRequest>
sayHelloBiStream(StreamObserver<HelloReply> responseObserver) {
LOGGER.info("Received sayHelloBiStream request");
diff --git
a/dubbo-plugin/dubbo-rest-jaxrs/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/jaxrs/FallbackArgumentResolver.java
b/dubbo-plugin/dubbo-rest-jaxrs/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/jaxrs/FallbackArgumentResolver.java
index 11d88ddabe..ff6243c225 100644
---
a/dubbo-plugin/dubbo-rest-jaxrs/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/jaxrs/FallbackArgumentResolver.java
+++
b/dubbo-plugin/dubbo-rest-jaxrs/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/jaxrs/FallbackArgumentResolver.java
@@ -51,6 +51,9 @@ public class FallbackArgumentResolver extends
AbstractArgumentResolver {
if (value != null) {
return value;
}
+ if (meta.parameter().isStream()) {
+ return null;
+ }
if (meta.parameter().isSimple()) {
return request.parameter(meta.name());
}
diff --git
a/dubbo-plugin/dubbo-rest-spring/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/spring/FallbackArgumentResolver.java
b/dubbo-plugin/dubbo-rest-spring/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/spring/FallbackArgumentResolver.java
index e9e2806bea..2f698435fb 100644
---
a/dubbo-plugin/dubbo-rest-spring/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/spring/FallbackArgumentResolver.java
+++
b/dubbo-plugin/dubbo-rest-spring/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/spring/FallbackArgumentResolver.java
@@ -42,6 +42,9 @@ public class FallbackArgumentResolver extends
AbstractArgumentResolver {
@Override
protected Object resolveValue(NamedValueMeta meta, HttpRequest request,
HttpResponse response) {
ParameterMeta parameter = meta.parameter();
+ if (parameter.isStream()) {
+ return null;
+ }
if (parameter.isSimple()) {
return request.parameter(meta.name());
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpConstants.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpConstants.java
index ef6995b857..a746bbc2f2 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpConstants.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/HttpConstants.java
@@ -16,8 +16,6 @@
*/
package org.apache.dubbo.remoting.http12;
-import java.nio.charset.StandardCharsets;
-
public final class HttpConstants {
public static final String TRAILERS = "trailers";
@@ -33,8 +31,5 @@ public final class HttpConstants {
public static final String HTTPS = "https";
public static final String HTTP = "http";
- public static final byte[] SERVER_SENT_EVENT_DATA_PREFIX_BYTES =
"data:".getBytes(StandardCharsets.US_ASCII);
- public static final byte[] SERVER_SENT_EVENT_LF_BYTES =
"\n\n".getBytes(StandardCharsets.US_ASCII);
-
private HttpConstants() {}
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEvent.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEvent.java
new file mode 100644
index 0000000000..1bd7bd5868
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEvent.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http12.message;
+
+import java.time.Duration;
+
+/**
+ * Represents a Server-Sent Event according to the HTML specification.
+ * <p>
+ * Server-Sent Events (SSE) is a server push technology enabling a client to receive automatic
+ * updates from a server via HTTP connection. The server can send new data to the client at any
+ * time by pushing messages, without the need to reestablish the connection.
+ * <p>
+ * This class encapsulates the structure of a Server-Sent Event, which may include:
+ * <ul>
+ * <li>An event ID</li>
+ * <li>An event type</li>
+ * <li>A retry interval</li>
+ * <li>A comment</li>
+ * <li>Data payload</li>
+ * </ul>
+ * <p>
+ * Every part is optional and may be {@code null}. Instances cannot be modified after creation.
+ * <p>
+ * Use the {@link #builder()} method to create instances of this class.
+ *
+ * @param <T> the type of data that this event contains
+ * @see <a href="https://html.spec.whatwg.org/multipage/server-sent-events.html">Server-Sent Events</a>
+ */
+public final class ServerSentEvent<T> {
+
+    /**
+     * The event ID that can be used for tracking or resuming event streams.
+     */
+    private final String id;
+
+    /**
+     * The event type or name that identifies the type of event.
+     */
+    private final String event;
+
+    /**
+     * The reconnection time that the client should wait before reconnecting after a connection
+     * is closed. It is held as a {@link Duration} and converted to milliseconds when the event
+     * is encoded.
+     */
+    private final Duration retry;
+
+    /**
+     * A comment that will be ignored by event-processing clients but can be useful for debugging.
+     */
+    private final String comment;
+
+    /**
+     * The data payload of this event.
+     */
+    private final T data;
+
+    /**
+     * Constructs a new ServerSentEvent with the specified properties.
+     * <p>
+     * It's recommended to use the {@link #builder()} method instead of this constructor directly.
+     *
+     * @param id      the event ID, can be null
+     * @param event   the event type, can be null
+     * @param retry   the reconnection time, can be null
+     * @param comment the comment, can be null
+     * @param data    the data payload, can be null
+     */
+    public ServerSentEvent(String id, String event, Duration retry, String comment, T data) {
+        this.id = id;
+        this.event = event;
+        this.retry = retry;
+        this.comment = comment;
+        this.data = data;
+    }
+
+    /**
+     * Returns the event ID.
+     *
+     * @return the event ID, may be null
+     */
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * Returns the event type.
+     *
+     * @return the event type, may be null
+     */
+    public String getEvent() {
+        return event;
+    }
+
+    /**
+     * Returns the reconnection time that clients should wait before reconnecting.
+     *
+     * @return the reconnection time as a Duration, may be null
+     */
+    public Duration getRetry() {
+        return retry;
+    }
+
+    /**
+     * Returns the comment associated with this event.
+     *
+     * @return the comment, may be null
+     */
+    public String getComment() {
+        return comment;
+    }
+
+    /**
+     * Returns the data payload of this event.
+     *
+     * @return the data payload, may be null
+     */
+    public T getData() {
+        return data;
+    }
+
+    /**
+     * Returns a debugging representation that lists all five properties.
+     * This is not the text/event-stream wire format.
+     */
+    @Override
+    public String toString() {
+        return "ServerSentEvent{id='" + id + '\'' + ", event='" + event + '\'' + ", retry=" + retry + ", comment='"
+                + comment + '\'' + ", data=" + data + '}';
+    }
+
+    /**
+     * Creates a new {@link Builder} instance.
+     *
+     * @param <T> the type of data that the event will contain
+     * @return a new builder
+     */
+    public static <T> Builder<T> builder() {
+        return new Builder<>();
+    }
+
+    /**
+     * Builder for {@link ServerSentEvent}.
+     * <p>
+     * Properties that are never set stay {@code null} in the built event.
+     *
+     * @param <T> the type of data that the event will contain
+     */
+    public static final class Builder<T> {
+        private String id;
+        private String event;
+        private Duration retry;
+        private String comment;
+        private T data;
+
+        private Builder() {}
+
+        /**
+         * Sets the id of the event.
+         *
+         * @param id the id
+         * @return this builder
+         */
+        public Builder<T> id(String id) {
+            this.id = id;
+            return this;
+        }
+
+        /**
+         * Sets the event type.
+         *
+         * @param event the event type
+         * @return this builder
+         */
+        public Builder<T> event(String event) {
+            this.event = event;
+            return this;
+        }
+
+        /**
+         * Sets the retry duration.
+         *
+         * @param retry the retry duration
+         * @return this builder
+         */
+        public Builder<T> retry(Duration retry) {
+            this.retry = retry;
+            return this;
+        }
+
+        /**
+         * Sets the comment.
+         *
+         * @param comment the comment
+         * @return this builder
+         */
+        public Builder<T> comment(String comment) {
+            this.comment = comment;
+            return this;
+        }
+
+        /**
+         * Sets the data.
+         *
+         * @param data the data
+         * @return this builder
+         */
+        public Builder<T> data(T data) {
+            this.data = data;
+            return this;
+        }
+
+        /**
+         * Builds a new {@link ServerSentEvent} with the configured properties.
+         *
+         * @return the built event
+         */
+        public ServerSentEvent<T> build() {
+            return new ServerSentEvent<>(id, event, retry, comment, data);
+        }
+    }
+}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoder.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoder.java
new file mode 100644
index 0000000000..7338271df6
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/ServerSentEventEncoder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http12.message;
+
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.remoting.http12.exception.EncodeException;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.List;
+
+/**
+ * Encodes the data according to the Server-Sent Events specification.
+ * <p>
+ * The formatted string follows the text/event-stream format as defined in the HTML specification.
+ * Each field is formatted as a line with the field name, followed by a colon, followed by the
+ * field value, and ending with a newline character. Every event is closed by an empty line.
+ * <p>
+ * A {@link ServerSentEvent} is written field by field; any other object is written as the data
+ * of an otherwise empty event. The data payload itself is serialized by the wrapped
+ * {@link HttpMessageEncoder}, which also supplies the content type and media type.
+ * <p>
+ * The {@code id} and {@code event} values are written verbatim, so they must not contain line
+ * breaks. Comments and data may span several lines: CRLF and bare CR are treated like LF,
+ * because the event-stream format ends a line on any of the three.
+ *
+ * @see <a href="https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation">Event stream interpretation</a>
+ */
+public final class ServerSentEventEncoder implements HttpMessageEncoder {
+
+    private final HttpMessageEncoder httpMessageEncoder;
+
+    /**
+     * Creates an encoder that frames the output of the given encoder as Server-Sent Events.
+     *
+     * @param httpMessageEncoder the encoder used to serialize the data payload
+     */
+    public ServerSentEventEncoder(HttpMessageEncoder httpMessageEncoder) {
+        this.httpMessageEncoder = httpMessageEncoder;
+    }
+
+    /**
+     * Writes one event to the output stream.
+     *
+     * @param outputStream the stream that receives the encoded event
+     * @param data         a {@link ServerSentEvent}, or any other object to be sent as plain data
+     * @param charset      the charset used for the payload and for the event text
+     * @throws EncodeException if writing to the output stream fails
+     */
+    @Override
+    public void encode(OutputStream outputStream, Object data, Charset charset) throws EncodeException {
+        StringBuilder sb = new StringBuilder(256);
+
+        if (data instanceof ServerSentEvent) {
+            ServerSentEvent<?> event = (ServerSentEvent<?>) data;
+            if (event.getId() != null) {
+                appendField(sb, "id", event.getId());
+            }
+            if (event.getEvent() != null) {
+                appendField(sb, "event", event.getEvent());
+            }
+            if (event.getRetry() != null) {
+                appendField(sb, "retry", event.getRetry().toMillis());
+            }
+            if (event.getComment() != null) {
+                // A comment line starts with ':'; each continuation line needs its own ':' prefix,
+                // otherwise the rest of the comment would be parsed as a regular field.
+                sb.append(':')
+                        .append(StringUtils.replace(normalizeLineBreaks(event.getComment()), "\n", "\n:"))
+                        .append('\n');
+            }
+            if (event.getData() != null) {
+                encodeData(sb, event.getData(), charset);
+            }
+        } else {
+            encodeData(sb, data, charset);
+        }
+
+        // The empty line terminates the event.
+        sb.append('\n');
+
+        try {
+            outputStream.write(sb.toString().getBytes(charset));
+        } catch (Exception e) {
+            throw new EncodeException("Error encoding ServerSentEvent", e);
+        }
+    }
+
+    /**
+     * Serializes the payload with the wrapped encoder and appends it as one {@code data} field
+     * per line of the serialized text.
+     */
+    private void encodeData(StringBuilder sb, Object data, Charset charset) {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(256);
+        httpMessageEncoder.encode(bos, data, charset);
+        String dataStr = normalizeLineBreaks(new String(bos.toByteArray(), charset));
+        List<String> lines = StringUtils.splitToList(dataStr, '\n');
+        for (int i = 0, size = lines.size(); i < size; i++) {
+            appendField(sb, "data", lines.get(i));
+        }
+    }
+
+    /**
+     * Converts CRLF and bare CR to LF so that every line break in the text is seen by the
+     * per-line prefixing. Text without CR is returned unchanged.
+     */
+    private static String normalizeLineBreaks(String text) {
+        if (text.indexOf('\r') < 0) {
+            return text;
+        }
+        return text.replace("\r\n", "\n").replace('\r', '\n');
+    }
+
+    /**
+     * Appends a single {@code name:value} line.
+     */
+    private static void appendField(StringBuilder sb, String name, Object value) {
+        sb.append(name).append(':').append(value).append('\n');
+    }
+
+    @Override
+    public String contentType() {
+        return httpMessageEncoder.contentType();
+    }
+
+    @Override
+    public MediaType mediaType() {
+        return httpMessageEncoder.mediaType();
+    }
+
+    @Override
+    public boolean supports(String mediaType) {
+        return httpMessageEncoder.supports(mediaType);
+    }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
index 4fd388d336..ae19f2a464 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
@@ -115,7 +115,7 @@ public class ReflectionPackableMethod implements
PackableMethod {
return new ReflectionPackableMethod(methodDescriptor, url,
serializeName, allSerialize);
}
- static boolean isStreamType(Class<?> type) {
+ public static boolean isStreamType(Class<?> type) {
return StreamObserver.class.isAssignableFrom(type) ||
GRPC_STREAM_CLASS.equalsIgnoreCase(type.getName());
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
index 5e03be4c40..8aae457476 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
@@ -25,7 +25,6 @@ import org.apache.dubbo.remoting.http12.HttpInputMessage;
import org.apache.dubbo.remoting.http12.RequestMetadata;
import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver;
import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListener;
-import org.apache.dubbo.remoting.http12.h1.Http1SseServerChannelObserver;
import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
import org.apache.dubbo.remoting.http12.message.MediaType;
import org.apache.dubbo.remoting.http12.message.codec.JsonCodec;
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1SseServerChannelObserver.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
similarity index 67%
rename from
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1SseServerChannelObserver.java
rename to
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
index 486af06e2a..867bc0ff7e 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1SseServerChannelObserver.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/Http1SseServerChannelObserver.java
@@ -14,15 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.http12.h1;
+package org.apache.dubbo.rpc.protocol.tri.h12.http1;
import org.apache.dubbo.remoting.http12.HttpChannel;
import org.apache.dubbo.remoting.http12.HttpConstants;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpMetadata;
-import org.apache.dubbo.remoting.http12.HttpOutputMessage;
-
-import java.io.IOException;
+import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver;
+import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder;
+import org.apache.dubbo.remoting.http12.message.ServerSentEventEncoder;
public class Http1SseServerChannelObserver extends Http1ServerChannelObserver {
@@ -30,24 +30,15 @@ public class Http1SseServerChannelObserver extends
Http1ServerChannelObserver {
super(httpChannel);
}
+    /**
+     * Installs the response encoder wrapped in a {@link ServerSentEventEncoder}.
+     *
+     * @param responseEncoder the encoder to wrap
+     */
+    @Override
+    public void setResponseEncoder(HttpMessageEncoder responseEncoder) {
+        HttpMessageEncoder sseEncoder = new ServerSentEventEncoder(responseEncoder);
+        super.setResponseEncoder(sseEncoder);
+    }
+
@Override
protected HttpMetadata encodeHttpMetadata(boolean endStream) {
return super.encodeHttpMetadata(endStream)
.header(HttpHeaderNames.TRANSFER_ENCODING.getKey(),
HttpConstants.CHUNKED)
.header(HttpHeaderNames.CACHE_CONTROL.getKey(),
HttpConstants.NO_CACHE);
}
-
- @Override
- protected void preOutputMessage(HttpOutputMessage message) throws
IOException {
- HttpOutputMessage prefixMessage = getHttpChannel().newOutputMessage();
-
prefixMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_DATA_PREFIX_BYTES);
- getHttpChannel().writeMessage(prefixMessage);
- }
-
- @Override
- protected void postOutputMessage(HttpOutputMessage message) throws
IOException {
- HttpOutputMessage lfMessage = getHttpChannel().newOutputMessage();
- lfMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_LF_BYTES);
- getHttpChannel().writeMessage(lfMessage);
- }
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
index b36dc5e353..807d74da52 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2SseServerChannelObserver.java
@@ -19,12 +19,11 @@ package org.apache.dubbo.rpc.protocol.tri.h12.http2;
import org.apache.dubbo.remoting.http12.HttpConstants;
import org.apache.dubbo.remoting.http12.HttpHeaderNames;
import org.apache.dubbo.remoting.http12.HttpMetadata;
-import org.apache.dubbo.remoting.http12.HttpOutputMessage;
import org.apache.dubbo.remoting.http12.h2.H2StreamChannel;
+import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder;
+import org.apache.dubbo.remoting.http12.message.ServerSentEventEncoder;
import org.apache.dubbo.rpc.model.FrameworkModel;
-import java.io.IOException;
-
public final class Http2SseServerChannelObserver extends
Http2StreamServerChannelObserver {
public Http2SseServerChannelObserver(FrameworkModel frameworkModel,
H2StreamChannel h2StreamChannel) {
@@ -32,22 +31,13 @@ public final class Http2SseServerChannelObserver extends
Http2StreamServerChanne
}
@Override
- protected HttpMetadata encodeHttpMetadata(boolean endStream) {
- return super.encodeHttpMetadata(endStream)
- .header(HttpHeaderNames.CACHE_CONTROL.getKey(),
HttpConstants.NO_CACHE);
- }
-
- @Override
- protected void preOutputMessage(HttpOutputMessage message) throws
IOException {
- HttpOutputMessage prefixMessage = getHttpChannel().newOutputMessage();
-
prefixMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_DATA_PREFIX_BYTES);
- getHttpChannel().writeMessage(prefixMessage);
+ public void setResponseEncoder(HttpMessageEncoder responseEncoder) {
+ super.setResponseEncoder(new ServerSentEventEncoder(responseEncoder));
}
@Override
- protected void postOutputMessage(HttpOutputMessage message) throws
IOException {
- HttpOutputMessage lfMessage = getHttpChannel().newOutputMessage();
- lfMessage.getBody().write(HttpConstants.SERVER_SENT_EVENT_LF_BYTES);
- getHttpChannel().writeMessage(lfMessage);
+ protected HttpMetadata encodeHttpMetadata(boolean endStream) {
+ return super.encodeHttpMetadata(endStream)
+ .header(HttpHeaderNames.CACHE_CONTROL.getKey(),
HttpConstants.NO_CACHE);
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/meta/ParameterMeta.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/meta/ParameterMeta.java
index d9a46ef296..1b9a34503c 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/meta/ParameterMeta.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/meta/ParameterMeta.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.rpc.protocol.tri.rest.mapping.meta;
import org.apache.dubbo.remoting.http12.HttpRequest;
import org.apache.dubbo.remoting.http12.HttpResponse;
+import org.apache.dubbo.rpc.protocol.tri.ReflectionPackableMethod;
import org.apache.dubbo.rpc.protocol.tri.rest.Messages;
import org.apache.dubbo.rpc.protocol.tri.rest.RestException;
import org.apache.dubbo.rpc.protocol.tri.rest.util.RestToolKit;
@@ -79,6 +80,10 @@ public abstract class ParameterMeta extends
AnnotationSupport {
return simple;
}
+    /**
+     * Tells whether the parameter type is a stream type, as decided by
+     * {@link ReflectionPackableMethod#isStreamType(Class)}.
+     *
+     * @return {@code true} if the parameter type is a stream type
+     */
+    public final boolean isStream() {
+        Class<?> parameterType = getType();
+        return ReflectionPackableMethod.isStreamType(parameterType);
+    }
+
public final Class<?> getActualType() {
Class<?> type = actualType;
if (type == null) {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/basic/FallbackArgumentResolver.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/basic/FallbackArgumentResolver.java
index 82caf9dfd5..1e42dfe223 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/basic/FallbackArgumentResolver.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/support/basic/FallbackArgumentResolver.java
@@ -111,6 +111,10 @@ public class FallbackArgumentResolver extends
AbstractArgumentResolver {
}
}
+ if (meta.parameter().isStream()) {
+ return null;
+ }
+
if (single) {
if (Map.class.isAssignableFrom(meta.type())) {
return RequestUtils.getParametersMap(request);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestResponse.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestResponse.java
index 44470d9e41..3e22f5125f 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestResponse.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/test/TestResponse.java
@@ -30,6 +30,8 @@ import java.util.List;
import io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
@SuppressWarnings("unchecked")
public class TestResponse {
@@ -84,10 +86,15 @@ public class TestResponse {
bodies = new ArrayList<>(oss.size());
boolean isTextEvent =
MediaType.TEXT_EVENT_STREAM.getName().equals(getContentType());
for (int i = 0, size = oss.size(); i < size; i++) {
- if (isTextEvent && i % 3 != 1) {
+ ByteArrayOutputStream bos = (ByteArrayOutputStream) oss.get(i);
+ if (isTextEvent) {
+ String data = new String(bos.toByteArray(), UTF_8);
+ if (data.startsWith("data:")) {
+ String body = data.substring(5, data.length() - 2);
+ bodies.add((T) decoder.decode(new
ByteArrayInputStream(body.getBytes(UTF_8)), type));
+ }
continue;
}
- ByteArrayOutputStream bos = (ByteArrayOutputStream) oss.get(i);
if (bos.size() == 0) {
bodies.add(null);
} else {