johnnychhsu commented on code in PR #16126:
URL: https://github.com/apache/kafka/pull/16126#discussion_r1627506866


##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java:
##########
@@ -194,121 +234,1293 @@ public void setUp() {
     }
 
     @Test
-    public void testTaskCountRecordsAndGenerations() throws Exception {
-        expectConfigure();
-        expectStart(Collections.emptyList(), Collections.emptyMap());
+    public void testStartStop() {
+        props.put("config.storage.min.insync.replicas", "3");
+        props.put("config.storage.max.message.bytes", "1001");
+        createStore();
 
-        // Task configs should read to end, write to the log, read to end, 
write root, then read to end again
-        expectReadToEnd(new LinkedHashMap<>());
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+
+        verifyConfigure();
+        assertEquals(TOPIC, capturedTopic.getValue());
+        assertEquals("org.apache.kafka.common.serialization.StringSerializer", 
capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+        
assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", 
capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+        
assertEquals("org.apache.kafka.common.serialization.StringDeserializer", 
capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+        
assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", 
capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+
+        assertEquals(TOPIC, capturedNewTopic.getValue().name());
+        assertEquals(1, capturedNewTopic.getValue().numPartitions());
+        assertEquals(TOPIC_REPLICATION_FACTOR, 
capturedNewTopic.getValue().replicationFactor());
+        assertEquals("3", 
capturedNewTopic.getValue().configs().get("min.insync.replicas"));
+        assertEquals("1001", 
capturedNewTopic.getValue().configs().get("max.message.bytes"));
+
+        configStorage.start();
+        configStorage.stop();
+
+        verify(configLog).start();
+        verify(configLog).stop();
+    }
+
+    @Test
+    public void testSnapshotCannotMutateInternalState() {
+        props.put("config.storage.min.insync.replicas", "3");
+        props.put("config.storage.max.message.bytes", "1001");
+        createStore();
+
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+
+        configStorage.start();
+        ClusterConfigState snapshot = configStorage.snapshot();
+        assertNotSame(snapshot.connectorTaskCounts, 
configStorage.connectorTaskCounts);
+        assertNotSame(snapshot.connectorConfigs, 
configStorage.connectorConfigs);
+        assertNotSame(snapshot.connectorTargetStates, 
configStorage.connectorTargetStates);
+        assertNotSame(snapshot.taskConfigs, configStorage.taskConfigs);
+        assertNotSame(snapshot.connectorTaskCountRecords, 
configStorage.connectorTaskCountRecords);
+        assertNotSame(snapshot.connectorTaskConfigGenerations, 
configStorage.connectorTaskConfigGenerations);
+        assertNotSame(snapshot.connectorsPendingFencing, 
configStorage.connectorsPendingFencing);
+        assertNotSame(snapshot.inconsistentConnectors, 
configStorage.inconsistent);
+    }
+
+    @Test
+    public void testPutConnectorConfig() throws Exception {
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        // Null before writing
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+        String configKey = CONNECTOR_CONFIG_KEYS.get(1);
+        String targetStateKey = TARGET_STATE_KEYS.get(1);
+
+        
doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0))))
+                
.doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(1),
 CONFIGS_SERIALIZED.get(1))))
+                // Config deletion
+                .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() 
{{
+                            put(configKey, null);
+                            put(targetStateKey, null);
+                        }})
+                ).when(configLog).readToEnd();
+
+        // Writing should block until it is written and read back from Kafka
         expectConvertWriteRead(
-                TASK_CONFIG_KEYS.get(0), 
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                CONNECTOR_CONFIG_KEYS.get(0), 
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
                 "properties", SAMPLE_CONFIGS.get(0));
+
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0), null);
+        configState = configStorage.snapshot();
+
+        assertEquals(1, configState.offset());
+        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+        
verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(0));
+
+        // Second should also block and all configs should still be available
         expectConvertWriteRead(
-                TASK_CONFIG_KEYS.get(1), 
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+                CONNECTOR_CONFIG_KEYS.get(1), 
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
                 "properties", SAMPLE_CONFIGS.get(1));
