This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4d0a85f2486 KAFKA-20417: Add KIP-1035 self-managed offset lifecycle
integration tests (#21999)
4d0a85f2486 is described below
commit 4d0a85f2486afa345e38ffa873a5d33cf27a8409
Author: Bill Bejeck <[email protected]>
AuthorDate: Fri Apr 17 13:45:21 2026 -0400
KAFKA-20417: Add KIP-1035 self-managed offset lifecycle integration tests
(#21999)
Adds `SelfManagedOffsetLifecycleIntegrationTest` — companion to the
recovery tests.
Validates the happy path for column-family-based offset management.
After each clean shutdown, tests directly inspect the CF to assert
status=closed and offsets populated. Covers:
- State preservation across clean restarts - Multiple
restart cycles - No changelog restoration after clean shutdown (via
StateRestoreListener) - Clean restart with no data processed
- Interactive query state verification across restarts
All tests parameterized for ALOS and EOS.
Reviewers: Matthias J. Sax <[email protected]>
---
.../SelfManagedOffsetLifecycleIntegrationTest.java | 532 +++++++++++++++++++++
1 file changed, 532 insertions(+)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfManagedOffsetLifecycleIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfManagedOffsetLifecycleIntegrationTest.java
new file mode 100644
index 00000000000..d6a55bdb7ce
--- /dev/null
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SelfManagedOffsetLifecycleIntegrationTest.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.RocksDBStoreTestingUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Integration tests for KIP-1035 column family offset normal lifecycle.
+ *
+ * <p>Validates that offsets stored in RocksDB column families are correctly
persisted on
+ * clean shutdown and read back on restart. After each clean stop, tests
directly inspect
+ * the CF to assert status=closed and offsets populated. On restart, a
+ * {@code StateRestoreListener} verifies no changelog restoration occurs, and
cumulative
+ * count assertions confirm state continuity. All tests are parameterized for
ALOS and EOS.
+ */
+@Tag("integration")
+@Timeout(600)
+public class SelfManagedOffsetLifecycleIntegrationTest {
+
+ private static final int NUM_BROKERS = 1;
+ private static final int NUM_PARTITIONS = 3;
+ private static final String INPUT_TOPIC = "input-topic";
+ private static final String OUTPUT_TOPIC = "output-topic";
+ private static final String STORE_NAME = "counts-store";
+ private static final long COMMIT_INTERVAL_MS = 100L;
+ private static final Duration STREAMS_CLOSE_TIMEOUT =
Duration.ofSeconds(5);
+
+ private static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
+
+ private int consumerGroupCounter = 0;
+
+ private Properties streamsConfig;
+ private KafkaStreams streams;
+ private File stateDir;
+
+ @BeforeAll
+ public static void startCluster() throws IOException, InterruptedException
{
+ CLUSTER.start();
+ }
+
+ @AfterAll
+ public static void stopCluster() {
+ CLUSTER.stop();
+ }
+
+ @BeforeEach
+ public void setUp(final TestInfo testInfo) throws InterruptedException {
+ CLUSTER.deleteAllTopics();
+ CLUSTER.createTopic(INPUT_TOPIC, NUM_PARTITIONS, 1);
+ CLUSTER.createTopic(OUTPUT_TOPIC, NUM_PARTITIONS, 1);
+
+ final String safeTestName = safeUniqueTestName(testInfo);
+
+ streamsConfig = new Properties();
+ streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
safeTestName);
+ streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
+ streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
+ stateDir = TestUtils.tempDirectory();
+ streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
+ streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+ streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
COMMIT_INTERVAL_MS);
+ streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5000);
+ streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (streams != null) {
+ closeStreams(streams);
+ streams.cleanUp();
+ }
+ }
+
+ private StreamsBuilder buildCountTopology() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<String, String> stream = builder.stream(INPUT_TOPIC);
+ stream
+ .groupByKey()
+ .count(Materialized.as(STORE_NAME))
+ .toStream()
+ .to(OUTPUT_TOPIC);
+ return builder;
+ }
+
+ private void closeStreams(final KafkaStreams kafkaStreams) {
+ kafkaStreams.close(STREAMS_CLOSE_TIMEOUT);
+ }
+
+ private KafkaStreams startStreams() throws Exception {
+ return startStreams(false);
+ }
+
+ private KafkaStreams startStreams(final boolean cleanUp) throws Exception {
+ final StreamsBuilder builder = buildCountTopology();
+ streams = new KafkaStreams(builder.build(), streamsConfig);
+ if (cleanUp) {
+ streams.cleanUp();
+ }
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+ return streams;
+ }
+
+ private KafkaStreams startStreamsWithRestoreListener(final
StateRestoreListener listener) throws Exception {
+ final StreamsBuilder builder = buildCountTopology();
+ streams = new KafkaStreams(builder.build(), streamsConfig);
+ streams.setGlobalStateRestoreListener(listener);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+ return streams;
+ }
+
+ private Properties producerConfig() {
+ final Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
+ return props;
+ }
+
+ private Properties readCommittedConsumerConfig() {
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "verify-consumer-" +
consumerGroupCounter++);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class.getName());
+ return props;
+ }
+
+ private void produceRecords(final List<KeyValue<String, String>> records) {
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ INPUT_TOPIC,
+ records,
+ producerConfig(),
+ CLUSTER.time
+ );
+ }
+
+ private List<KeyValue<String, Long>> waitForOutput(final int
expectedCount) throws Exception {
+ return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ readCommittedConsumerConfig(),
+ OUTPUT_TOPIC,
+ expectedCount
+ );
+ }
+
+ /**
+ * Extracts the latest count for each key from the output records.
+ * Since the count store emits updates, the last value for each key is the
current count.
+ */
+ private Map<String, Long> latestCountsFromOutput(final
List<KeyValue<String, Long>> output) {
+ final Map<String, Long> latest = new HashMap<>();
+ for (final KeyValue<String, Long> record : output) {
+ latest.put(record.key, record.value);
+ }
+ return latest;
+ }
+
+ /**
+ * Queries the state store via interactive queries and returns all
key-value pairs.
+ */
+ private Map<String, Long> queryStore(final KafkaStreams kafkaStreams)
throws Exception {
+ final ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(
+ StoreQueryParameters.fromNameAndType(STORE_NAME,
QueryableStoreTypes.keyValueStore())
+ );
+ final Map<String, Long> result = new HashMap<>();
+ try (var iter = store.all()) {
+ while (iter.hasNext()) {
+ final KeyValue<String, Long> kv = iter.next();
+ result.put(kv.key, kv.value);
+ }
+ }
+ return result;
+ }
+
+ // -----------------------------------------------------------
+ // Column family inspection helpers
+ // -----------------------------------------------------------
+
+ private List<File> findAllStoreDirs(final String storeName) {
+ final String appId =
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
+ return RocksDBStoreTestingUtils.findAllStoreDirs(stateDir, appId,
storeName);
+ }
+
+ /**
+ * Asserts that all store directories have the expected status (0L =
closed, 1L = open).
+ */
+ private void assertStoreStatus(final long expectedStatus) throws Exception
{
+ for (final File storeDir : findAllStoreDirs(STORE_NAME)) {
+ final Long status =
RocksDBStoreTestingUtils.readStoreStatus(storeDir);
+ assertEquals(expectedStatus, status,
+ "Store status in " + storeDir + " should be " +
(expectedStatus == 0L ? "closed" : "open"));
+ }
+ }
+
+ /**
+ * Asserts that all store directories have non-empty offsets in the CF.
+ */
+ private void assertOffsetsPopulated() throws Exception {
+ for (final File storeDir : findAllStoreDirs(STORE_NAME)) {
+ final Map<String, Long> offsets =
RocksDBStoreTestingUtils.readOffsets(storeDir);
+ if (!offsets.isEmpty()) {
+ return; // At least one store dir has offsets — with
partitioning, not all may
+ }
+ }
+ throw new AssertionError("Expected at least one store directory to
have populated offsets");
+ }
+
+ /**
+ * Start, produce, stop cleanly, restart (no cleanUp), produce more.
+ * Counts should be cumulative, proving offsets in the CF were persisted
and read back.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE,
StreamsConfig.EXACTLY_ONCE_V2})
+ public void shouldPreserveStateAcrossCleanRestart(final String
processingGuarantee) throws Exception {
+ streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
processingGuarantee);
+
+ final List<KeyValue<String, String>> batch1 = Arrays.asList(
+ new KeyValue<>("A", "v1"),
+ new KeyValue<>("B", "v1"),
+ new KeyValue<>("A", "v2")
+ );
+
+ startStreams(true);
+ produceRecords(batch1);
+ waitForOutput(batch1.size());
+
+ // Clean stop — verify CF state, then restart without cleanUp
+ closeStreams(streams);
+ streams = null;
+
+ assertStoreStatus(0L);
+ assertOffsetsPopulated();
+
+ // Restart without cleanUp so local state is preserved
+ startStreams(false);
+
+ final List<KeyValue<String, String>> batch2 = Arrays.asList(
+ new KeyValue<>("A", "v3"),
+ new KeyValue<>("B", "v2")
+ );
+ produceRecords(batch2);
+
+ final List<KeyValue<String, Long>> allOutput =
waitForOutput(batch1.size() + batch2.size());
+ final Map<String, Long> counts = latestCountsFromOutput(allOutput);
+
+ // A: 3 total (v1, v2, v3), B: 2 total (v1, v2)
+ assertEquals(3L, counts.get("A"), "A count should be cumulative across
restart");
+ assertEquals(2L, counts.get("B"), "B count should be cumulative across
restart");
+ }
+
+ /**
+ * Multiple restart cycles: start, produce batch1, stop, restart, produce
batch2,
+ * stop, restart, produce batch3. Final counts should equal totals across
all 3 batches.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE,
StreamsConfig.EXACTLY_ONCE_V2})
+ public void shouldPreserveStateAcrossMultipleRestartCycles(final String
processingGuarantee) throws Exception {
+ streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
processingGuarantee);
+
+ // Cycle 1
+ startStreams(true);
+ final List<KeyValue<String, String>> batch1 = Arrays.asList(
+ new KeyValue<>("A", "v1"),
+ new KeyValue<>("B", "v1")
+ );
+ produceRecords(batch1);
+ waitForOutput(batch1.size());
+ closeStreams(streams);
+ streams = null;
+
+ // Cycle 2
+ startStreams(false);
+ final List<KeyValue<String, String>> batch2 = Arrays.asList(
+ new KeyValue<>("A", "v2"),
+ new KeyValue<>("C", "v1")
+ );
+ produceRecords(batch2);
+ waitForOutput(batch1.size() + batch2.size());
+ closeStreams(streams);
+ streams = null;
+
+ // Cycle 3
+ startStreams(false);
+ final List<KeyValue<String, String>> batch3 = Arrays.asList(
+ new KeyValue<>("A", "v3"),
+ new KeyValue<>("B", "v2"),
+ new KeyValue<>("C", "v2")
+ );
+ produceRecords(batch3);
+
+ final int totalRecords = batch1.size() + batch2.size() + batch3.size();
+ final List<KeyValue<String, Long>> allOutput =
waitForOutput(totalRecords);
+ final Map<String, Long> counts = latestCountsFromOutput(allOutput);
+
+ // A: 3 (v1, v2, v3), B: 2 (v1, v2), C: 2 (v1, v2)
+ assertEquals(3L, counts.get("A"), "A count across 3 cycles");
+ assertEquals(2L, counts.get("B"), "B count across 3 cycles");
+ assertEquals(2L, counts.get("C"), "C count across 3 cycles");
+ }
+
+ /**
+ * After a clean shutdown, restarting should not require full changelog
restoration.
+ * Uses a TrackingRestoreListener to verify no records are restored on
restart.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE,
StreamsConfig.EXACTLY_ONCE_V2})
+ public void shouldNotRestoreFromChangelogOnCleanRestart(final String
processingGuarantee) throws Exception {
+ streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
processingGuarantee);
+
+ final List<KeyValue<String, String>> batch1 = Arrays.asList(
+ new KeyValue<>("A", "v1"),
+ new KeyValue<>("B", "v1"),
+ new KeyValue<>("A", "v2")
+ );
+
+ startStreams(true);
+ produceRecords(batch1);
+ waitForOutput(batch1.size());
+
+ closeStreams(streams);
+ streams = null;
+
+ // Verify CF state after clean shutdown: status=closed, offsets
populated
+ assertStoreStatus(0L);
+ assertOffsetsPopulated();
+
+ // Restart with a restore listener — should see 0 records restored
+ final TrackingRestoreListener restoreListener = new
TrackingRestoreListener();
+ startStreamsWithRestoreListener(restoreListener);
+
+ final List<KeyValue<String, String>> batch2 = Arrays.asList(
+ new KeyValue<>("A", "v3"),
+ new KeyValue<>("C", "v1")
+ );
+ produceRecords(batch2);
+
+ final List<KeyValue<String, Long>> allOutput =
waitForOutput(batch1.size() + batch2.size());
+ final Map<String, Long> counts = latestCountsFromOutput(allOutput);
+
+ assertEquals(3L, counts.get("A"), "A count should be cumulative");
+ assertEquals(1L, counts.get("B"), "B count should be preserved");
+ assertEquals(1L, counts.get("C"), "C count should reflect new record");
+ assertEquals(0L, restoreListener.totalRestored.get(),
+ "No records should be restored from changelog after clean
shutdown");
+ }
+
+ /**
+ * Edge case: start, reach RUNNING, stop cleanly without producing any
records,
+ * then restart and produce. The store was initialized but never committed.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE,
StreamsConfig.EXACTLY_ONCE_V2})
+ public void shouldHandleCleanRestartWithNoDataProcessed(final String
processingGuarantee) throws Exception {
+ streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
processingGuarantee);
+
+ // Start and immediately stop — no records produced
+ startStreams(true);
+ closeStreams(streams);
+ streams = null;
+
+ // After clean shutdown with no data, status should still be closed
+ assertStoreStatus(0L);
+
+ // Restart — should not treat empty CF as corruption.
+ // Capture logs from ProcessorStateManager and StateDirectory to verify
+ // no corruption-related warnings are emitted during restart.
+ try (final LogCaptureAppender stateManagerAppender =
LogCaptureAppender.createAndRegister(ProcessorStateManager.class);
+ final LogCaptureAppender stateDirAppender =
LogCaptureAppender.createAndRegister(StateDirectory.class)) {
+
+ startStreams(false);
+
+ final List<KeyValue<String, String>> records = Arrays.asList(
+ new KeyValue<>("A", "v1"),
+ new KeyValue<>("B", "v1")
+ );
+ produceRecords(records);
+
+ final List<KeyValue<String, Long>> output =
waitForOutput(records.size());
+ final Map<String, Long> counts = latestCountsFromOutput(output);
+
+ assertEquals(1L, counts.get("A"));
+ assertEquals(1L, counts.get("B"));
+
+ assertNoCorruptionWarnings(stateManagerAppender);
+ assertNoCorruptionWarnings(stateDirAppender);
+ }
+ }
+
+ private void assertNoCorruptionWarnings(final LogCaptureAppender appender)
{
+ for (final String message : appender.getMessages()) {
+ if (message.contains("did not find checkpoint offsets while stores
are not empty")) {
+ throw new AssertionError("Unexpected corruption warning in
logs: " + message);
+ }
+ if (message.contains("Failed to register startup state stores for
task")) {
+ throw new AssertionError("Unexpected startup store failure in
logs: " + message);
+ }
+ }
+ }
+
+ /**
+ * Validates that the in-store state (via interactive queries) is
preserved across
+ * a clean restart, not just the output topic.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {StreamsConfig.AT_LEAST_ONCE,
StreamsConfig.EXACTLY_ONCE_V2})
+ public void shouldVerifyStoreStateViaInteractiveQueriesAcrossRestart(final
String processingGuarantee) throws Exception {
+ streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
processingGuarantee);
+
+ final List<KeyValue<String, String>> records = Arrays.asList(
+ new KeyValue<>("A", "v1"),
+ new KeyValue<>("B", "v1"),
+ new KeyValue<>("A", "v2")
+ );
+
+ startStreams(true);
+ produceRecords(records);
+ waitForOutput(records.size());
+
+ // Query store before restart
+ final Map<String, Long> countsBefore = queryStore(streams);
+ assertEquals(2L, countsBefore.get("A"));
+ assertEquals(1L, countsBefore.get("B"));
+
+ closeStreams(streams);
+ streams = null;
+
+ // Verify CF persisted correctly
+ assertStoreStatus(0L);
+ assertOffsetsPopulated();
+
+ // Restart and query again — counts should match
+ startStreams(false);
+ final Map<String, Long> countsAfter = queryStore(streams);
+
+ assertEquals(countsBefore, countsAfter,
+ "Store state via IQ should be identical after clean restart");
+ }
+
+ /**
+ * A StateRestoreListener that tracks the total number of records restored.
+ */
+ static class TrackingRestoreListener implements StateRestoreListener {
+ private final AtomicLong totalRestored = new AtomicLong(0);
+
+ @Override
+ public void onRestoreStart(final TopicPartition topicPartition,
+ final String storeName,
+ final long startingOffset,
+ final long endingOffset) {
+ // no-op
+ }
+
+ @Override
+ public void onBatchRestored(final TopicPartition topicPartition,
+ final String storeName,
+ final long batchEndOffset,
+ final long numRestored) {
+ totalRestored.addAndGet(numRestored);
+ }
+
+ @Override
+ public void onRestoreEnd(final TopicPartition topicPartition,
+ final String storeName,
+ final long totalRestored) {
+ // no-op
+ }
+ }
+}