pjfanning commented on code in PR #1494:
URL: https://github.com/apache/pekko-connectors/pull/1494#discussion_r2919162645
##########
google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala:
##########
@@ -45,12 +51,48 @@ object GooglePubSub {
   /**
    * Create a source that emits messages for a given subscription using a StreamingPullRequest.
    *
-   * The materialized value can be used to cancel the source.
+   * Automatically reconnects when the server closes the StreamingPull connection (due to idle
+   * timeout, rebalancing, etc.), matching the behavior of Google's official client library.
+   * Retryable gRPC errors (UNAVAILABLE, DEADLINE_EXCEEDED, INTERNAL, etc.) trigger reconnection
+   * with exponential backoff. Non-retryable errors (PERMISSION_DENIED, NOT_FOUND, etc.) cause
+   * the stream to fail immediately.
+   *
+   * @param request the subscription FQRS and ack deadline fields are mandatory for the request
+   * @param pollInterval time between StreamingPullRequest messages are being sent
+   * @param restartSettings settings for the exponential backoff reconnection behavior,
+   *                        defaults match Google's client library (100ms to 10s)
+   */
+  def subscribe(request: StreamingPullRequest,
+      pollInterval: Duration,
+      restartSettings: RestartSettings): Source[ReceivedMessage, NotUsed] =
+    RestartSource.withBackoff(restartSettings,
+      () => rawSubscribe(request, pollInterval).mapMaterializedValue[NotUsed](_ => NotUsed))
+
+  /**
+   * Create a source that emits messages for a given subscription using a StreamingPullRequest.
+   *
+   * Automatically reconnects when the server closes the StreamingPull connection,
+   * using default settings that match Google's official client library (100ms to 10s).
    *
    * @param request the subscription FQRS and ack deadline fields are mandatory for the request
    * @param pollInterval time between StreamingPullRequest messages are being sent
    */
   def subscribe(request: StreamingPullRequest,
+      pollInterval: Duration): Source[ReceivedMessage, NotUsed] =
Review Comment:
You need to keep the existing signature. You can overload the method though.
Add `@since 2.0.0` to the new public methods, including overloads.
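
A minimal sketch of what this could look like, not the actual change. It assumes the existing two-argument `subscribe` materializes a `Cancellable` (as the removed doc line suggests), and it uses hypothetical stand-ins for the generated protobuf classes and for `rawSubscribe` so that it compiles with only pekko-stream on the classpath. The restarting behaviour moves to a new three-argument overload tagged with `@since 2.0.0`:

```scala
import java.time.Duration

import org.apache.pekko.NotUsed
import org.apache.pekko.actor.Cancellable
import org.apache.pekko.stream.RestartSettings
import org.apache.pekko.stream.javadsl.{ RestartSource, Source }

// Hypothetical stand-ins for the generated com.google.pubsub.v1 classes.
final case class StreamingPullRequest(subscription: String)
final case class ReceivedMessage(ackId: String)

object GooglePubSubSketch {

  // Placeholder for the existing non-restarting implementation (assumption):
  // a ticking source is used only because it also materializes a Cancellable.
  private def rawSubscribe(
      request: StreamingPullRequest,
      pollInterval: Duration): Source[ReceivedMessage, Cancellable] =
    Source.tick(pollInterval, pollInterval, ReceivedMessage("placeholder"))

  /**
   * Existing method: signature and materialized value left untouched,
   * so code compiled against earlier releases keeps working.
   */
  def subscribe(
      request: StreamingPullRequest,
      pollInterval: Duration): Source[ReceivedMessage, Cancellable] =
    rawSubscribe(request, pollInterval)

  /**
   * New overload that reconnects with exponential backoff.
   *
   * @since 2.0.0
   */
  def subscribe(
      request: StreamingPullRequest,
      pollInterval: Duration,
      restartSettings: RestartSettings): Source[ReceivedMessage, NotUsed] =
    RestartSource.withBackoff(restartSettings,
      () => rawSubscribe(request, pollInterval).mapMaterializedValue[NotUsed](_ => NotUsed))
}
```

Scala cannot overload on return type alone, so the restarting variant has to differ in its parameter list, here by taking `RestartSettings`. Callers who want the 100ms to 10s defaults could pass something like `RestartSettings.create(Duration.ofMillis(100), Duration.ofSeconds(10), 0.2)`; the `0.2` random factor is an illustrative value, not one taken from the PR.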
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]