-        expectReadToEnd(new LinkedHashMap<>());
+
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), 
SAMPLE_CONFIGS.get(1), null);
+        configState = configStorage.snapshot();
+
+        assertEquals(2, configState.offset());
+        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(1), 
configState.connectorConfig(CONNECTOR_IDS.get(1)));
+        
verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(1));
+
+        // Config deletion
+        when(producerFuture.get(anyLong(), 
any(TimeUnit.class))).thenReturn(null);
+        when(converter.toConnectData(TOPIC, null)).thenReturn(new 
SchemaAndValue(null, null));
+        
when(configLog.sendWithReceipt(AdditionalMatchers.or(Mockito.eq(configKey), 
Mockito.eq(targetStateKey)),
+                Mockito.isNull())).thenReturn(producerFuture);
+
+        // Deletion should remove the second one we added
+        configStorage.removeConnectorConfig(CONNECTOR_IDS.get(1));
+        configState = configStorage.snapshot();
+
+        assertEquals(4, configState.offset());
+        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+        assertNull(configState.targetState(CONNECTOR_IDS.get(1)));
+        
verify(configUpdateListener).onConnectorConfigRemove(CONNECTOR_IDS.get(1));
+
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    public void testPutConnectorConfigWithTargetState() throws Exception {
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        // Null before writing
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.targetState(CONNECTOR_IDS.get(0)));
+
+        doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() {{
+                    put(TARGET_STATE_KEYS.get(0), 
TARGET_STATES_SERIALIZED.get(2));
+                    put(CONNECTOR_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0));
+                }})
+        ).when(configLog).readToEnd();
+
+        // We expect to write the target state first, followed by the config 
write and then a read to end
         expectConvertWriteRead(
-                COMMIT_TASKS_CONFIG_KEYS.get(0), 
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
-                "tasks", 2); // Starts with 0 tasks, after update has 2
-        // As soon as root is rewritten, we should see a callback notifying us 
that we reconfigured some tasks
-        configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0), 
TASK_IDS.get(1)));
-        EasyMock.expectLastCall();
+                TARGET_STATE_KEYS.get(0), 
KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATES_SERIALIZED.get(2),
+                "state.v2", TargetState.STOPPED.name());
+
+        expectConvertWriteRead(
+                CONNECTOR_CONFIG_KEYS.get(0), 
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                "properties", SAMPLE_CONFIGS.get(0));
+
+        // Writing should block until it is written and read back from Kafka
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0), TargetState.STOPPED);
+        configState = configStorage.snapshot();
+        assertEquals(2, configState.offset());
+        assertEquals(TargetState.STOPPED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+
+        // We don't expect the config update listener's 
onConnectorTargetStateChange hook to be invoked
+        verify(configUpdateListener, 
never()).onConnectorTargetStateChange(anyString());
+
+        
verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(0));
+
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    public void testPutConnectorConfigProducerError() throws Exception {
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        when(converter.fromConnectData(TOPIC, 
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, 
CONNECTOR_CONFIG_STRUCTS.get(0)))
+                .thenReturn(CONFIGS_SERIALIZED.get(0));
+        when(configLog.sendWithReceipt(anyString(), 
any(byte[].class))).thenReturn(producerFuture);
+
+        // Verify initial state
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertEquals(0, configState.connectors().size());
+
+        Exception thrownException = new ExecutionException(new 
TopicAuthorizationException(Collections.singleton("test")));
+        when(producerFuture.get(anyLong(), 
any(TimeUnit.class))).thenThrow(thrownException);
+
+        // verify that the producer exception from KafkaBasedLog::send is 
propagated
+        ConnectException e = assertThrows(ConnectException.class, () -> 
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
+                SAMPLE_CONFIGS.get(0), null));
+        assertTrue(e.getMessage().contains("Error writing connector 
configuration to Kafka"));
+        assertEquals(thrownException, e.getCause());
+
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    public void testRemoveConnectorConfigSlowProducer() throws Exception {
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        @SuppressWarnings("unchecked")
+        Future<RecordMetadata> connectorConfigProducerFuture = 
mock(Future.class);
+
+        @SuppressWarnings("unchecked")
+        Future<RecordMetadata> targetStateProducerFuture = mock(Future.class);
+
+        when(configLog.sendWithReceipt(anyString(), isNull()))
+                // tombstone for the connector config
+                .thenReturn(connectorConfigProducerFuture)
+                // tombstone for the connector target state
+                .thenReturn(targetStateProducerFuture);
+
+        
when(connectorConfigProducerFuture.get(eq(READ_WRITE_TOTAL_TIMEOUT_MS), 
any(TimeUnit.class)))
+                .thenAnswer((Answer<RecordMetadata>) invocation -> {
+                    time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000);
+                    return null;
+                });
+
+        // the future get timeout is expected to be reduced according to how 
long the previous Future::get took
+        when(targetStateProducerFuture.get(eq(1000L), any(TimeUnit.class)))
+                .thenAnswer((Answer<RecordMetadata>) invocation -> {
+                    time.sleep(1000);
+                    return null;
+                });
+
+        @SuppressWarnings("unchecked")
+        Future<Void> future = mock(Future.class);
+        when(configLog.readToEnd()).thenReturn(future);
+
+        // the Future::get calls on the previous two producer futures 
exhausted the overall timeout; so expect the
+        // timeout on the log read future to be 0
+        when(future.get(eq(0L), any(TimeUnit.class))).thenReturn(null);
+
+        configStorage.removeConnectorConfig("test-connector");
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testWritePrivileges() throws Exception {
+        // With exactly.once.source.support = preparing (or also, "enabled"), 
we need to use a transactional producer
+        // to write some types of messages to the config topic
+        props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+        createStore();
+
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        // Try and fail to write a task count record to the config topic 
without write privileges
+        when(converter.fromConnectData(TOPIC, 
KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, 
CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0)))
+                .thenReturn(CONFIGS_SERIALIZED.get(0));
+
+        // Should fail the first time since we haven't claimed write privileges
+        assertThrows(IllegalStateException.class, () -> 
configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6));
+
+        // Claim write privileges
+        
doReturn(fencableProducer).when(configStorage).createFencableProducer();
+        // And write the task count record successfully
+        
when(fencableProducer.send(any(ProducerRecord.class))).thenReturn(null);
+        
doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0))))
+                
.doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(1),
 CONFIGS_SERIALIZED.get(2))))
