hanishi commented on code in PR #1494:
URL: https://github.com/apache/pekko-connectors/pull/1494#discussion_r2921743419
##########
google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala:
##########
@@ -45,12 +51,48 @@ object GooglePubSub {
/**
* Create a source that emits messages for a given subscription using a StreamingPullRequest.
*
- * The materialized value can be used to cancel the source.
+ * Automatically reconnects when the server closes the StreamingPull connection (due to idle
+ * timeout, rebalancing, etc.), matching the behavior of Google's official client library.
+ * Retryable gRPC errors (UNAVAILABLE, DEADLINE_EXCEEDED, INTERNAL, etc.) trigger reconnection
+ * with exponential backoff. Non-retryable errors (PERMISSION_DENIED, NOT_FOUND, etc.) cause
+ * the stream to fail immediately.
+ *
+ * @param request the subscription FQRS and ack deadline fields are mandatory for the request
+ * @param pollInterval time between StreamingPullRequest messages are being sent
+ * @param restartSettings settings for the exponential backoff reconnection behavior,
+ * defaults match Google's client library (100ms to 10s)
+ */
+ def subscribe(request: StreamingPullRequest,
+ pollInterval: Duration,
+ restartSettings: RestartSettings): Source[ReceivedMessage, NotUsed] =
+ RestartSource.withBackoff(restartSettings,
+ () => rawSubscribe(request, pollInterval).mapMaterializedValue[NotUsed](_ => NotUsed))
+
+ /**
+ * Create a source that emits messages for a given subscription using a StreamingPullRequest.
+ *
+ * Automatically reconnects when the server closes the StreamingPull connection,
+ * using default settings that match Google's official client library (100ms to 10s).
*
* @param request the subscription FQRS and ack deadline fields are mandatory for the request
* @param pollInterval time between StreamingPullRequest messages are being sent
*/
def subscribe(request: StreamingPullRequest,
+ pollInterval: Duration): Source[ReceivedMessage, NotUsed] =
Review Comment:
Thanks for the feedback!
I'll keep the existing signature unchanged and add the reconnecting variant as
a new overload, marking the new public methods with `@since 2.0.0`.
That said, the overall design still feels unsettled to me.
I'll update the issue description with a Mermaid diagram to clarify the
intent and give us a better foundation for discussion before I finalize the
implementation.
I'll also need to add tests, and I want to study the existing test structure
a bit first.
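
For reference while discussing the defaults, here is a minimal sketch of the
"100ms to 10s" backoff range mentioned in the doc comment. It assumes the delay
doubles on each restart and is capped at the maximum, and it leaves out the
random jitter that `RestartSettings` also applies. `BackoffSketch` and
`nominalDelay` are hypothetical names used only for illustration; they are not
part of Pekko or of this PR.

```scala
import scala.concurrent.duration._

// Hypothetical helper, not part of Pekko or this PR.
// Computes the nominal (jitter-free) delay before a restart, assuming the
// delay doubles with every restart and never exceeds maxBackoff.
object BackoffSketch {
  def nominalDelay(restartCount: Int,
      minBackoff: FiniteDuration,
      maxBackoff: FiniteDuration): FiniteDuration = {
    // minBackoff * 2^restartCount, computed in milliseconds as a Double
    val uncappedMillis = minBackoff.toMillis.toDouble * math.pow(2.0, restartCount.toDouble)
    if (uncappedMillis >= maxBackoff.toMillis.toDouble) maxBackoff
    else uncappedMillis.toLong.millis
  }

  def main(args: Array[String]): Unit = {
    // Nominal delays for the first nine restarts with the 100ms to 10s range
    val delays = (0 to 8).map(n => nominalDelay(n, 100.millis, 10.seconds))
    println(delays.map(_.toMillis).mkString(", "))
    // prints: 100, 200, 400, 800, 1600, 3200, 6400, 10000, 10000
  }
}
```

Under these assumptions the cap is reached on the eighth restart, so a
subscription that keeps failing would retry roughly every 10 seconds from then
on (plus whatever jitter is configured).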
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]