hanishi commented on code in PR #1494:
URL: https://github.com/apache/pekko-connectors/pull/1494#discussion_r2921743419


##########
google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala:
##########
@@ -45,12 +51,48 @@ object GooglePubSub {
   /**
    * Create a source that emits messages for a given subscription using a StreamingPullRequest.
    *
-   * The materialized value can be used to cancel the source.
+   * Automatically reconnects when the server closes the StreamingPull connection (due to idle
+   * timeout, rebalancing, etc.), matching the behavior of Google's official client library.
+   * Retryable gRPC errors (UNAVAILABLE, DEADLINE_EXCEEDED, INTERNAL, etc.) trigger reconnection
+   * with exponential backoff. Non-retryable errors (PERMISSION_DENIED, NOT_FOUND, etc.) cause
+   * the stream to fail immediately.
+   *
+   * @param request         the subscription FQRS and ack deadline fields are mandatory for the request
+   * @param pollInterval    time between StreamingPullRequest messages are being sent
+   * @param restartSettings settings for the exponential backoff reconnection behavior,
+   *                        defaults match Google's client library (100ms to 10s)
+   */
+  def subscribe(request: StreamingPullRequest,
+      pollInterval: Duration,
+      restartSettings: RestartSettings): Source[ReceivedMessage, NotUsed] =
+    RestartSource.withBackoff(restartSettings,
+      () => rawSubscribe(request, pollInterval).mapMaterializedValue[NotUsed](_ => NotUsed))
+
+  /**
+   * Create a source that emits messages for a given subscription using a StreamingPullRequest.
+   *
+   * Automatically reconnects when the server closes the StreamingPull connection,
+   * using default settings that match Google's official client library (100ms to 10s).
    *
    * @param request      the subscription FQRS and ack deadline fields are mandatory for the request
    * @param pollInterval time between StreamingPullRequest messages are being sent
    */
   def subscribe(request: StreamingPullRequest,
+      pollInterval: Duration): Source[ReceivedMessage, NotUsed] =

Review Comment:
   Thanks for the feedback! 
   
   I'll keep the existing signature, add the overload alongside it, and mark the new public methods with `@since 2.0.0`.
   
   That said, the overall design still feels a bit sketchy to me.
   I'll update the issue description with a Mermaid diagram to clarify the 
intent and give us a better foundation for discussion before I finalize the 
implementation.
   
   I'll also need to add tests, and I want to study the existing test structure 
a bit first.
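
For context on the "100ms to 10s" range mentioned in the doc comment of the diff above, here is a minimal sketch of a doubling backoff schedule with a cap. It is an illustration under stated assumptions, not the connector's or Pekko's actual implementation: `BackoffSketch`, `delay`, and `schedule` are hypothetical names, and the jitter that `RestartSettings` applies through its random factor is deliberately left out.

```scala
import scala.concurrent.duration._

// Hypothetical helper object, for illustration only.
object BackoffSketch {

  // Delay before the restart with the given 0-based count: starts at
  // minBackoff, doubles each time, and is capped at maxBackoff.
  // Assumption: no jitter is applied here.
  def delay(
      restartCount: Int,
      minBackoff: FiniteDuration,
      maxBackoff: FiniteDuration): FiniteDuration = {
    val uncappedMillis = minBackoff.toMillis.toDouble * math.pow(2, restartCount.toDouble)
    if (uncappedMillis >= maxBackoff.toMillis.toDouble) maxBackoff
    else uncappedMillis.toLong.millis
  }

  // First n delays for the 100ms-to-10s range quoted in the doc comment.
  def schedule(n: Int): Seq[FiniteDuration] =
    (0 until n).map(i => delay(i, 100.millis, 10.seconds))
}
```

With these assumptions, the delay doubles from 100ms and reaches the 10s cap on the eighth restart. A real stream would additionally randomize each delay so that many subscribers do not reconnect in lockstep.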



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