+                .when(configLog).readToEnd();
+        when(converter.toConnectData(TOPIC, CONFIGS_SERIALIZED.get(0)))
+                .thenReturn(new SchemaAndValue(null, 
structToMap(CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0))));
+
+        // Should succeed now
+        configStorage.claimWritePrivileges();
+        configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6);
+
+        verify(fencableProducer).beginTransaction();
+        verify(fencableProducer).commitTransaction();
+
+        // Try to write a connector config
+        when(converter.fromConnectData(TOPIC, 
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, 
CONNECTOR_CONFIG_STRUCTS.get(0)))
+                .thenReturn(CONFIGS_SERIALIZED.get(1));
+        // Get fenced out
+        doThrow(new ProducerFencedException("Better luck next time"))
+                .doNothing()
+                .when(fencableProducer).commitTransaction();
+
+        // Should fail again when we get fenced out
+        assertThrows(PrivilegedWriteException.class, () -> 
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), 
null));
+
+        verify(fencableProducer, times(2)).beginTransaction();
+        verify(fencableProducer).close(Duration.ZERO);
+
+        // Should fail if we retry without reclaiming write privileges
+        assertThrows(IllegalStateException.class, () -> 
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), 
null));
+
+        // In the meantime, write a target state (which doesn't require write 
privileges)
+        when(converter.fromConnectData(TOPIC, 
KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATE_PAUSED))
+                .thenReturn(CONFIGS_SERIALIZED.get(1));
+        when(configLog.sendWithReceipt("target-state-" + CONNECTOR_IDS.get(1), 
CONFIGS_SERIALIZED.get(1)))
+                .thenReturn(producerFuture);
+        when(producerFuture.get(anyLong(), 
any(TimeUnit.class))).thenReturn(null);
+
+        // Should succeed even without write privileges (target states can be 
written by anyone)
+        configStorage.putTargetState(CONNECTOR_IDS.get(1), TargetState.PAUSED);
+
+        // Reclaim write privileges and successfully write the config
+        when(converter.toConnectData(TOPIC, CONFIGS_SERIALIZED.get(2)))
+                .thenReturn(new SchemaAndValue(null, 
structToMap(CONNECTOR_CONFIG_STRUCTS.get(0))));
+
+        // Should succeed if we re-claim write privileges
+        configStorage.claimWritePrivileges();
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), 
SAMPLE_CONFIGS.get(0), null);
+
+        verify(fencableProducer, times(3)).beginTransaction();
+        verify(fencableProducer, times(3)).commitTransaction();
+        
verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(1));
+
+        configStorage.stop();
+        verify(configLog).stop();
+        verify(configStorage, times(2)).createFencableProducer();
+        verify(fencableProducer, times(2)).close(Duration.ZERO);
+    }
+
+    @Test
+    public void testRestoreTargetStateUnexpectedDeletion() {
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, 
TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, 
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(4), new RecordHeaders(), 
Optional.empty()));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), null);
+        deserialized.put(CONFIGS_SERIALIZED.get(4), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        logOffset = 5;
+
+        expectStart(existingRecords, deserialized);
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        // The target state deletion should reset the state to STARTED
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(5, configState.offset()); // Should always be next to be 
read, even if uncommitted
+        assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new 
ArrayList<>(configState.connectors()));
+        assertEquals(TargetState.STARTED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    public void testRestoreTargetState() {
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, 
TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, 
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(4), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, 
TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(5), new RecordHeaders(), 
Optional.empty()));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
+        // A worker running an older version wrote this target state; make 
sure we can handle it correctly
+        deserialized.put(CONFIGS_SERIALIZED.get(3), 
TARGET_STATE_PAUSED_LEGACY);
+        deserialized.put(CONFIGS_SERIALIZED.get(4), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), TARGET_STATE_STOPPED);
+        logOffset = 6;
+
+        expectStart(existingRecords, deserialized);
+
+        // Shouldn't see any callbacks since this is during startup
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        // Should see a single connector with initial state paused
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(6, configState.offset()); // Should always be next to be 
read, even if uncommitted
+        assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new 
ArrayList<>(configState.connectors()));
+        assertEquals(TargetState.PAUSED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+        assertEquals(TargetState.STOPPED, 
configState.targetState(CONNECTOR_IDS.get(1)));
+
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    public void testRestore() {
+        // Restoring data should notify only of the latest values after 
loading is complete. This also validates
+        // that inconsistent state is ignored.
+
+        // Overwrite each type at least once to ensure we see the latest data 
after loading
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(4), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, 
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(5), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 6, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(6), new RecordHeaders(), 
Optional.empty()),
+                // Connector after root update should make it through, task 
update shouldn't
+                new ConsumerRecord<>(TOPIC, 0, 7, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(7), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 8, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(8), new RecordHeaders(), 
Optional.empty()));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), 
CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(4), 
CONNECTOR_CONFIG_STRUCTS.get(1));
+        deserialized.put(CONFIGS_SERIALIZED.get(5), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(6), 
CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(1));
+        deserialized.put(CONFIGS_SERIALIZED.get(7), 
CONNECTOR_CONFIG_STRUCTS.get(2));
+        deserialized.put(CONFIGS_SERIALIZED.get(8), 
TASK_CONFIG_STRUCTS.get(1));
+        logOffset = 9;
+
+        expectStart(existingRecords, deserialized);
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        // Should see a single connector and its config should be the last one 
seen anywhere in the log
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(logOffset, configState.offset()); // Should always be 
next to be read, even if uncommitted
+        assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new 
ArrayList<>(configState.connectors()));
+        assertEquals(TargetState.STARTED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+        // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
+        assertEquals(SAMPLE_CONFIGS.get(2), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        // Should see 2 tasks for that connector. Only config updates before 
the root key update should be reflected
+        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), 
configState.tasks(CONNECTOR_IDS.get(0)));
+        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(9, (int) 
configState.taskCountRecord(CONNECTOR_IDS.get(1)));
+        assertEquals(Collections.EMPTY_SET, 
configState.inconsistentConnectors());
+        assertEquals(Collections.singleton("connector1"), 
configState.connectorsPendingFencing);
+
+        // Shouldn't see any callbacks since this is during startup
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    public void testRestoreConnectorDeletion() {
+        // Restoring data should notify only of the latest values after 
loading is complete. This also validates
+        // that inconsistent state is ignored.
+
+        // Overwrite each type at least once to ensure we see the latest data 
after loading
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, 
TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(4), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, 
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(5), new RecordHeaders(), 
Optional.empty()));
+
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), null);
+        deserialized.put(CONFIGS_SERIALIZED.get(4), null);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+
+        logOffset = 6;
+        expectStart(existingRecords, deserialized);
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        // Should see a single connector and its config should be the last one 
seen anywhere in the log
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(6, configState.offset()); // Should always be next to be 
read, even if uncommitted
+        assertTrue(configState.connectors().isEmpty());
+
+        // Shouldn't see any callbacks since this is during startup
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    public void testRestoreZeroTasks()  {
+        // Restoring data should notify only of the latest values after 
loading is complete. This also validates
+        // that inconsistent state is ignored.
+
+        // Overwrite each type at least once to ensure we see the latest data 
after loading
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, 
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(4), new RecordHeaders(), 
Optional.empty()),
+                // Connector after root update should make it through, task 
update shouldn't
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(5), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 6, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(6), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 7, 0L, 
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(7), new RecordHeaders(), 
Optional.empty()));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), 
CONNECTOR_CONFIG_STRUCTS.get(1));
+        deserialized.put(CONFIGS_SERIALIZED.get(4), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), 
CONNECTOR_CONFIG_STRUCTS.get(2));
+        deserialized.put(CONFIGS_SERIALIZED.get(6), 
TASK_CONFIG_STRUCTS.get(1));
+        deserialized.put(CONFIGS_SERIALIZED.get(7), 
TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
+        logOffset = 8;
+        expectStart(existingRecords, deserialized);
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        // Should see a single connector and its config should be the last one 
seen anywhere in the log
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(8, configState.offset()); // Should always be next to be 
read, even if uncommitted
+        assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new 
ArrayList<>(configState.connectors()));
+        // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
+        assertEquals(SAMPLE_CONFIGS.get(2), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        // Should see 0 tasks for that connector.
+        assertEquals(Collections.emptyList(), 
configState.tasks(CONNECTOR_IDS.get(0)));
+        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+        assertEquals(Collections.EMPTY_SET, 
configState.inconsistentConnectors());
+
+        // Shouldn't see any callbacks since this is during startup
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    public void testRecordToRestartRequest() {
+        ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 
0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+                CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty());
+        Struct struct = RESTART_REQUEST_STRUCTS.get(0);
+        SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(), 
structToMap(struct));
+        RestartRequest restartRequest = 
configStorage.recordToRestartRequest(record, schemaAndValue);
+        assertEquals(CONNECTOR_1_NAME, restartRequest.connectorName());
+        assertEquals(struct.getBoolean(INCLUDE_TASKS_FIELD_NAME), 
restartRequest.includeTasks());
+        assertEquals(struct.getBoolean(ONLY_FAILED_FIELD_NAME), 
restartRequest.onlyFailed());
+    }
+
+    @Test
+    public void testRecordToRestartRequestOnlyFailedInconsistent() {
+        ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 
0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+                CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty());
+        Struct struct = ONLY_FAILED_MISSING_STRUCT;
+        SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(), 
structToMap(struct));
+        RestartRequest restartRequest = 
configStorage.recordToRestartRequest(record, schemaAndValue);
+        assertEquals(CONNECTOR_1_NAME, restartRequest.connectorName());
+        assertEquals(struct.getBoolean(INCLUDE_TASKS_FIELD_NAME), 
restartRequest.includeTasks());
+        assertFalse(restartRequest.onlyFailed());
+    }
+
+    @Test
+    public void testRecordToRestartRequestIncludeTasksInconsistent() {
+        ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 
0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+                CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty());
+        Struct struct = INCLUDE_TASKS_MISSING_STRUCT;
+        SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(), 
structToMap(struct));
+        RestartRequest restartRequest = 
configStorage.recordToRestartRequest(record, schemaAndValue);
+        assertEquals(CONNECTOR_1_NAME, restartRequest.connectorName());
+        assertFalse(restartRequest.includeTasks());
+        assertEquals(struct.getBoolean(ONLY_FAILED_FIELD_NAME), 
restartRequest.onlyFailed());
+    }
+
+    @Test
+    public void testFencableProducerPropertiesOverrideUserSuppliedValues() {
+        props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+        String groupId = "my-other-connect-cluster";
+        props.put(GROUP_ID_CONFIG, groupId);
+        props.put(TRANSACTIONAL_ID_CONFIG, "my-custom-transactional-id");
+        props.put(ENABLE_IDEMPOTENCE_CONFIG, "false");
+        createStore();
+
+        Map<String, Object> fencableProducerProperties = 
configStorage.fencableProducerProps(config);
+        assertEquals("connect-cluster-" + groupId, 
fencableProducerProperties.get(TRANSACTIONAL_ID_CONFIG));
+        assertEquals("true", 
fencableProducerProperties.get(ENABLE_IDEMPOTENCE_CONFIG));
+    }
+
+    @Test
+    public void 
testConsumerPropertiesDoNotOverrideUserSuppliedValuesWithoutExactlyOnceSourceEnabled()
 {
+        props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+        props.put(ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_UNCOMMITTED.toString());
+        createStore();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+
+        assertEquals(
+                IsolationLevel.READ_UNCOMMITTED.toString(),
+                capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
+        );
+    }
+
+    @Test
+    public void testClientIds() {
+        props = new HashMap<>(DEFAULT_CONFIG_STORAGE_PROPS);
+        props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+        createStore();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+
+        Map<String, Object> fencableProducerProps = 
configStorage.fencableProducerProps(config);
+
+        final String expectedClientId = CLIENT_ID_BASE + "configs";
+        assertEquals(expectedClientId, 
capturedProducerProps.getValue().get(CLIENT_ID_CONFIG));
+        assertEquals(expectedClientId, 
capturedConsumerProps.getValue().get(CLIENT_ID_CONFIG));
+        assertEquals(expectedClientId + "-leader", 
fencableProducerProps.get(CLIENT_ID_CONFIG));
+    }
+
+    @Test
+    public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() {
+        when(configLog.partitionCount()).thenReturn(2);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        ConfigException e = assertThrows(ConfigException.class, () -> 
configStorage.start());
+        assertTrue(e.getMessage().contains("required to have a single 
partition"));
+    }
+
+    @Test
+    public void testFencableProducerPropertiesInsertedByDefault() {
+        props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+        String groupId = "my-connect-cluster";
+        props.put(GROUP_ID_CONFIG, groupId);
+        props.remove(TRANSACTIONAL_ID_CONFIG);
+        props.remove(ENABLE_IDEMPOTENCE_CONFIG);
+        createStore();
+
+        Map<String, Object> fencableProducerProperties = 
configStorage.fencableProducerProps(config);
+        assertEquals("connect-cluster-" + groupId, 
fencableProducerProperties.get(TRANSACTIONAL_ID_CONFIG));
+        assertEquals("true", 
fencableProducerProperties.get(ENABLE_IDEMPOTENCE_CONFIG));
+    }
+
+    @Test
+    public void 
testConsumerPropertiesInsertedByDefaultWithExactlyOnceSourceEnabled() {
+        props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+        props.remove(ISOLATION_LEVEL_CONFIG);
+        createStore();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+
+        assertEquals(
+                IsolationLevel.READ_COMMITTED.toString(),
+                capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
+        );
+    }
+
+    @Test
+    public void 
testConsumerPropertiesOverrideUserSuppliedValuesWithExactlyOnceSourceEnabled() {
+        props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+        props.put(ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_UNCOMMITTED.toString());
+        createStore();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+
+        assertEquals(
+                IsolationLevel.READ_COMMITTED.toString(),
+                capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
+        );
+    }
+
+    @Test
+    public void 
testConsumerPropertiesNotInsertedByDefaultWithoutExactlyOnceSourceEnabled() {
+        props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+        props.remove(ISOLATION_LEVEL_CONFIG);
+        createStore();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+
+        
assertNull(capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG));
+    }
+
+    @Test
+    public void testBackgroundConnectorDeletion() throws Exception {
+        // verify that we handle connector deletions correctly when they come 
up through the log
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, 
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), 
Optional.empty()));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(1));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        logOffset = 5;
+
+        expectStart(existingRecords, deserialized);
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+
+        configStorage.start();
+        verify(configLog).start();
+
+        // Should see a single connector with initial state paused
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(TargetState.STARTED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(new 
ConnectorTaskId(CONNECTOR_IDS.get(0), 0)));
+        assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(new 
ConnectorTaskId(CONNECTOR_IDS.get(0), 1)));
+        assertEquals(2, configState.taskCount(CONNECTOR_IDS.get(0)));
+
+        LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
+        serializedData.put(CONNECTOR_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0));
+        serializedData.put(TARGET_STATE_KEYS.get(0), 
CONFIGS_SERIALIZED.get(1));
+        doAnswer(expectReadToEnd(serializedData)).when(configLog).readToEnd();
+
+        Map<String, Struct> deserializedData = new HashMap<>();
+        deserializedData.put(CONNECTOR_CONFIG_KEYS.get(0), null);
+        deserializedData.put(TARGET_STATE_KEYS.get(0), null);
+        expectRead(serializedData, deserializedData);
+
+        configStorage.refresh(0, TimeUnit.SECONDS);
+        
verify(configUpdateListener).onConnectorConfigRemove(CONNECTOR_IDS.get(0));
+
+        configState = configStorage.snapshot();
+        // Connector should now be removed from the snapshot
+        assertFalse(configState.contains(CONNECTOR_IDS.get(0)));
+        assertEquals(0, configState.taskCount(CONNECTOR_IDS.get(0)));
+        // Ensure that the deleted connector's deferred task updates have been 
cleaned up
+        // in order to prevent unbounded growth of the map
+        assertEquals(Collections.emptyMap(), 
configStorage.deferredTaskUpdates);
+
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws 
Exception {
+        // Test a case where a failure and compaction has left us in an 
inconsistent state when reading the log.
+        // We start out by loading an initial configuration where we started 
to write a task update, and then
+        // compaction cleaned up the earlier record.
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty()),
+                // This is the record that has been compacted:
+                //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, 
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(4), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(5), new RecordHeaders(), 
Optional.empty()));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(4), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), 
TASK_CONFIG_STRUCTS.get(1));
+        logOffset = 6;
+        expectStart(existingRecords, deserialized);
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+
+        // After reading the log, it should have been in an inconsistent state
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(6, configState.offset()); // Should always be next to be 
read, not last committed
+        assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new 
ArrayList<>(configState.connectors()));
+        // Inconsistent data should leave us with no tasks listed for the 
connector and an entry in the inconsistent list
+        assertEquals(Collections.emptyList(), 
configState.tasks(CONNECTOR_IDS.get(0)));
+        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+        assertNull(configState.taskConfig(TASK_IDS.get(0)));
+        assertNull(configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), 
configState.inconsistentConnectors());
 
         // Records to be read by consumer as it reads to the end of the log
         LinkedHashMap<String, byte[]> serializedConfigs = new 
