poorbarcode commented on code in PR #25915:
URL: https://github.com/apache/pulsar/pull/25915#discussion_r3363601657
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java:
##########
@@ -665,16 +667,60 @@ protected Consumer getActiveConsumer(Subscription
subscription) {
return null;
}
- protected boolean hasLocalProducers() {
- if (producers.isEmpty()) {
- return false;
+ public abstract CompletableFuture<Void> closeReplProducersIfNoBacklog();
+
+ public abstract CompletableFuture<Void> startReplProducers();
+
+ public void disconnectReplicatorIfNoTrafficForLongTime() {
+ updateLocalProducersEmptyTime();
+
+ final Long cachedTime = localProducersEmptyTime;
+ // Still active.
+ if (cachedTime == null) {
+ return;
+ }
+ // Disabled the feature.
+ int threshold =
brokerService.getPulsar().getConfig().getBrokerReplicationInactiveThresholdSeconds();
+ if (threshold <= 0) {
+ return;
+ }
+ // Check and close replication producers.
+ if (System.currentTimeMillis() - cachedTime > threshold * 1000L) {
+ log.info().attr("brokerReplicationInactiveThresholdSeconds",
threshold)
+ .log("Disconnecting replication producers since no
producer is active for a long time.");
+ closeReplProducersIfNoBacklog().whenCompleteAsync((__, ex) -> {
Review Comment:
The previous solution solved the issue you mentioned; your suggestion here
https://github.com/apache/pulsar/pull/25915#issuecomment-4597972779 introduced
a regression
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]