This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit dd8ce54c998fa7230e59773baf85df0ca7aee949 Author: void-ptr974 <[email protected]> AuthorDate: Thu Jun 4 02:16:26 2026 +0800 [improve][offload] Coalesce automatic offload triggers to reduce retry loops and ledger scans (#25793) (cherry picked from commit 7ecedb8265b1d802ef1a571d7780f4a73141251b) --- .../impl/AutomaticOffloadTriggerController.java | 86 ++++++++++++++ .../mledger/impl/ManagedLedgerFactoryImpl.java | 4 +- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 90 +++++++++++++-- .../AutomaticOffloadTriggerControllerTest.java | 85 ++++++++++++++ .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 124 +++++++++++++++++++++ 5 files changed, 375 insertions(+), 14 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AutomaticOffloadTriggerController.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AutomaticOffloadTriggerController.java new file mode 100644 index 00000000000..35f5a267063 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AutomaticOffloadTriggerController.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Coalesces repeated automatic offload triggers into at most one active run and one follow-up run. + */ +final class AutomaticOffloadTriggerController { + private static final int IDLE = 0; + private static final int RUNNING = 1; + private static final int RUNNING_WITH_PENDING_TRIGGER = 2; + + private final AtomicInteger state = new AtomicInteger(IDLE); + + /** + * Records an automatic offload trigger. + * + * @return true when the caller must start a new automatic offload run + */ + boolean requestRun() { + while (true) { + int current = state.get(); + switch (current) { + case IDLE: + if (state.compareAndSet(IDLE, RUNNING)) { + return true; + } + break; + case RUNNING: + if (state.compareAndSet(RUNNING, RUNNING_WITH_PENDING_TRIGGER)) { + return false; + } + break; + case RUNNING_WITH_PENDING_TRIGGER: + return false; + default: + throw new IllegalStateException("Unknown automatic offload trigger state: " + current); + } + } + } + + /** + * Records completion of the current automatic offload run. + * + * @return true when the caller must immediately start one coalesced follow-up run + */ + boolean completeRun() { + while (true) { + int current = state.get(); + switch (current) { + case IDLE: + return false; + case RUNNING: + if (state.compareAndSet(RUNNING, IDLE)) { + return false; + } + break; + case RUNNING_WITH_PENDING_TRIGGER: + if (state.compareAndSet(RUNNING_WITH_PENDING_TRIGGER, RUNNING)) { + return true; + } + break; + default: + throw new IllegalStateException("Unknown automatic offload trigger state: " + current); + } + } + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index bcca3fb9a52..d45692df78f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -19,7 +19,7 @@ package org.apache.bookkeeper.mledger.impl; import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; -import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.NULL_OFFLOAD_PROMISE; +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.AUTOMATIC_OFFLOAD_TRIGGER; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import com.google.common.base.Predicates; import com.google.common.collect.BoundType; @@ -430,7 +430,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { future.complete(newledger); // May need to trigger offloading if (config.isTriggerOffloadOnTopicLoad()) { - newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); + newledger.maybeOffloadInBackground(AUTOMATIC_OFFLOAD_TRIGGER); } }); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f1eaa04d313..d647cc789a4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -224,8 +224,20 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { protected final CallbackMutex trimmerMutex = new CallbackMutex(); protected final CallbackMutex offloadMutex = new CallbackMutex(); - public static final CompletableFuture<Position> NULL_OFFLOAD_PROMISE = CompletableFuture + private final AutomaticOffloadTriggerController automaticOffloadTriggerController = + new AutomaticOffloadTriggerController(); + // Identity sentinel for automatic offload requests. The completed Position value is not used. + public static final CompletableFuture<Position> AUTOMATIC_OFFLOAD_TRIGGER = CompletableFuture .completedFuture(PositionFactory.LATEST); + + private enum OffloadRequestSource { + AUTOMATIC, + EXPLICIT + } + + private record OffloadThresholds(long thresholdInBytes, long thresholdInSeconds) { + } + @VisibleForTesting @Getter protected volatile LedgerHandle currentLedger; @@ -1853,7 +1865,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { trimConsumedLedgersInBackground(); - maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); + maybeOffloadInBackground(AUTOMATIC_OFFLOAD_TRIGGER); createLedgerAfterClosed(); } @@ -2674,22 +2686,73 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } public void maybeOffloadInBackground(CompletableFuture<Position> promise) { - if (getOffloadPoliciesIfAppendable().isEmpty()) { + if (promise == AUTOMATIC_OFFLOAD_TRIGGER) { + if (automaticOffloadTriggerController.requestRun()) { + startAutomaticOffload(); + } + return; + } + + maybeOffloadInBackground(promise, OffloadRequestSource.EXPLICIT); + } + + private void startAutomaticOffload() { + CompletableFuture<Position> automaticOffloadCompletion = new CompletableFuture<>(); + automaticOffloadCompletion.whenComplete((res, ex) -> finishAutomaticOffload(ex)); + try { + maybeOffloadInBackground(automaticOffloadCompletion, OffloadRequestSource.AUTOMATIC); + } catch (RuntimeException e) { + automaticOffloadCompletion.completeExceptionally(e); + } + } + + private void maybeOffloadInBackground(CompletableFuture<Position> promise, OffloadRequestSource source) { + Optional<OffloadThresholds> offloadThresholds = getOffloadThresholds(); + if (offloadThresholds.isEmpty()) { + if (source == OffloadRequestSource.AUTOMATIC) { + promise.complete(PositionFactory.LATEST); + } return; } - final OffloadPolicies policies = config.getLedgerOffloader().getOffloadPolicies(); + OffloadThresholds thresholds = offloadThresholds.get(); + try { + executor.execute(() -> maybeOffload(thresholds.thresholdInBytes(), thresholds.thresholdInSeconds(), + promise, source)); + } catch (RuntimeException e) { + promise.completeExceptionally(e); + } + } + + private Optional<OffloadThresholds> getOffloadThresholds() { + Optional<OffloadPolicies> optionalOffloadPolicies = getOffloadPoliciesIfAppendable(); + if (optionalOffloadPolicies.isEmpty()) { + return Optional.empty(); + } + + final OffloadPolicies policies = optionalOffloadPolicies.get(); final long offloadThresholdInBytes = Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInBytes()).orElse(-1L); final long offloadThresholdInSeconds = Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L); if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) { - executor.execute(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)); + return Optional.of(new OffloadThresholds(offloadThresholdInBytes, offloadThresholdInSeconds)); + } + + return Optional.empty(); + } + + private void finishAutomaticOffload(Throwable exception) { + if (exception != null && log.isDebugEnabled()) { + log.debug("Failed to automatically offload ledgers", exception); + } + if (automaticOffloadTriggerController.completeRun()) { + startAutomaticOffload(); } } private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInSeconds, - CompletableFuture<Position> finalPromise) { + CompletableFuture<Position> finalPromise, OffloadRequestSource source) { if (getOffloadPoliciesIfAppendable().isEmpty()) { String msg = String.format("[%s] Nothing to offload due to offloader or offloadPolicies is NULL", name); finalPromise.completeExceptionally(new IllegalArgumentException(msg)); @@ -2704,8 +2767,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } if (!offloadMutex.tryLock()) { - scheduledExecutor.schedule(() -> maybeOffloadInBackground(finalPromise), - 100, TimeUnit.MILLISECONDS); + try { + scheduledExecutor.schedule(() -> maybeOffloadInBackground(finalPromise, source), + 100, TimeUnit.MILLISECONDS); + } catch (RuntimeException e) { + finalPromise.completeExceptionally(e); + } return; } @@ -2795,12 +2862,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private Optional<OffloadPolicies> getOffloadPoliciesIfAppendable() { LedgerOffloader ledgerOffloader = config.getLedgerOffloader(); - if (ledgerOffloader == null - || !ledgerOffloader.isAppendable() - || ledgerOffloader.getOffloadPolicies() == null) { + if (ledgerOffloader == null || !ledgerOffloader.isAppendable()) { return Optional.empty(); } - return Optional.ofNullable(ledgerOffloader.getOffloadPolicies()); + OffloadPolicies offloadPolicies = ledgerOffloader.getOffloadPolicies(); + return Optional.ofNullable(offloadPolicies); } @VisibleForTesting diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/AutomaticOffloadTriggerControllerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/AutomaticOffloadTriggerControllerTest.java new file mode 100644 index 00000000000..9aefc7e3e1e --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/AutomaticOffloadTriggerControllerTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static org.assertj.core.api.Assertions.assertThat; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.testng.annotations.Test; + +public class AutomaticOffloadTriggerControllerTest { + + @Test + public void triggersCoalesceWhileRunIsActive() { + AutomaticOffloadTriggerController controller = new AutomaticOffloadTriggerController(); + + assertThat(controller.requestRun()).isTrue(); + assertThat(controller.requestRun()).isFalse(); + assertThat(controller.requestRun()).isFalse(); + } + + @Test + public void pendingTriggerSchedulesOneFollowUpRun() { + AutomaticOffloadTriggerController controller = new AutomaticOffloadTriggerController(); + + assertThat(controller.requestRun()).isTrue(); + assertThat(controller.requestRun()).isFalse(); + + assertThat(controller.completeRun()).isTrue(); + assertThat(controller.completeRun()).isFalse(); + assertThat(controller.requestRun()).isTrue(); + } + + @Test(timeOut = 30000) + public void concurrentTriggerAndCompletionAlwaysReserveOneFollowUpRun() throws Exception { + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + for (int i = 0; i < 1000; i++) { + AutomaticOffloadTriggerController controller = new AutomaticOffloadTriggerController(); + assertThat(controller.requestRun()).isTrue(); + + // Completion and a new trigger can race; exactly one side must reserve the follow-up run. + CyclicBarrier barrier = new CyclicBarrier(3); + Future<Boolean> completeResult = executor.submit(() -> { + barrier.await(5, TimeUnit.SECONDS); + return controller.completeRun(); + }); + Future<Boolean> triggerResult = executor.submit(() -> { + barrier.await(5, TimeUnit.SECONDS); + return controller.requestRun(); + }); + + barrier.await(5, TimeUnit.SECONDS); + boolean followUpReservedByComplete = completeResult.get(5, TimeUnit.SECONDS); + boolean followUpReservedByTrigger = triggerResult.get(5, TimeUnit.SECONDS); + + assertThat(followUpReservedByComplete) + .as("iteration %s must reserve exactly one follow-up run", i) + .isNotEqualTo(followUpReservedByTrigger); + assertThat(controller.completeRun()).isFalse(); + } + } finally { + executor.shutdownNow(); + executor.awaitTermination(5, TimeUnit.SECONDS); + } + } +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index 2a50330a0da..434980f0007 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -1184,6 +1184,130 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase { } } + @Test + public void automaticOffloadTriggersAreCoalescedWhileOffloadInProgress() throws Exception { + CompletableFuture<Void> slowOffload = new CompletableFuture<>(); + CountDownLatch offloadRunning = new CountDownLatch(1); + AtomicInteger offloadPolicyCalls = new AtomicInteger(); + MockLedgerOffloader offloader = new MockLedgerOffloader() { + @Override + public CompletableFuture<Void> offload(ReadHandle ledger, + UUID uuid, + Map<String, String> extraMetadata) { + offloadRunning.countDown(); + return slowOffload.thenCompose((res) -> super.offload(ledger, uuid, extraMetadata)); + } + + @Override + public OffloadPoliciesImpl getOffloadPolicies() { + offloadPolicyCalls.incrementAndGet(); + return super.getOffloadPolicies(); + } + }; + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setRetentionSizeInMB(10); + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0L); + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(null); + config.setLedgerOffloader(offloader); + + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config); + + for (int i = 0; i < 25; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + assertTrue(offloadRunning.await(5, TimeUnit.SECONDS)); + + // Repeated automatic triggers should stop at the controller and avoid another policy lookup. + int callsBeforeRepeatedTriggers = offloadPolicyCalls.get(); + for (int i = 0; i < 20; i++) { + ledger.maybeOffloadInBackground(ManagedLedgerImpl.AUTOMATIC_OFFLOAD_TRIGGER); + } + + assertEquals(offloadPolicyCalls.get(), callsBeforeRepeatedTriggers); + + slowOffload.complete(null); + + assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2); + List<Long> allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList(); + assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0), allLedgerIds.get(1))); + } + + @Test + public void automaticOffloadRunsAgainForCoalescedTrigger() throws Exception { + CompletableFuture<Void> slowOffload = new CompletableFuture<>(); + CountDownLatch offloadRunning = new CountDownLatch(1); + MockLedgerOffloader offloader = new MockLedgerOffloader() { + @Override + public CompletableFuture<Void> offload(ReadHandle ledger, + UUID uuid, + Map<String, String> extraMetadata) { + offloadRunning.countDown(); + return slowOffload.thenCompose((res) -> super.offload(ledger, uuid, extraMetadata)); + } + }; + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setRetentionSizeInMB(10); + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0L); + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(null); + config.setLedgerOffloader(offloader); + + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config); + + for (int i = 0; i < 11; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + assertTrue(offloadRunning.await(5, TimeUnit.SECONDS)); + + // The next ledger closes after the first automatic scan, so it depends on the coalesced rerun. + for (int i = 11; i < 21; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + assertEquals(offloader.offloadedLedgers().size(), 0); + + slowOffload.complete(null); + + assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2); + List<Long> allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList(); + assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0), allLedgerIds.get(1))); + } + + @Test + public void automaticOffloadWithoutThresholdDoesNotBlockLaterTriggers() throws Exception { + MockLedgerOffloader offloader = new MockLedgerOffloader(); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setRetentionSizeInMB(10); + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(-1L); + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInSeconds(null); + config.setLedgerOffloader(offloader); + + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("my_test_ledger" + UUID.randomUUID(), config); + + for (int i = 0; i < 25; i++) { + ledger.addEntry(buildEntry(10, "entry-" + i)); + } + ledger.maybeOffloadInBackground(ManagedLedgerImpl.AUTOMATIC_OFFLOAD_TRIGGER); + assertEquals(offloader.offloadedLedgers().size(), 0); + + // A disabled automatic trigger must complete internally so a later valid trigger can run. + offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(0L); + ledger.maybeOffloadInBackground(ManagedLedgerImpl.AUTOMATIC_OFFLOAD_TRIGGER); + + assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2); + List<Long> allLedgerIds = ledger.getLedgersInfoAsList().stream().map(LedgerInfo::getLedgerId).toList(); + assertEquals(offloader.offloadedLedgers(), Set.of(allLedgerIds.get(0), allLedgerIds.get(1))); + } + @DataProvider(name = "offloadAsSoonAsClosed") public Object[][] offloadAsSoonAsClosedProvider() { return new Object[][]{