LinkedHashMap<>();
         serializedConfigs.put(TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0));
-        serializedConfigs.put(TASK_CONFIG_KEYS.get(1), 
CONFIGS_SERIALIZED.get(1));
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(2));
-        expectReadToEnd(serializedConfigs);
+        // Successful attempt to write new task config
+        doAnswer(expectReadToEnd(new LinkedHashMap<>()))
+                .doAnswer(expectReadToEnd(new LinkedHashMap<>()))
+                .doAnswer(expectReadToEnd(serializedConfigs))
+                .when(configLog).readToEnd();
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(0), 
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                "properties", SAMPLE_CONFIGS.get(0));
+        expectConvertWriteRead(
+                COMMIT_TASKS_CONFIG_KEYS.get(0), 
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+                "tasks", 1); // Updated to just 1 task
+
+        // Next, issue a write that has everything that is needed and it 
should be accepted. Note that in this case
+        // we are going to shrink the number of tasks to 1
+        configStorage.putTaskConfigs("connector1", 
Collections.singletonList(SAMPLE_CONFIGS.get(0)));
+
+        // Validate updated config
+        configState = configStorage.snapshot();
+        // This is only two more ahead of the last one because multiple calls 
fail, and so their configs are not written
+        // to the topic. Only the last call with 1 task config + 1 commit 
actually gets written.
+        assertEquals(8, configState.offset());
+        assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new 
ArrayList<>(configState.connectors()));
+        assertEquals(Collections.singletonList(TASK_IDS.get(0)), 
configState.tasks(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(Collections.EMPTY_SET, 
configState.inconsistentConnectors());
+
+        // As soon as root is rewritten, we should see a callback notifying us 
that we reconfigured some tasks
+        
verify(configUpdateListener).onTaskConfigUpdate(Collections.singletonList(TASK_IDS.get(0)));
+
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    public void testPutRestartRequestOnlyFailed() throws Exception {
+        RestartRequest restartRequest = new 
RestartRequest(CONNECTOR_IDS.get(0), true, false);
+        testPutRestartRequest(restartRequest);
+    }
+
+    @Test
+    public void testPutRestartRequestOnlyFailedIncludingTasks() throws 
Exception {
+        RestartRequest restartRequest = new 
RestartRequest(CONNECTOR_IDS.get(0), true, true);
+        testPutRestartRequest(restartRequest);
+    }
+
+    private void testPutRestartRequest(RestartRequest restartRequest) throws 
Exception {
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+
+        configStorage.start();
+        verify(configLog).start();
 
-        // Task count records are read back after writing as well
         expectConvertWriteRead(
-                CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), 
KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONFIGS_SERIALIZED.get(3),
-                "task-count", 4);
-        serializedConfigs = new LinkedHashMap<>();
-        serializedConfigs.put(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), 
CONFIGS_SERIALIZED.get(3));
-        expectReadToEnd(serializedConfigs);
+                RESTART_CONNECTOR_KEYS.get(0), 
KafkaConfigBackingStore.RESTART_REQUEST_V0, CONFIGS_SERIALIZED.get(0),
+                ONLY_FAILED_FIELD_NAME, restartRequest.onlyFailed());
+
+        LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
+        recordsToRead.put(RESTART_CONNECTOR_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0));
+        doAnswer(expectReadToEnd(recordsToRead)).when(configLog).readToEnd();
+
+        // Writing should block until it is written and read back from Kafka
+        configStorage.putRestartRequest(restartRequest);
 
