lhotari commented on code in PR #25915:
URL: https://github.com/apache/pulsar/pull/25915#discussion_r3344005651
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -741,6 +741,10 @@ protected void startInactivityMonitor() {
int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
inactivityMonitor.scheduleAtFixedRateNonConcurrently(() -> checkGC(), interval, interval,
TimeUnit.SECONDS);
+ if (pulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds() > 0) {
Review Comment:
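`brokerReplicationInactiveThresholdSeconds` is of type `Integer` in `ServiceConfiguration`, so technically it can be set to `null`. In that case, `getBrokerReplicationInactiveThresholdSeconds() > 0` would auto-unbox `null` and throw a `NullPointerException`. Therefore a null check could be useful.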
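A minimal sketch of such a null-safe guard, assuming a hypothetical helper class `InactiveThresholdGuard` (not part of Pulsar). It treats an unset value the same as `0` (feature disabled) and also shows the failure mode of the plain `> 0` comparison.

```java
// Hypothetical helper, not part of Pulsar: a null-safe form of the
// `getBrokerReplicationInactiveThresholdSeconds() > 0` guard.
final class InactiveThresholdGuard {
    private InactiveThresholdGuard() {
    }

    // Treats an unset (null) threshold the same as 0, i.e. "feature disabled".
    static boolean isEnabled(Integer thresholdSeconds) {
        return thresholdSeconds != null && thresholdSeconds > 0;
    }

    // Failure mode: comparing a null Integer auto-unboxes it and throws NullPointerException.
    static boolean unguardedIsEnabled(Integer thresholdSeconds) {
        return thresholdSeconds > 0;
    }
}
```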
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java:
##########
@@ -665,16 +667,54 @@ protected Consumer getActiveConsumer(Subscription subscription) {
return null;
}
- protected boolean hasLocalProducers() {
- if (producers.isEmpty()) {
- return false;
+ public abstract CompletableFuture<Void> closeReplProducersIfNoBacklog();
+
+ public abstract CompletableFuture<Void> startReplProducers();
+
+ public void disconnectReplicatorIfNoTrafficForLongTime() {
+ updateLocalProducersEmptyTime();
+
+ final Long cachedTime = localProducersEmptyTime;
+ if (cachedTime == null) {
+ return;
}
+ int threshold = brokerService.getPulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds();
+ if (System.currentTimeMillis() - cachedTime > threshold * 1000L) {
Review Comment:
Since `brokerReplicationInactiveThresholdSeconds` is a dynamic configuration, its value can be changed at runtime. Users would expect that setting it to `0` disables the check, but with a threshold of `0` this condition reduces to `System.currentTimeMillis() - cachedTime > 0`, which evaluates to true for all replicators that reach it.
One possible way to address the problem is to make the setting non-dynamic with `dynamic = false`.
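If the setting were kept dynamic instead, an alternative to `dynamic = false` would be to re-read the threshold on every run and treat a non-positive or unset value as "disabled". A minimal sketch of that guard, written as a hypothetical static helper rather than the actual `AbstractTopic` method; `localProducersEmptyTime` is assumed to be a millisecond timestamp, as the `System.currentTimeMillis()` comparison suggests.

```java
// Hypothetical helper mirroring the condition in
// disconnectReplicatorIfNoTrafficForLongTime(); not Pulsar code.
final class IdleReplicatorCheck {
    private IdleReplicatorCheck() {
    }

    /**
     * @param localProducersEmptyTime millis timestamp when local producers became empty,
     *                                or null while local producers exist
     * @param thresholdSeconds        current threshold; null or <= 0 disables the check
     * @param nowMillis               current wall-clock time in millis
     */
    static boolean shouldDisconnect(Long localProducersEmptyTime, Integer thresholdSeconds, long nowMillis) {
        if (localProducersEmptyTime == null) {
            return false; // local producers are present
        }
        if (thresholdSeconds == null || thresholdSeconds <= 0) {
            return false; // 0, negative, or unset means "disabled", not "disconnect immediately"
        }
        return nowMillis - localProducersEmptyTime > thresholdSeconds * 1000L;
    }
}
```

With this guard, changing the value to `0` at runtime turns the check into a no-op instead of disconnecting every idle replicator.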
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##########
@@ -598,12 +598,18 @@ public CompletableFuture<Void> close(
return closeFuture;
}
- public CompletableFuture<Void> stopReplProducers() {
+ public CompletableFuture<Void> closeReplProducersIfNoBacklog() {
Review Comment:
Local Claude Code review comment:
> ### `NonPersistentTopic` idle-disconnect uses `terminate()`, and a terminated replicator can never be restarted
>
> For `PersistentTopic`, `closeReplProducersIfNoBacklog()` calls `replicator.disconnect()` → `State.Disconnected`, and `startProducer()` can revive a replicator from `Disconnected` (`AbstractReplicator#startProducer`), so the disconnect→reconnect cycle this PR relies on works.
>
> For `NonPersistentTopic`, the renamed `closeReplProducersIfNoBacklog()` still calls `replicator.terminate()` → `State.Terminated`. That breaks the reconnect path in two ways:
>
> - `startProducer()` is a **no-op** from `Terminated` (`AbstractReplicator#startProducer`: *"Skip the producer creation since the replicator is terminating"*), so neither the `startReplProducers()` safety net nor `addProducer()` can revive it.
> - `terminate()` does **not** remove the replicator from the `replicators` map (only `removeReplicator()` does). `checkReplication()` only recreates *missing* replicators (`!replicators.containsKey(cluster)`), so it won't recreate the terminated-but-present one either.
>
> Net effect: once a non-persistent replicated topic that has a remote producer (but no local producers) goes idle past `brokerReplicationInactiveThresholdSeconds`, `disconnectReplicatorIfNoTrafficForLongTime()` terminates its outbound replicators **permanently**. Replication stays broken until the topic is unloaded/reloaded, even if a local producer reconnects later. This violates the disconnect→reconnect design the PR is built on.
>
> Previously, `terminate()` was always immediately followed by `delete()`, so the permanent state didn't matter; this PR introduces a terminate-without-delete path where it does.
>
> The two new tests cover only the persistent topic; the non-persistent path is untested.
>
> **Suggestion:** make `NonPersistentTopic.closeReplProducersIfNoBacklog()` use reconnectable (`disconnect()`) semantics consistent with `PersistentTopic`, or remove and recreate the replicator, and add a non-persistent test for the idle disconnect→reconnect cycle.
>
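The lifecycle difference described in the quoted review can be illustrated with a simplified model. Everything here is hypothetical (`ReplicatorLifecycleSketch`, its methods, and the three-state enum); it mirrors only the behavior the review attributes to `AbstractReplicator#startProducer`, `terminate()`, and `checkReplication()`, and compares the current terminate-and-keep path with the two suggested alternatives.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified, hypothetical model of the replicator lifecycle described above.
// It is not Pulsar's AbstractReplicator or NonPersistentTopic code.
final class ReplicatorLifecycleSketch {
    enum State { STARTED, DISCONNECTED, TERMINATED }

    static final class Replicator {
        volatile State state = State.STARTED;

        void disconnect() {
            if (state != State.TERMINATED) {
                state = State.DISCONNECTED;
            }
        }

        void terminate() {
            state = State.TERMINATED;
        }

        // As described in the review: revives from DISCONNECTED, no-op from TERMINATED.
        void startProducer() {
            if (state == State.DISCONNECTED) {
                state = State.STARTED;
            }
        }
    }

    final ConcurrentMap<String, Replicator> replicators = new ConcurrentHashMap<>();

    // Like checkReplication(): only creates replicators missing from the map.
    void checkReplication(String cluster) {
        replicators.computeIfAbsent(cluster, c -> new Replicator());
    }

    // Current non-persistent behavior: terminate but keep the entry in the map.
    void closeIdleByTerminate() {
        replicators.values().forEach(Replicator::terminate);
    }

    // Suggested option 1: reconnectable semantics, matching the persistent path.
    void closeIdleByDisconnect() {
        replicators.values().forEach(Replicator::disconnect);
    }

    // Suggested option 2: terminate and remove, so checkReplication() recreates it.
    void closeIdleByTerminateAndRemove() {
        // ConcurrentHashMap key-set iteration tolerates concurrent removal.
        for (String cluster : replicators.keySet()) {
            Replicator removed = replicators.remove(cluster);
            if (removed != null) {
                removed.terminate();
            }
        }
    }

    // Like startReplProducers(): try to revive every replicator still in the map.
    void startReplProducers() {
        replicators.values().forEach(Replicator::startProducer);
    }

    State stateOf(String cluster) {
        Replicator r = replicators.get(cluster);
        return r == null ? null : r.state;
    }
}
```

In the model, only the terminate-and-keep path leaves the replicator stuck: `startReplProducers()` skips it and `checkReplication()` sees the key as present.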
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -741,6 +741,10 @@ protected void startInactivityMonitor() {
int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
inactivityMonitor.scheduleAtFixedRateNonConcurrently(() -> checkGC(), interval, interval,
TimeUnit.SECONDS);
+ if (pulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds() > 0) {
Review Comment:
Another detail is that the field has `dynamic = true`, but the scheduled task wouldn't be cancelled and rescheduled when the value is updated. Again, setting `dynamic = false` would address this without adding support for dynamic configuration of `brokerReplicationInactiveThresholdSeconds`.
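For comparison, actually supporting dynamic updates would mean cancelling the existing task and scheduling a new one whenever the value changes. A minimal sketch using only `java.util.concurrent`; the `ReschedulableIdleCheck` class and its `onThresholdChanged` callback are hypothetical, and how it would be hooked into Pulsar's dynamic-configuration update path is left open. `scheduleWithFixedDelay` is used so runs never overlap, similar in spirit to `scheduleAtFixedRateNonConcurrently`.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

// Hypothetical sketch, not Pulsar code: keeps at most one periodic idle check
// scheduled and replaces it whenever the threshold changes.
final class ReschedulableIdleCheck {
    private final ScheduledExecutorService executor;
    private final long intervalSeconds;
    private final IntConsumer check; // receives the threshold the task was scheduled with
    private ScheduledFuture<?> future; // guarded by "this"

    ReschedulableIdleCheck(ScheduledExecutorService executor, long intervalSeconds, IntConsumer check) {
        this.executor = executor;
        this.intervalSeconds = intervalSeconds;
        this.check = check;
    }

    // Would be invoked once at startup and again on every update of the threshold.
    synchronized void onThresholdChanged(Integer newThresholdSeconds) {
        if (future != null) {
            future.cancel(false); // do not interrupt a run that is already in progress
            future = null;
        }
        if (newThresholdSeconds != null && newThresholdSeconds > 0) {
            final int threshold = newThresholdSeconds;
            future = executor.scheduleWithFixedDelay(
                    () -> check.accept(threshold), intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
        }
    }

    synchronized boolean isScheduled() {
        return future != null && !future.isDone();
    }
}
```

The extra bookkeeping (cancellation, synchronization, null and zero handling) is the support that `dynamic = false` avoids having to add.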
--
This is an automated message from the Apache Git Service.