C0urante commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1555264597
########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java: ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.connect.util.TopicAdmin; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import 
java.util.HashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.mockito.Mockito.mock; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.when; + +public class ConnectorOffsetBackingStoreTest { + + private static final String NAMESPACE = "namespace"; + // Connect format - any types should be accepted here + private static final Map<String, Object> OFFSET_KEY = Collections.singletonMap("key", "key"); + private static final Map<String, Object> OFFSET_VALUE = Collections.singletonMap("key", 12); + + // Serialized + private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(); + private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes(); + + private static final Exception PRODUCE_EXCEPTION = new KafkaException(); + + private final Converter keyConverter = mock(Converter.class); + private final Converter valueConverter = mock(Converter.class); Review Comment: We tend to use the `@Mock` annotation instead of final fields: ```suggestion @Mock private Converter keyConverter; @Mock private Converter valueConverter; ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java: ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.connect.util.TopicAdmin; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.mockito.Mockito.mock; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.when; + +public class ConnectorOffsetBackingStoreTest { Review Comment: We can use strict stubbing 
(note that this also requires a couple imports to be added): ```suggestion @RunWith(MockitoJUnitRunner.StrictStubs.class) public class ConnectorOffsetBackingStoreTest { ``` ########## checkstyle/suppressions.xml: ########## @@ -166,7 +166,7 @@ files="(KafkaConfigBackingStore|Values|ConnectMetricsRegistry).java"/> <suppress checks="NPathComplexity" - files="(DistributedHerder|AbstractHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin).java"/> + files="(DistributedHerder|AbstractHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin|ConnectorOffsetBackingStore).java"/> Review Comment: Is this change still necessary? ########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java: ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.connect.util.TopicAdmin; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.mockito.Mockito.mock; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.when; + +public class ConnectorOffsetBackingStoreTest { + + private static final String NAMESPACE = "namespace"; + // Connect format - any types should be accepted here + private static final Map<String, Object> OFFSET_KEY = Collections.singletonMap("key", "key"); + private static final Map<String, Object> OFFSET_VALUE = Collections.singletonMap("key", 12); + + // Serialized + private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(); + private static final byte[] OFFSET_VALUE_SERIALIZED = 
"value-serialized".getBytes(); + + private static final Exception PRODUCE_EXCEPTION = new KafkaException(); + + private final Converter keyConverter = mock(Converter.class); + private final Converter valueConverter = mock(Converter.class); + + + @Test + public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws Exception { + + KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false); + KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", true); + + ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores( + () -> LoggingContext.forConnector("source-connector"), + workerStore, + connectorStore, + "offsets-topic", + mock(TopicAdmin.class)); + + + try { + offsetBackingStore.set(getSerialisedOffsets(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null), (error, result) -> { + assertEquals(PRODUCE_EXCEPTION, error); + assertNull(result); + }).get(1000L, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + assertEquals(PRODUCE_EXCEPTION, e.getCause()); + } + } + + @Test + public void testFlushSuccessWhenWritesSucceedToBothPrimaryAndSecondaryStoresForTombstoneOffsets() throws Exception { + + KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false); + KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", false); + + ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores( + () -> LoggingContext.forConnector("source-connector"), + workerStore, + connectorStore, + "offsets-topic", + mock(TopicAdmin.class)); + + offsetBackingStore.set(getSerialisedOffsets(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null), (error, result) -> { + assertNull(error); + assertNull(result); + }).get(1000L, TimeUnit.MILLISECONDS); + } + + @Test + public void testFlushSuccessWhenWriteToSecondaryStoreFailsForNonTombstoneOffsets() throws Exception 
{ + + KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false); + KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", true); + + ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores( + () -> LoggingContext.forConnector("source-connector"), + workerStore, + connectorStore, + "offsets-topic", + mock(TopicAdmin.class)); + + offsetBackingStore.set(getSerialisedOffsets(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED), (error, result) -> { + assertNull(error); + assertNull(result); + }).get(1000L, TimeUnit.MILLISECONDS); + } + + @Test + public void testFlushSuccessWhenWritesToPrimaryAndSecondaryStoreSucceeds() throws Exception { + + KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false); + KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", false); + + ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores( + () -> LoggingContext.forConnector("source-connector"), + workerStore, + connectorStore, + "offsets-topic", + mock(TopicAdmin.class)); + + offsetBackingStore.set(getSerialisedOffsets(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED), (error, result) -> { + assertNull(error); + assertNull(result); + }).get(1000L, TimeUnit.MILLISECONDS); + } + + @Test + public void testFlushFailureWhenWritesToPrimaryStoreFailsAndSecondarySucceeds() throws Exception { + + KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", true); + KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", false); + + ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores( + () -> LoggingContext.forConnector("source-connector"), + workerStore, + connectorStore, + "offsets-topic", + mock(TopicAdmin.class)); 
+ + try { + offsetBackingStore.set(getSerialisedOffsets(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED), (error, result) -> { + assertEquals(PRODUCE_EXCEPTION, error); + assertNull(result); + }).get(1000L, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + assertEquals(PRODUCE_EXCEPTION, e.getCause()); + } + } + + @Test + public void testFlushFailureWhenWritesToPrimaryStoreFailsAndSecondarySucceedsForTombstoneRecords() throws Exception { + + KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", true); + KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", false); + + ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores( + () -> LoggingContext.forConnector("source-connector"), + workerStore, + connectorStore, + "offsets-topic", + mock(TopicAdmin.class)); + + try { + offsetBackingStore.set(getSerialisedOffsets(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null), (error, result) -> { + assertEquals(PRODUCE_EXCEPTION, error); + assertNull(result); + }).get(1000L, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + assertEquals(PRODUCE_EXCEPTION, e.getCause()); + } + } + + @Test + public void testFlushSuccessWhenWritesToPrimaryStoreSucceedsWithNoSecondaryStore() throws Exception { + + KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false); + + ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withOnlyConnectorStore( + () -> LoggingContext.forConnector("source-connector"), + connectorStore, + "offsets-topic", + mock(TopicAdmin.class)); + + offsetBackingStore.set(getSerialisedOffsets(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED), (error, result) -> { + assertNull(error); + assertNull(result); + }).get(1000L, TimeUnit.MILLISECONDS); + + } + + @Test + public void 
testFlushFailureWhenWritesToPrimaryStoreFailsWithNoSecondaryStore() throws Exception { + + KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", true); + + ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withOnlyConnectorStore( + () -> LoggingContext.forConnector("source-connector"), + connectorStore, + "offsets-topic", + mock(TopicAdmin.class)); + + try { + offsetBackingStore.set(getSerialisedOffsets(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED), (error, result) -> { + assertEquals(PRODUCE_EXCEPTION, error); + assertNull(result); + }).get(1000L, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + assertEquals(PRODUCE_EXCEPTION, e.getCause()); + } + } + + @SuppressWarnings("unchecked") + private KafkaOffsetBackingStore setupOffsetBackingStoreWithProducer(String topic, boolean throwingProducer) { Review Comment: Maybe a shorter name like `createStore` could suffice? ########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java: ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.connect.util.TopicAdmin; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.mockito.Mockito.mock; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.when; + +public class ConnectorOffsetBackingStoreTest { + + private static final String NAMESPACE = "namespace"; + // Connect format - any types should be accepted here + private static final Map<String, Object> OFFSET_KEY = Collections.singletonMap("key", "key"); + private static final Map<String, Object> OFFSET_VALUE = Collections.singletonMap("key", 12); + + // Serialized + private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(); + private static final byte[] OFFSET_VALUE_SERIALIZED = 
"value-serialized".getBytes(); + + private static final Exception PRODUCE_EXCEPTION = new KafkaException(); + + private final Converter keyConverter = mock(Converter.class); + private final Converter valueConverter = mock(Converter.class); + + + @Test + public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws Exception { + + KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false); + KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", true); + + ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores( + () -> LoggingContext.forConnector("source-connector"), + workerStore, + connectorStore, + "offsets-topic", + mock(TopicAdmin.class)); + Review Comment: ```suggestion ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java: ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.connect.util.TopicAdmin; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.mockito.Mockito.mock; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.when; + +public class ConnectorOffsetBackingStoreTest { + + private static final String NAMESPACE = "namespace"; + // Connect format - any types should be accepted here + private static final Map<String, Object> OFFSET_KEY = Collections.singletonMap("key", "key"); + private static final Map<String, Object> OFFSET_VALUE = Collections.singletonMap("key", 12); + + // Serialized + private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(); + private static final byte[] OFFSET_VALUE_SERIALIZED = 
"value-serialized".getBytes(); + + private static final Exception PRODUCE_EXCEPTION = new KafkaException(); + + private final Converter keyConverter = mock(Converter.class); + private final Converter valueConverter = mock(Converter.class); + + + @Test + public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws Exception { + Review Comment: These newlines right after method declarations aren't necessary and add a bit of bloat to the test suite; can we remove all of them? ########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java: ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.connect.util.TopicAdmin; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.mockito.Mockito.mock; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.when; + +public class ConnectorOffsetBackingStoreTest { + + private static final String NAMESPACE = "namespace"; + // Connect format - any types should be accepted here + private static final Map<String, Object> OFFSET_KEY = Collections.singletonMap("key", "key"); + private static final Map<String, Object> OFFSET_VALUE = Collections.singletonMap("key", 12); + + // Serialized + private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(); + private static final byte[] OFFSET_VALUE_SERIALIZED = 
"value-serialized".getBytes(); + + private static final Exception PRODUCE_EXCEPTION = new KafkaException(); + + private final Converter keyConverter = mock(Converter.class); + private final Converter valueConverter = mock(Converter.class); + + + @Test + public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws Exception { + + KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false); + KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", true); + + ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores( + () -> LoggingContext.forConnector("source-connector"), + workerStore, + connectorStore, + "offsets-topic", + mock(TopicAdmin.class)); + + + try { + offsetBackingStore.set(getSerialisedOffsets(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null), (error, result) -> { + assertEquals(PRODUCE_EXCEPTION, error); + assertNull(result); + }).get(1000L, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { Review Comment: Doesn't this cause the test to pass if no exception is thrown? I think we may want to use `assertThrows` here instead of try/catch. ########## connect/runtime/src/test/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStoreTest.java: ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.connect.util.TopicAdmin; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.mockito.Mockito.mock; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.when; + +public class ConnectorOffsetBackingStoreTest { + + private static final String NAMESPACE = 
"namespace"; + // Connect format - any types should be accepted here + private static final Map<String, Object> OFFSET_KEY = Collections.singletonMap("key", "key"); + private static final Map<String, Object> OFFSET_VALUE = Collections.singletonMap("key", 12); + + // Serialized + private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(); + private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes(); + + private static final Exception PRODUCE_EXCEPTION = new KafkaException(); + + private final Converter keyConverter = mock(Converter.class); + private final Converter valueConverter = mock(Converter.class); + + + @Test + public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws Exception { + + KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false); + KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", true); + + ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores( + () -> LoggingContext.forConnector("source-connector"), + workerStore, + connectorStore, + "offsets-topic", + mock(TopicAdmin.class)); + + + try { + offsetBackingStore.set(getSerialisedOffsets(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null), (error, result) -> { + assertEquals(PRODUCE_EXCEPTION, error); + assertNull(result); Review Comment: Doesn't this cause the test to pass if the callback is never invoked? I can think of two alternatives: 1. Use something like an `AtomicBoolean callbackInvoked` that gets set to true inside the callback, then assert that it's true outside of the callback 2. Capture the result and error values given to the callback (probably in `AtomicReference` fields) so that assertions can be made on them outside of the callback -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