-        expectPartitionCount(1);
-        expectStop();
+        final ArgumentCaptor<RestartRequest> restartRequestCaptor = 
ArgumentCaptor.forClass(RestartRequest.class);
+        
verify(configUpdateListener).onRestartRequest(restartRequestCaptor.capture());
 
-        PowerMock.replayAll();
+        assertEquals(restartRequest.connectorName(), 
restartRequestCaptor.getValue().connectorName());
+        assertEquals(restartRequest.onlyFailed(), 
restartRequestCaptor.getValue().onlyFailed());
+        assertEquals(restartRequest.includeTasks(), 
restartRequestCaptor.getValue().includeTasks());
+
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    public void testRestoreRestartRequestInconsistentState() {
+        // Restoring data should notify only of the latest values after 
loading is complete. This also validates
+        // that inconsistent state doesn't prevent startup.
+        // Overwrite each type at least once to ensure we see the latest data 
after loading
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, 
TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, 
TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, 
TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), 
Optional.empty()));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), 
RESTART_REQUEST_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), 
RESTART_REQUEST_STRUCTS.get(1));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), 
RESTART_REQUEST_STRUCTS.get(2));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), null);
+        logOffset = 4;
+        expectStart(existingRecords, deserialized);
+        when(configLog.partitionCount()).thenReturn(1);
 
         configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+
         configStorage.start();
+        verify(configLog).start();
+
+        // Shouldn't see any callbacks since this is during startup
+        verify(configUpdateListener, 
never()).onConnectorConfigRemove(anyString());
+        verify(configUpdateListener, 
never()).onConnectorConfigUpdate(anyString());
+        verify(configUpdateListener, 
never()).onTaskConfigUpdate(anyCollection());
+        verify(configUpdateListener, 
never()).onConnectorTargetStateChange(anyString());
+        verify(configUpdateListener, 
never()).onSessionKeyUpdate(any(SessionKey.class));
+        verify(configUpdateListener, 
never()).onRestartRequest(any(RestartRequest.class));
+        verify(configUpdateListener, 
never()).onLoggingLevelUpdate(anyString(), anyString());
+
+        configStorage.stop();
+        verify(configLog).stop();
+    }
+
+    @Test
+    public void testPutTaskConfigsZeroTasks() throws Exception {
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+        verify(configLog).start();
+
+        // Records to be read by consumer as it reads to the end of the log
+        doAnswer(expectReadToEnd(new LinkedHashMap<>())).
+                
doAnswer(expectReadToEnd(Collections.singletonMap(COMMIT_TASKS_CONFIG_KEYS.get(0),
 CONFIGS_SERIALIZED.get(0))))
+                .when(configLog).readToEnd();
+
+        expectConvertWriteRead(
+                COMMIT_TASKS_CONFIG_KEYS.get(0), 
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
+                "tasks", 0); // We have 0 tasks
 
         // Bootstrap as if we had already added the connector, but no tasks 
had been added yet
-        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), 
Collections.emptyList());
+        addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), 
Collections.emptyList());
 
-        // Before anything is written
-        String connectorName = CONNECTOR_IDS.get(0);
+
+        // Null before writing
         ClusterConfigState configState = configStorage.snapshot();
-        assertFalse(configState.pendingFencing(connectorName));
-        assertNull(configState.taskCountRecord(connectorName));
-        assertNull(configState.taskConfigGeneration(connectorName));
+        assertEquals(-1, configState.offset());
 
         // Writing task configs should block until all the writes have been 
performed and the root record update
         // has completed
-        List<Map<String, String>> taskConfigs = 
Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
+        List<Map<String, String>> taskConfigs = Collections.emptyList();
         configStorage.putTaskConfigs("connector1", taskConfigs);
 
+        // Validate root config by listing all connectors and tasks
         configState = configStorage.snapshot();
-        assertEquals(3, configState.offset());
-        assertTrue(configState.pendingFencing(connectorName));
-        assertNull(configState.taskCountRecord(connectorName));
-        assertEquals(0, (long) 
configState.taskConfigGeneration(connectorName));
+        assertEquals(1, configState.offset());
+        String connectorName = CONNECTOR_IDS.get(0);
+        assertEquals(Collections.singletonList(connectorName), new 
ArrayList<>(configState.connectors()));
+        assertEquals(Collections.emptyList(), 
configState.tasks(connectorName));
+        assertEquals(Collections.EMPTY_SET, 
configState.inconsistentConnectors());
+
+        // As soon as root is rewritten, we should see a callback notifying us 
that we reconfigured some tasks
+        
verify(configUpdateListener).onTaskConfigUpdate(Collections.emptyList());
+
+        configStorage.stop();
+        verify(configLog).stop();
+    }
 
-        configStorage.putTaskCountRecord(connectorName, 4);
+    @Test
+    public void testBackgroundUpdateTargetState() throws Exception {
+        // verify that we handle target state changes correctly when they come 
up through the log
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, 
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), 
Optional.empty()));
+        LinkedHashMap<byte[], Struct> deserializedOnStartup = new 
LinkedHashMap<>();
+        deserializedOnStartup.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserializedOnStartup.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
+        deserializedOnStartup.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
+        deserializedOnStartup.put(CONFIGS_SERIALIZED.get(3), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        logOffset = 5;
+
+        expectStart(existingRecords, deserializedOnStartup);
+        when(configLog.partitionCount()).thenReturn(1);
 
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+        verify(configLog).start();
+
+        // Should see a single connector with initial state started
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), 
configStorage.connectorTargetStates.keySet());
+        assertEquals(TargetState.STARTED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+
+        LinkedHashMap<String, byte[]> serializedAfterStartup = new 
LinkedHashMap<>();
+        serializedAfterStartup.put(TARGET_STATE_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0));
+        serializedAfterStartup.put(TARGET_STATE_KEYS.get(1), 
CONFIGS_SERIALIZED.get(1));
+        
doAnswer(expectReadToEnd(serializedAfterStartup)).when(configLog).readToEnd();
+
+        Map<String, Struct> deserializedAfterStartup = new HashMap<>();
+        deserializedAfterStartup.put(TARGET_STATE_KEYS.get(0), 
TARGET_STATE_PAUSED);
+        deserializedAfterStartup.put(TARGET_STATE_KEYS.get(1), 
TARGET_STATE_STOPPED);
+        expectRead(serializedAfterStartup, deserializedAfterStartup);
+
+        // Should see two connectors now, one paused and one stopped
+        configStorage.refresh(0, TimeUnit.SECONDS);
+        
verify(configUpdateListener).onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
         configState = configStorage.snapshot();
-        assertEquals(4, configState.offset());
-        assertFalse(configState.pendingFencing(connectorName));
-        assertEquals(4, (long) configState.taskCountRecord(connectorName));
-        assertEquals(0, (long) 
configState.taskConfigGeneration(connectorName));
+
+        assertEquals(new HashSet<>(CONNECTOR_IDS), 
configStorage.connectorTargetStates.keySet());
+        assertEquals(TargetState.PAUSED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+        assertEquals(TargetState.STOPPED, 
configState.targetState(CONNECTOR_IDS.get(1)));
+
+        configStorage.stop();
+        verify(configStorage).stop();
+    }
+
+    @Test
+    public void testSameTargetState() throws Exception {
+        // verify that we handle target state changes correctly when they come 
up through the log
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, 
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+                        CONFIGS_SERIALIZED.get(2), new RecordHeaders(), 
Optional.empty()),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, 
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+                        CONFIGS_SERIALIZED.get(3), new RecordHeaders(), 
Optional.empty()));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        logOffset = 5;
+
+        expectStart(existingRecords, deserialized);
+
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+        verify(configLog).start();
+
+        ClusterConfigState configState = configStorage.snapshot();
+        expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), 
TARGET_STATE_STARTED);
+        // Should see a single connector with initial state paused
+        assertEquals(TargetState.STARTED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+
+        expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), 
TARGET_STATE_STARTED);
+        // on resume update listener shouldn't be called
+        verify(configUpdateListener, 
never()).onConnectorTargetStateChange(anyString());
+
+        configStorage.stop();
+        verify(configStorage).stop();
+    }
+
+
+    @Test
+    public void testPutLogLevel() throws Exception {
+        final String logger1 = "org.apache.zookeeper";
+        final String logger2 = "org.apache.cassandra";
+        final String logger3 = "org.apache.kafka.clients";
+        final String logger4 = "org.apache.kafka.connect";
+        final String level1 = "ERROR";
+        final String level3 = "WARN";
+        final String level4 = "DEBUG";
+
+        final Struct existingLogLevel = new 
Struct(KafkaConfigBackingStore.LOGGER_LEVEL_V0)
+                .put("level", level1);
+
+        // Pre-populate the config topic with a couple of logger level 
records; these should be ignored (i.e.,
+        // not reported to the update listener)
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0, 0, "logger-cluster-" + logger1,
+                        CONFIGS_SERIALIZED.get(0), new RecordHeaders(), 
Optional.empty()
+                ),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, 
TimestampType.CREATE_TIME, 0, 0, "logger-cluster-" + logger2,
+                        CONFIGS_SERIALIZED.get(1), new RecordHeaders(), 
Optional.empty()
+                )
+        );
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), existingLogLevel);
+        // Make sure we gracefully handle tombstones
+        deserialized.put(CONFIGS_SERIALIZED.get(1), null);
+        logOffset = 2;
+
+        expectStart(existingRecords, deserialized);
+        when(configLog.partitionCount()).thenReturn(1);
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+
+        configStorage.start();
+        verify(configLog).start();
+
+        expectConvertWriteRead(
+                "logger-cluster-" + logger3, 
KafkaConfigBackingStore.LOGGER_LEVEL_V0, CONFIGS_SERIALIZED.get(2),
+                "level", level3);
+        configStorage.putLoggerLevel(logger3, level3);
+
+        expectConvertWriteRead(
+                "logger-cluster-" + logger4, 
KafkaConfigBackingStore.LOGGER_LEVEL_V0, CONFIGS_SERIALIZED.get(3),
+                "level", level4);
+        configStorage.putLoggerLevel(logger4, level4);
+
+        LinkedHashMap<String, byte[]> newRecords = new LinkedHashMap<>();
+        newRecords.put("logger-cluster-" + logger3, CONFIGS_SERIALIZED.get(2));
+        newRecords.put("logger-cluster-" + logger4, CONFIGS_SERIALIZED.get(3));
+        doAnswer(expectReadToEnd(newRecords)).when(configLog).readToEnd();
+
+        configStorage.refresh(0, TimeUnit.SECONDS);
+        verify(configUpdateListener).onLoggingLevelUpdate(logger3, level3);
+        verify(configUpdateListener).onLoggingLevelUpdate(logger4, level4);
 
         configStorage.stop();
+        verify(configLog).stop();
+    }
 
-        PowerMock.verifyAll();
+    @Test
+    public void testTaskCountRecordsAndGenerations() throws Exception {
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+        verifyConfigure();
+        configStorage.start();
+        verify(configLog).start();
+
+        // Records to be read by consumer as it reads to the end of the log
+        doAnswer(expectReadToEnd(new LinkedHashMap<>()))
+                .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() 
{{
+                            put(TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0));
+                            put(TASK_CONFIG_KEYS.get(1), 
CONFIGS_SERIALIZED.get(0));
+                            put(COMMIT_TASKS_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(1));
+                        }})
+                )
+                .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() 
{{
+                            put(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), 
CONFIGS_SERIALIZED.get(2));
+                        }})
+                )
+                .when(configLog).readToEnd();
+
+        // Task configs should read to end, write to the log, read to end, 
write root, then read to end again
+//        expectConvertWriteRead(TASK_CONFIG_KEYS.get(0), 
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+//                "properties", SAMPLE_CONFIGS.get(0));
+//        expectConvertWriteRead(
+//                TASK_CONFIG_KEYS.get(1), 
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+//                "properties", SAMPLE_CONFIGS.get(0));
+//        expectConvertWriteRead(
+//                COMMIT_TASKS_CONFIG_KEYS.get(0), 
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
+//                "tasks", 2); // Starts with 0 tasks, after update has 2
+//
+//        // Task count records are read back after writing as well
+//        expectConvertWriteRead(
+//                CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0), 
KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONFIGS_SERIALIZED.get(0),
+//                "task-count", 4);
+
+        when(configLog.partitionCount()).thenReturn(1);
+
+        addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), 
Collections.emptyList());
+
+        // Before anything is written
+        String connectorName = CONNECTOR_IDS.get(0);
+        ClusterConfigState configState = configStorage.snapshot();
+        assertFalse(configState.pendingFencing(connectorName));
+        assertNull(configState.taskCountRecord(connectorName));
+        assertNull(configState.taskConfigGeneration(connectorName));
+
+        // Writing task configs should block until all the writes have been 
performed and the root record update
+        // has completed
+        List<Map<String, String>> taskConfigs = 
Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(0));
+
+        Struct connectConfig = new 
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0);
+        connectConfig.put("properties", SAMPLE_CONFIGS.get(0));
+        Struct taskCountRecord = new 
Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0);
+        taskCountRecord.put("task-count", 4);
+        Struct connectTaskCommitConfig = new 
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0);
+        connectTaskCommitConfig.put("tasks", taskConfigs.size());
+
+        
doReturn(CONFIGS_SERIALIZED.get(0)).when(converter).fromConnectData(TOPIC, 
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, connectConfig);
+        
doReturn(CONFIGS_SERIALIZED.get(0)).when(converter).fromConnectData(TOPIC, 
KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, taskCountRecord);
+        
doReturn(CONFIGS_SERIALIZED.get(0)).when(converter).fromConnectData(TOPIC, 
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, connectTaskCommitConfig);
+
+        
doReturn(producerFuture).when(configLog).sendWithReceipt(TASK_CONFIG_KEYS.get(0),
 CONFIGS_SERIALIZED.get(0));
+        
doReturn(producerFuture).when(configLog).sendWithReceipt(TASK_CONFIG_KEYS.get(1),
 CONFIGS_SERIALIZED.get(0));
+        
doReturn(producerFuture).when(configLog).sendWithReceipt(COMMIT_TASKS_CONFIG_KEYS.get(0),
 CONFIGS_SERIALIZED.get(1));
+        
doReturn(producerFuture).when(configLog).sendWithReceipt(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0),
 CONFIGS_SERIALIZED.get(2));
+        doAnswer(invocation -> {
+            // Note null schema because default settings for internal 
serialization are schema-less
+            return new SchemaAndValue(null, structToMap(connectConfig));
+        }).when(converter).toConnectData(TOPIC, CONFIGS_SERIALIZED.get(0));
+        doAnswer(invocation -> {
+            // Note null schema because default settings for internal 
serialization are schema-less
+            return new SchemaAndValue(null, 
structToMap(connectTaskCommitConfig));
+        }).when(converter).toConnectData(TOPIC, CONFIGS_SERIALIZED.get(1));
+        doAnswer(invocation -> {
+            // Note null schema because default settings for internal 
serialization are schema-less
+            return new SchemaAndValue(null, structToMap(taskCountRecord));
+        }).when(converter).toConnectData(TOPIC, CONFIGS_SERIALIZED.get(2));
+        doReturn(null).when(producerFuture).get(anyLong(), 
any(TimeUnit.class));
+        configStorage.putTaskConfigs("connector1", taskConfigs);
+
+//        configState = configStorage.snapshot();
+//        assertEquals(3, configState.offset());
+//        assertTrue(configState.pendingFencing(connectorName));
+//        assertNull(configState.taskCountRecord(connectorName));
+//        assertEquals(0, (long) 
configState.taskConfigGeneration(connectorName));
+//
+//        configStorage.putTaskCountRecord(connectorName, 4);
+//
+//        configState = configStorage.snapshot();
+//        assertEquals(4, configState.offset());
+//        assertFalse(configState.pendingFencing(connectorName));
+//        assertEquals(4, (long) configState.taskCountRecord(connectorName));
+//        assertEquals(0, (long) 
configState.taskConfigGeneration(connectorName));
+//
+//        // As soon as root is rewritten, we should see a callback notifying 
us that we reconfigured some tasks
+//        
verify(configUpdateListener).onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0), 
TASK_IDS.get(1)));
+
+        configStorage.stop();
+        verify(configLog).stop();
     }
 
     @Test
     public void testPutTaskConfigs() throws Exception {

Review Comment:
   thanks for the sharing!
   I was stuck at the expectConvertWriteRead, got different stubbing errors and 
not sure what could be a better solution. let me check this and modify 
accordingly, thanks! 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to