johnnychhsu commented on code in PR #16126:
URL: https://github.com/apache/kafka/pull/16126#discussion_r1629995608
##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java:
##########
@@ -194,344 +235,1463 @@ public void setUp() {
}
@Test
- public void testTaskCountRecordsAndGenerations() throws Exception {
- expectConfigure();
- expectStart(Collections.emptyList(), Collections.emptyMap());
+ public void testStartStop() {
+ props.put("config.storage.min.insync.replicas", "3");
+ props.put("config.storage.max.message.bytes", "1001");
+ createStore();
- // Task configs should read to end, write to the log, read to end,
write root, then read to end again
- expectReadToEnd(new LinkedHashMap<>());
- expectConvertWriteRead(
- TASK_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
- "properties", SAMPLE_CONFIGS.get(0));
- expectConvertWriteRead(
- TASK_CONFIG_KEYS.get(1),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
- "properties", SAMPLE_CONFIGS.get(1));
- expectReadToEnd(new LinkedHashMap<>());
- expectConvertWriteRead(
- COMMIT_TASKS_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
- "tasks", 2); // Starts with 0 tasks, after update has 2
- // As soon as root is rewritten, we should see a callback notifying us
that we reconfigured some tasks
- configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0),
TASK_IDS.get(1)));
- EasyMock.expectLastCall();
+ when(configLog.partitionCount()).thenReturn(1);
- // Records to be read by consumer as it reads to the end of the log
- LinkedHashMap<String, byte[]> serializedConfigs = new
LinkedHashMap<>();
- serializedConfigs.put(TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
- serializedConfigs.put(TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(1));
- serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(2));
- expectReadToEnd(serializedConfigs);
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
- // Task count records are read back after writing as well
- expectConvertWriteRead(
- CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0),
KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONFIGS_SERIALIZED.get(3),
- "task-count", 4);
- serializedConfigs = new LinkedHashMap<>();
- serializedConfigs.put(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0),
CONFIGS_SERIALIZED.get(3));
- expectReadToEnd(serializedConfigs);
+ verifyConfigure();
+ assertEquals(TOPIC, capturedTopic.getValue());
+ assertEquals("org.apache.kafka.common.serialization.StringSerializer",
capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+
assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer",
capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+
assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+
assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer",
capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+
+ assertEquals(TOPIC, capturedNewTopic.getValue().name());
+ assertEquals(1, capturedNewTopic.getValue().numPartitions());
+ assertEquals(TOPIC_REPLICATION_FACTOR,
capturedNewTopic.getValue().replicationFactor());
+ assertEquals("3",
capturedNewTopic.getValue().configs().get("min.insync.replicas"));
+ assertEquals("1001",
capturedNewTopic.getValue().configs().get("max.message.bytes"));
+
+ configStorage.start();
+ configStorage.stop();
+
+ verify(configLog).start();
+ verify(configLog).stop();
+ }
- expectPartitionCount(1);
- expectStop();
+ @Test
+ public void testSnapshotCannotMutateInternalState() {
+ props.put("config.storage.min.insync.replicas", "3");
+ props.put("config.storage.max.message.bytes", "1001");
+ createStore();
- PowerMock.replayAll();
+ when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+
configStorage.start();
+ ClusterConfigState snapshot = configStorage.snapshot();
+ assertNotSame(snapshot.connectorTaskCounts,
configStorage.connectorTaskCounts);
+ assertNotSame(snapshot.connectorConfigs,
configStorage.connectorConfigs);
+ assertNotSame(snapshot.connectorTargetStates,
configStorage.connectorTargetStates);
+ assertNotSame(snapshot.taskConfigs, configStorage.taskConfigs);
+ assertNotSame(snapshot.connectorTaskCountRecords,
configStorage.connectorTaskCountRecords);
+ assertNotSame(snapshot.connectorTaskConfigGenerations,
configStorage.connectorTaskConfigGenerations);
+ assertNotSame(snapshot.connectorsPendingFencing,
configStorage.connectorsPendingFencing);
+ assertNotSame(snapshot.inconsistentConnectors,
configStorage.inconsistent);
+ }
- // Bootstrap as if we had already added the connector, but no tasks
had been added yet
- whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0),
Collections.emptyList());
+ @Test
+ public void testPutConnectorConfig() throws Exception {
+ when(configLog.partitionCount()).thenReturn(1);
- // Before anything is written
- String connectorName = CONNECTOR_IDS.get(0);
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+
+ // Null before writing
ClusterConfigState configState = configStorage.snapshot();
- assertFalse(configState.pendingFencing(connectorName));
- assertNull(configState.taskCountRecord(connectorName));
- assertNull(configState.taskConfigGeneration(connectorName));
+ assertEquals(-1, configState.offset());
+ assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+ String configKey = CONNECTOR_CONFIG_KEYS.get(1);
+ String targetStateKey = TARGET_STATE_KEYS.get(1);
+
+
doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0))))
+
.doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(1))))
+ // Config deletion
+ .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>()
{{
+ put(configKey, null);
+ put(targetStateKey, null);
+ }})
+ ).when(configLog).readToEnd();
+
+ // Writing should block until it is written and read back from Kafka
+ expectConvertWriteRead(
+ CONNECTOR_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+ "properties", SAMPLE_CONFIGS.get(0));
- // Writing task configs should block until all the writes have been
performed and the root record update
- // has completed
- List<Map<String, String>> taskConfigs =
Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
- configStorage.putTaskConfigs("connector1", taskConfigs);
+ configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0), null);
+ configState = configStorage.snapshot();
+
+ assertEquals(1, configState.offset());
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(0));
+ // Second should also block and all configs should still be available
+ expectConvertWriteRead(
+ CONNECTOR_CONFIG_KEYS.get(1),
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+ "properties", SAMPLE_CONFIGS.get(1));
+
+ configStorage.putConnectorConfig(CONNECTOR_IDS.get(1),
SAMPLE_CONFIGS.get(1), null);
configState = configStorage.snapshot();
- assertEquals(3, configState.offset());
- assertTrue(configState.pendingFencing(connectorName));
- assertNull(configState.taskCountRecord(connectorName));
- assertEquals(0, (long)
configState.taskConfigGeneration(connectorName));
- configStorage.putTaskCountRecord(connectorName, 4);
+ assertEquals(2, configState.offset());
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(1),
configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(1));
+ // Config deletion
+ when(producerFuture.get(anyLong(),
any(TimeUnit.class))).thenReturn(null);
+ when(converter.toConnectData(TOPIC, null)).thenReturn(new
SchemaAndValue(null, null));
+
when(configLog.sendWithReceipt(AdditionalMatchers.or(Mockito.eq(configKey),
Mockito.eq(targetStateKey)),
+ Mockito.isNull())).thenReturn(producerFuture);
+
+ // Deletion should remove the second one we added
+ configStorage.removeConnectorConfig(CONNECTOR_IDS.get(1));
configState = configStorage.snapshot();
+
assertEquals(4, configState.offset());
- assertFalse(configState.pendingFencing(connectorName));
- assertEquals(4, (long) configState.taskCountRecord(connectorName));
- assertEquals(0, (long)
configState.taskConfigGeneration(connectorName));
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+ assertNull(configState.targetState(CONNECTOR_IDS.get(1)));
+
verify(configUpdateListener).onConnectorConfigRemove(CONNECTOR_IDS.get(1));
configStorage.stop();
-
- PowerMock.verifyAll();
+ verify(configLog).stop();
}
@Test
- public void testPutTaskConfigs() throws Exception {
- expectConfigure();
- expectStart(Collections.emptyList(), Collections.emptyMap());
+ public void testPutConnectorConfigWithTargetState() throws Exception {
+ when(configLog.partitionCount()).thenReturn(1);
- // Task configs should read to end, write to the log, read to end,
write root, then read to end again
- expectReadToEnd(new LinkedHashMap<>());
- expectConvertWriteRead(
- TASK_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
- "properties", SAMPLE_CONFIGS.get(0));
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+
+ // Null before writing
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(-1, configState.offset());
+ assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ assertNull(configState.targetState(CONNECTOR_IDS.get(0)));
+
+ doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() {{
+ put(TARGET_STATE_KEYS.get(0),
TARGET_STATES_SERIALIZED.get(2));
+ put(CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
+ }})
+ ).when(configLog).readToEnd();
+
+        // We expect to write the target state first, followed by the config
write and then a read to end
expectConvertWriteRead(
- TASK_CONFIG_KEYS.get(1),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
- "properties", SAMPLE_CONFIGS.get(1));
- expectReadToEnd(new LinkedHashMap<>());
+ TARGET_STATE_KEYS.get(0),
KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATES_SERIALIZED.get(2),
+ "state.v2", TargetState.STOPPED.name());
+
expectConvertWriteRead(
- COMMIT_TASKS_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
- "tasks", 2); // Starts with 0 tasks, after update has 2
- // As soon as root is rewritten, we should see a callback notifying us
that we reconfigured some tasks
- configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0),
TASK_IDS.get(1)));
- EasyMock.expectLastCall();
+ CONNECTOR_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+ "properties", SAMPLE_CONFIGS.get(0));
- // Records to be read by consumer as it reads to the end of the log
- LinkedHashMap<String, byte[]> serializedConfigs = new
LinkedHashMap<>();
- serializedConfigs.put(TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
- serializedConfigs.put(TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(1));
- serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(2));
- expectReadToEnd(serializedConfigs);
+ // Writing should block until it is written and read back from Kafka
+ configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0), TargetState.STOPPED);
+ configState = configStorage.snapshot();
+ assertEquals(2, configState.offset());
+ assertEquals(TargetState.STOPPED,
configState.targetState(CONNECTOR_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
- expectPartitionCount(1);
- expectStop();
+ // We don't expect the config update listener's
onConnectorTargetStateChange hook to be invoked
+ verify(configUpdateListener,
never()).onConnectorTargetStateChange(anyString());
+
+
verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(0));
+
+ configStorage.stop();
+ verify(configLog).stop();
+ }
- PowerMock.replayAll();
+ @Test
+ public void testPutConnectorConfigProducerError() throws Exception {
+ when(configLog.partitionCount()).thenReturn(1);
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
configStorage.start();
- // Bootstrap as if we had already added the connector, but no tasks
had been added yet
- whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0),
Collections.emptyList());
+ when(converter.fromConnectData(TOPIC,
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0,
CONNECTOR_CONFIG_STRUCTS.get(0)))
+ .thenReturn(CONFIGS_SERIALIZED.get(0));
+ when(configLog.sendWithReceipt(anyString(),
any(byte[].class))).thenReturn(producerFuture);
- // Null before writing
+ // Verify initial state
ClusterConfigState configState = configStorage.snapshot();
assertEquals(-1, configState.offset());
- assertNull(configState.taskConfig(TASK_IDS.get(0)));
- assertNull(configState.taskConfig(TASK_IDS.get(1)));
+ assertEquals(0, configState.connectors().size());
- // Writing task configs should block until all the writes have been
performed and the root record update
- // has completed
- List<Map<String, String>> taskConfigs =
Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
- configStorage.putTaskConfigs("connector1", taskConfigs);
+ Exception thrownException = new ExecutionException(new
TopicAuthorizationException(Collections.singleton("test")));
+ when(producerFuture.get(anyLong(),
any(TimeUnit.class))).thenThrow(thrownException);
- // Validate root config by listing all connectors and tasks
- configState = configStorage.snapshot();
- assertEquals(3, configState.offset());
- String connectorName = CONNECTOR_IDS.get(0);
- assertEquals(Collections.singletonList(connectorName), new
ArrayList<>(configState.connectors()));
- assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)),
configState.tasks(connectorName));
- assertEquals(SAMPLE_CONFIGS.get(0),
configState.taskConfig(TASK_IDS.get(0)));
- assertEquals(SAMPLE_CONFIGS.get(1),
configState.taskConfig(TASK_IDS.get(1)));
- assertEquals(Collections.EMPTY_SET,
configState.inconsistentConnectors());
+ // verify that the producer exception from KafkaBasedLog::send is
propagated
+ ConnectException e = assertThrows(ConnectException.class, () ->
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
+ SAMPLE_CONFIGS.get(0), null));
+ assertTrue(e.getMessage().contains("Error writing connector
configuration to Kafka"));
+ assertEquals(thrownException, e.getCause());
configStorage.stop();
-
- PowerMock.verifyAll();
+ verify(configLog).stop();
}
@Test
- public void testPutTaskConfigsStartsOnlyReconfiguredTasks() throws
Exception {
- expectConfigure();
- expectStart(Collections.emptyList(), Collections.emptyMap());
+ public void testRemoveConnectorConfigSlowProducer() throws Exception {
+ when(configLog.partitionCount()).thenReturn(1);
- // Task configs should read to end, write to the log, read to end,
write root, then read to end again
- expectReadToEnd(new LinkedHashMap<>());
- expectConvertWriteRead(
- TASK_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
- "properties", SAMPLE_CONFIGS.get(0));
- expectConvertWriteRead(
- TASK_CONFIG_KEYS.get(1),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
- "properties", SAMPLE_CONFIGS.get(1));
- expectReadToEnd(new LinkedHashMap<>());
- expectConvertWriteRead(
- COMMIT_TASKS_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
- "tasks", 2); // Starts with 0 tasks, after update has 2
- // As soon as root is rewritten, we should see a callback notifying us
that we reconfigured some tasks
- configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0),
TASK_IDS.get(1)));
- EasyMock.expectLastCall();
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
- // Records to be read by consumer as it reads to the end of the log
- LinkedHashMap<String, byte[]> serializedConfigs = new
LinkedHashMap<>();
- serializedConfigs.put(TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
- serializedConfigs.put(TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(1));
- serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(2));
- expectReadToEnd(serializedConfigs);
+ @SuppressWarnings("unchecked")
+ Future<RecordMetadata> connectorConfigProducerFuture =
mock(Future.class);
- // Task configs should read to end, write to the log, read to end,
write root, then read to end again
- expectReadToEnd(new LinkedHashMap<>());
- expectConvertWriteRead(
- TASK_CONFIG_KEYS.get(2),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(3),
- "properties", SAMPLE_CONFIGS.get(2));
- expectReadToEnd(new LinkedHashMap<>());
- expectConvertWriteRead(
- COMMIT_TASKS_CONFIG_KEYS.get(1),
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(4),
- "tasks", 1); // Starts with 2 tasks, after update has 3
+ @SuppressWarnings("unchecked")
+ Future<RecordMetadata> targetStateProducerFuture = mock(Future.class);
- // As soon as root is rewritten, we should see a callback notifying us
that we reconfigured some tasks
-
configUpdateListener.onTaskConfigUpdate(Collections.singletonList(TASK_IDS.get(2)));
- EasyMock.expectLastCall();
+ when(configLog.sendWithReceipt(anyString(), isNull()))
+ // tombstone for the connector config
+ .thenReturn(connectorConfigProducerFuture)
+ // tombstone for the connector target state
+ .thenReturn(targetStateProducerFuture);
- // Records to be read by consumer as it reads to the end of the log
- serializedConfigs = new LinkedHashMap<>();
- serializedConfigs.put(TASK_CONFIG_KEYS.get(2),
CONFIGS_SERIALIZED.get(3));
- serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(4));
- expectReadToEnd(serializedConfigs);
+
when(connectorConfigProducerFuture.get(eq(READ_WRITE_TOTAL_TIMEOUT_MS),
any(TimeUnit.class)))
+ .thenAnswer((Answer<RecordMetadata>) invocation -> {
+ time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000);
+ return null;
+ });
- expectPartitionCount(1);
- expectStop();
+ // the future get timeout is expected to be reduced according to how
long the previous Future::get took
+ when(targetStateProducerFuture.get(eq(1000L), any(TimeUnit.class)))
+ .thenAnswer((Answer<RecordMetadata>) invocation -> {
+ time.sleep(1000);
+ return null;
+ });
- PowerMock.replayAll();
+ @SuppressWarnings("unchecked")
+ Future<Void> future = mock(Future.class);
+ when(configLog.readToEnd()).thenReturn(future);
- configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
- configStorage.start();
+ // the Future::get calls on the previous two producer futures
exhausted the overall timeout; so expect the
+ // timeout on the log read future to be 0
+ when(future.get(eq(0L), any(TimeUnit.class))).thenReturn(null);
- // Bootstrap as if we had already added the connector, but no tasks
had been added yet
- whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0),
Collections.emptyList());
- whiteboxAddConnector(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1),
Collections.emptyList());
+ configStorage.removeConnectorConfig("test-connector");
+ configStorage.stop();
+ verify(configLog).stop();
+ }
- // Null before writing
- ClusterConfigState configState = configStorage.snapshot();
- assertEquals(-1, configState.offset());
- assertNull(configState.taskConfig(TASK_IDS.get(0)));
- assertNull(configState.taskConfig(TASK_IDS.get(1)));
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testWritePrivileges() throws Exception {
+ // With exactly.once.source.support = preparing (or also, "enabled"),
we need to use a transactional producer
+ // to write some types of messages to the config topic
+ props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+ createStore();
- // Writing task configs should block until all the writes have been
performed and the root record update
- // has completed
- List<Map<String, String>> taskConfigs =
Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
- configStorage.putTaskConfigs("connector1", taskConfigs);
- taskConfigs = Collections.singletonList(SAMPLE_CONFIGS.get(2));
- configStorage.putTaskConfigs("connector2", taskConfigs);
+ when(configLog.partitionCount()).thenReturn(1);
- // Validate root config by listing all connectors and tasks
- configState = configStorage.snapshot();
- assertEquals(5, configState.offset());
- String connectorName1 = CONNECTOR_IDS.get(0);
- String connectorName2 = CONNECTOR_IDS.get(1);
- assertEquals(Arrays.asList(connectorName1, connectorName2), new
ArrayList<>(configState.connectors()));
- assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)),
configState.tasks(connectorName1));
- assertEquals(Collections.singletonList(TASK_IDS.get(2)),
configState.tasks(connectorName2));
- assertEquals(SAMPLE_CONFIGS.get(0),
configState.taskConfig(TASK_IDS.get(0)));
- assertEquals(SAMPLE_CONFIGS.get(1),
configState.taskConfig(TASK_IDS.get(1)));
- assertEquals(SAMPLE_CONFIGS.get(2),
configState.taskConfig(TASK_IDS.get(2)));
- assertEquals(Collections.EMPTY_SET,
configState.inconsistentConnectors());
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
- configStorage.stop();
+ // Try and fail to write a task count record to the config topic
without write privileges
+ when(converter.fromConnectData(TOPIC,
KafkaConfigBackingStore.TASK_COUNT_RECORD_V0,
CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0)))
+ .thenReturn(CONFIGS_SERIALIZED.get(0));
+
+ // Should fail the first time since we haven't claimed write privileges
+ assertThrows(IllegalStateException.class, () ->
configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6));
+
+ // Claim write privileges
+
doReturn(fencableProducer).when(configStorage).createFencableProducer();
+ // And write the task count record successfully
+
when(fencableProducer.send(any(ProducerRecord.class))).thenReturn(null);
+
doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0))))
+
.doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2))))
+ .when(configLog).readToEnd();
+ when(converter.toConnectData(TOPIC, CONFIGS_SERIALIZED.get(0)))
+ .thenReturn(new SchemaAndValue(null,
structToMap(CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0))));
+
+ // Should succeed now
+ configStorage.claimWritePrivileges();
+ configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6);
+
+ verify(fencableProducer).beginTransaction();
+ verify(fencableProducer).commitTransaction();
+
+ // Try to write a connector config
+ when(converter.fromConnectData(TOPIC,
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0,
CONNECTOR_CONFIG_STRUCTS.get(0)))
+ .thenReturn(CONFIGS_SERIALIZED.get(1));
+ // Get fenced out
+ doThrow(new ProducerFencedException("Better luck next time"))
+ .doNothing()
+ .when(fencableProducer).commitTransaction();
+
+ // Should fail again when we get fenced out
+ assertThrows(PrivilegedWriteException.class, () ->
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0),
null));
+
+ verify(fencableProducer, times(2)).beginTransaction();
+ verify(fencableProducer).close(Duration.ZERO);
+
+ // Should fail if we retry without reclaiming write privileges
+ assertThrows(IllegalStateException.class, () ->
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0),
null));
+
+ // In the meantime, write a target state (which doesn't require write
privileges)
+ when(converter.fromConnectData(TOPIC,
KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATE_PAUSED))
+ .thenReturn(CONFIGS_SERIALIZED.get(1));
+ when(configLog.sendWithReceipt("target-state-" + CONNECTOR_IDS.get(1),
CONFIGS_SERIALIZED.get(1)))
+ .thenReturn(producerFuture);
+ when(producerFuture.get(anyLong(),
any(TimeUnit.class))).thenReturn(null);
+
+ // Should succeed even without write privileges (target states can be
written by anyone)
+ configStorage.putTargetState(CONNECTOR_IDS.get(1), TargetState.PAUSED);
+
+ // Reclaim write privileges and successfully write the config
+ when(converter.toConnectData(TOPIC, CONFIGS_SERIALIZED.get(2)))
+ .thenReturn(new SchemaAndValue(null,
structToMap(CONNECTOR_CONFIG_STRUCTS.get(0))));
+
+ // Should succeed if we re-claim write privileges
+ configStorage.claimWritePrivileges();
+ configStorage.putConnectorConfig(CONNECTOR_IDS.get(1),
SAMPLE_CONFIGS.get(0), null);
+
+ verify(fencableProducer, times(3)).beginTransaction();
+ verify(fencableProducer, times(3)).commitTransaction();
+
verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(1));
- PowerMock.verifyAll();
+ configStorage.stop();
+ verify(configLog).stop();
+ verify(configStorage, times(2)).createFencableProducer();
+ verify(fencableProducer, times(2)).close(Duration.ZERO);
}
- private void expectConfigure() throws Exception {
- PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
- EasyMock.capture(capturedTopic),
EasyMock.capture(capturedProducerProps),
- EasyMock.capture(capturedConsumerProps),
EasyMock.capture(capturedConsumedCallback),
- EasyMock.capture(capturedNewTopic),
EasyMock.capture(capturedAdminSupplier),
- EasyMock.anyObject(WorkerConfig.class),
EasyMock.anyObject(Time.class))
- .andReturn(storeLog);
- }
+ @Test
+ public void testRestoreTargetStateUnexpectedDeletion() {
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(),
Optional.empty()));
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(3), null);
+ deserialized.put(CONFIGS_SERIALIZED.get(4),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ logOffset = 5;
+
+ expectStart(existingRecords, deserialized);
+ when(configLog.partitionCount()).thenReturn(1);
- private void expectPartitionCount(int partitionCount) {
- EasyMock.expect(storeLog.partitionCount())
- .andReturn(partitionCount);
- }
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
- // If non-empty, deserializations should be a LinkedHashMap
- private void expectStart(final List<ConsumerRecord<String, byte[]>>
preexistingRecords,
- final Map<byte[], Struct> deserializations) {
- storeLog.start();
- PowerMock.expectLastCall().andAnswer(() -> {
- for (ConsumerRecord<String, byte[]> rec : preexistingRecords)
- capturedConsumedCallback.getValue().onCompletion(null, rec);
- return null;
- });
- for (Map.Entry<byte[], Struct> deserializationEntry :
deserializations.entrySet()) {
- // Note null schema because default settings for internal
serialization are schema-less
- EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC),
EasyMock.aryEq(deserializationEntry.getKey())))
- .andReturn(new SchemaAndValue(null,
structToMap(deserializationEntry.getValue())));
- }
- }
+ // The target state deletion should reset the state to STARTED
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(5, configState.offset()); // Should always be next to be
read, even if uncommitted
+ assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new
ArrayList<>(configState.connectors()));
+ assertEquals(TargetState.STARTED,
configState.targetState(CONNECTOR_IDS.get(0)));
- private void expectStop() {
- storeLog.stop();
- PowerMock.expectLastCall();
+ configStorage.stop();
+ verify(configLog).stop();
}
- private void expectRead(LinkedHashMap<String, byte[]> serializedValues,
- Map<String, Struct> deserializedValues) {
- expectReadToEnd(serializedValues);
- for (Map.Entry<String, Struct> deserializedValueEntry :
deserializedValues.entrySet()) {
- byte[] serializedValue =
serializedValues.get(deserializedValueEntry.getKey());
- EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC),
EasyMock.aryEq(serializedValue)))
- .andReturn(new SchemaAndValue(null,
structToMap(deserializedValueEntry.getValue())));
- }
- }
+ @Test
+ public void testRestoreTargetState() {
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 5, 0L,
TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(5), new RecordHeaders(),
Optional.empty()));
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ // A worker running an older version wrote this target state; make
sure we can handle it correctly
+ deserialized.put(CONFIGS_SERIALIZED.get(3),
TARGET_STATE_PAUSED_LEGACY);
+ deserialized.put(CONFIGS_SERIALIZED.get(4),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ deserialized.put(CONFIGS_SERIALIZED.get(5), TARGET_STATE_STOPPED);
+ logOffset = 6;
+
+ expectStart(existingRecords, deserialized);
+
+ // Shouldn't see any callbacks since this is during startup
+ when(configLog.partitionCount()).thenReturn(1);
- // Expect a conversion & write to the underlying log, followed by a
subsequent read when the data is consumed back
- // from the log. Validate the data that is captured when the conversion is
performed matches the specified data
- // (by checking a single field's value)
- private void expectConvertWriteRead(final String configKey, final Schema
valueSchema, final byte[] serialized,
- final String dataFieldName, final
Object dataFieldValue) throws Exception {
- final Capture<Struct> capturedRecord = EasyMock.newCapture();
- if (serialized != null)
- EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC),
EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
- .andReturn(serialized);
-
- storeLog.sendWithReceipt(EasyMock.eq(configKey),
EasyMock.aryEq(serialized));
- EasyMock.expectLastCall().andReturn(producerFuture);
-
- producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
- EasyMock.expectLastCall().andReturn(null);
-
- EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC),
EasyMock.aryEq(serialized)))
- .andAnswer(() -> {
- if (dataFieldName != null)
- assertEquals(dataFieldValue,
capturedRecord.getValue().get(dataFieldName));
- // Note null schema because default settings for internal
serialization are schema-less
- return new SchemaAndValue(null, serialized == null ? null
: structToMap(capturedRecord.getValue()));
- });
- }
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
- // This map needs to maintain ordering
- private void expectReadToEnd(final LinkedHashMap<String, byte[]>
serializedConfigs) {
- EasyMock.expect(storeLog.readToEnd())
- .andAnswer(() -> {
- TestFuture<Void> future = new TestFuture<>();
- for (Map.Entry<String, byte[]> entry :
serializedConfigs.entrySet()) {
- capturedConsumedCallback.getValue().onCompletion(null,
- new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L,
TimestampType.CREATE_TIME, 0, 0,
- entry.getKey(), entry.getValue(), new
RecordHeaders(), Optional.empty()));
- }
- future.resolveOnGet((Void) null);
- return future;
- });
- }
+ // Should see a single connector with initial state paused
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(6, configState.offset()); // Should always be next to be
read, even if uncommitted
+ assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new
ArrayList<>(configState.connectors()));
+ assertEquals(TargetState.PAUSED,
configState.targetState(CONNECTOR_IDS.get(0)));
+ assertEquals(TargetState.STOPPED,
configState.targetState(CONNECTOR_IDS.get(1)));
- // Manually insert a connector into config storage, updating the task
configs, connector config, and root config
- private void whiteboxAddConnector(String connectorName, Map<String,
String> connectorConfig, List<Map<String, String>> taskConfigs) {
- Map<ConnectorTaskId, Map<String, String>> storageTaskConfigs =
Whitebox.getInternalState(configStorage, "taskConfigs");
- for (int i = 0; i < taskConfigs.size(); i++)
- storageTaskConfigs.put(new ConnectorTaskId(connectorName, i),
taskConfigs.get(i));
+ configStorage.stop();
+ verify(configLog).stop();
+ }
- Map<String, Map<String, String>> connectorConfigs =
Whitebox.getInternalState(configStorage, "connectorConfigs");
- connectorConfigs.put(connectorName, connectorConfig);
+ @Test
+ public void testRestore() {
+ // Restoring data should notify only of the latest values after
loading is complete. This also validates
+ // that inconsistent state is ignored.
+
+ // Overwrite each type at least once to ensure we see the latest data
after loading
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 5, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(5), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 6, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(6), new RecordHeaders(),
Optional.empty()),
+ // Connector after root update should make it through, task
update shouldn't
+ new ConsumerRecord<>(TOPIC, 0, 7, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(7), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 8, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(8), new RecordHeaders(),
Optional.empty()));
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(1),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(3),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(4),
CONNECTOR_CONFIG_STRUCTS.get(1));
+ deserialized.put(CONFIGS_SERIALIZED.get(5),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ deserialized.put(CONFIGS_SERIALIZED.get(6),
CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(1));
+ deserialized.put(CONFIGS_SERIALIZED.get(7),
CONNECTOR_CONFIG_STRUCTS.get(2));
+ deserialized.put(CONFIGS_SERIALIZED.get(8),
TASK_CONFIG_STRUCTS.get(1));
+ logOffset = 9;
+
+ expectStart(existingRecords, deserialized);
+ when(configLog.partitionCount()).thenReturn(1);
- Whitebox.<Map<String, Integer>>getInternalState(configStorage,
"connectorTaskCounts").put(connectorName, taskConfigs.size());
- }
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
- // Generates a Map representation of Struct. Only does shallow traversal,
so nested structs are not converted
+ // Should see a single connector and its config should be the last one
seen anywhere in the log
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(logOffset, configState.offset()); // Should always be
next to be read, even if uncommitted
+ assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new
ArrayList<>(configState.connectors()));
+ assertEquals(TargetState.STARTED,
configState.targetState(CONNECTOR_IDS.get(0)));
+ // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
+ assertEquals(SAMPLE_CONFIGS.get(2),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ // Should see 2 tasks for that connector. Only config updates before
the root key update should be reflected
+ assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)),
configState.tasks(CONNECTOR_IDS.get(0)));
+ // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.taskConfig(TASK_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.taskConfig(TASK_IDS.get(1)));
+ assertEquals(9, (int)
configState.taskCountRecord(CONNECTOR_IDS.get(1)));
+ assertEquals(Collections.EMPTY_SET,
configState.inconsistentConnectors());
+ assertEquals(Collections.singleton("connector1"),
configState.connectorsPendingFencing);
+
+ // Shouldn't see any callbacks since this is during startup
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testRestoreConnectorDeletion() {
+ // Restoring data should notify only of the latest values after
loading is complete. This also validates
+ // that inconsistent state is ignored.
+
+ // Overwrite each type at least once to ensure we see the latest data
after loading
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L,
TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 5, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(5), new RecordHeaders(),
Optional.empty()));
+
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(3), null);
+ deserialized.put(CONFIGS_SERIALIZED.get(4), null);
+ deserialized.put(CONFIGS_SERIALIZED.get(5),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+
+ logOffset = 6;
+ expectStart(existingRecords, deserialized);
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+
+ // Should see a single connector and its config should be the last one
seen anywhere in the log
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(6, configState.offset()); // Should always be next to be
read, even if uncommitted
+ assertTrue(configState.connectors().isEmpty());
+
+ // Shouldn't see any callbacks since this is during startup
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testRestoreZeroTasks() {
+ // Restoring data should notify only of the latest values after
loading is complete. This also validates
+ // that inconsistent state is ignored.
+
+ // Overwrite each type at least once to ensure we see the latest data
after loading
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(),
Optional.empty()),
+ // Connector after root update should make it through, task
update shouldn't
+ new ConsumerRecord<>(TOPIC, 0, 5, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(5), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 6, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(6), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 7, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(7), new RecordHeaders(),
Optional.empty()));
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(3),
CONNECTOR_CONFIG_STRUCTS.get(1));
+ deserialized.put(CONFIGS_SERIALIZED.get(4),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ deserialized.put(CONFIGS_SERIALIZED.get(5),
CONNECTOR_CONFIG_STRUCTS.get(2));
+ deserialized.put(CONFIGS_SERIALIZED.get(6),
TASK_CONFIG_STRUCTS.get(1));
+ deserialized.put(CONFIGS_SERIALIZED.get(7),
TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
+ logOffset = 8;
+ expectStart(existingRecords, deserialized);
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+
+ // Should see a single connector and its config should be the last one
seen anywhere in the log
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(8, configState.offset()); // Should always be next to be
read, even if uncommitted
+ assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new
ArrayList<>(configState.connectors()));
+ // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
+ assertEquals(SAMPLE_CONFIGS.get(2),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ // Should see 0 tasks for that connector.
+ assertEquals(Collections.emptyList(),
configState.tasks(CONNECTOR_IDS.get(0)));
+ // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+ assertEquals(Collections.EMPTY_SET,
configState.inconsistentConnectors());
+
+ // Shouldn't see any callbacks since this is during startup
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testRecordToRestartRequest() {
+ ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0,
0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty());
+ Struct struct = RESTART_REQUEST_STRUCTS.get(0);
+ SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(),
structToMap(struct));
+ RestartRequest restartRequest =
configStorage.recordToRestartRequest(record, schemaAndValue);
+ assertEquals(CONNECTOR_1_NAME, restartRequest.connectorName());
+ assertEquals(struct.getBoolean(INCLUDE_TASKS_FIELD_NAME),
restartRequest.includeTasks());
+ assertEquals(struct.getBoolean(ONLY_FAILED_FIELD_NAME),
restartRequest.onlyFailed());
+ }
+
+ @Test
+ public void testRecordToRestartRequestOnlyFailedInconsistent() {
+ ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0,
0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty());
+ Struct struct = ONLY_FAILED_MISSING_STRUCT;
+ SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(),
structToMap(struct));
+ RestartRequest restartRequest =
configStorage.recordToRestartRequest(record, schemaAndValue);
+ assertEquals(CONNECTOR_1_NAME, restartRequest.connectorName());
+ assertEquals(struct.getBoolean(INCLUDE_TASKS_FIELD_NAME),
restartRequest.includeTasks());
+ assertFalse(restartRequest.onlyFailed());
+ }
+
+ @Test
+ public void testRecordToRestartRequestIncludeTasksInconsistent() {
+ ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0,
0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty());
+ Struct struct = INCLUDE_TASKS_MISSING_STRUCT;
+ SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(),
structToMap(struct));
+ RestartRequest restartRequest =
configStorage.recordToRestartRequest(record, schemaAndValue);
+ assertEquals(CONNECTOR_1_NAME, restartRequest.connectorName());
+ assertFalse(restartRequest.includeTasks());
+ assertEquals(struct.getBoolean(ONLY_FAILED_FIELD_NAME),
restartRequest.onlyFailed());
+ }
+
+ @Test
+ public void testFencableProducerPropertiesOverrideUserSuppliedValues() {
+ props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+ String groupId = "my-other-connect-cluster";
+ props.put(GROUP_ID_CONFIG, groupId);
+ props.put(TRANSACTIONAL_ID_CONFIG, "my-custom-transactional-id");
+ props.put(ENABLE_IDEMPOTENCE_CONFIG, "false");
+ createStore();
+
+ Map<String, Object> fencableProducerProperties =
configStorage.fencableProducerProps(config);
+ assertEquals("connect-cluster-" + groupId,
fencableProducerProperties.get(TRANSACTIONAL_ID_CONFIG));
+ assertEquals("true",
fencableProducerProperties.get(ENABLE_IDEMPOTENCE_CONFIG));
+ }
+
+ @Test
+ public void
testConsumerPropertiesDoNotOverrideUserSuppliedValuesWithoutExactlyOnceSourceEnabled()
{
+ props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+ props.put(ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_UNCOMMITTED.toString());
+ createStore();
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+
+ assertEquals(
+ IsolationLevel.READ_UNCOMMITTED.toString(),
+ capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
+ );
+ }
+
+ @Test
+ public void testClientIds() {
+ props = new HashMap<>(DEFAULT_CONFIG_STORAGE_PROPS);
+ props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+ createStore();
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+
+ Map<String, Object> fencableProducerProps =
configStorage.fencableProducerProps(config);
+
+ final String expectedClientId = CLIENT_ID_BASE + "configs";
+ assertEquals(expectedClientId,
capturedProducerProps.getValue().get(CLIENT_ID_CONFIG));
+ assertEquals(expectedClientId,
capturedConsumerProps.getValue().get(CLIENT_ID_CONFIG));
+ assertEquals(expectedClientId + "-leader",
fencableProducerProps.get(CLIENT_ID_CONFIG));
+ }
+
+ @Test
+ public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() {
+ when(configLog.partitionCount()).thenReturn(2);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ ConfigException e = assertThrows(ConfigException.class, () ->
configStorage.start());
+ assertTrue(e.getMessage().contains("required to have a single
partition"));
+ }
+
+ @Test
+ public void testFencableProducerPropertiesInsertedByDefault() {
+ props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+ String groupId = "my-connect-cluster";
+ props.put(GROUP_ID_CONFIG, groupId);
+ props.remove(TRANSACTIONAL_ID_CONFIG);
+ props.remove(ENABLE_IDEMPOTENCE_CONFIG);
+ createStore();
+
+ Map<String, Object> fencableProducerProperties =
configStorage.fencableProducerProps(config);
+ assertEquals("connect-cluster-" + groupId,
fencableProducerProperties.get(TRANSACTIONAL_ID_CONFIG));
+ assertEquals("true",
fencableProducerProperties.get(ENABLE_IDEMPOTENCE_CONFIG));
+ }
+
+ @Test
+ public void
testConsumerPropertiesInsertedByDefaultWithExactlyOnceSourceEnabled() {
+ props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+ props.remove(ISOLATION_LEVEL_CONFIG);
+ createStore();
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+
+ assertEquals(
+ IsolationLevel.READ_COMMITTED.toString(),
+ capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
+ );
+ }
+
+ @Test
+ public void
testConsumerPropertiesOverrideUserSuppliedValuesWithExactlyOnceSourceEnabled() {
+ props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+ props.put(ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_UNCOMMITTED.toString());
+ createStore();
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+
+ assertEquals(
+ IsolationLevel.READ_COMMITTED.toString(),
+ capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
+ );
+ }
+
+ @Test
+ public void
testConsumerPropertiesNotInsertedByDefaultWithoutExactlyOnceSourceEnabled() {
+ props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+ props.remove(ISOLATION_LEVEL_CONFIG);
+ createStore();
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+
+
assertNull(capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG));
+ }
+
+ @Test
+ public void testBackgroundConnectorDeletion() throws Exception {
+ // verify that we handle connector deletions correctly when they come
up through the log
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()));
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(1));
+ deserialized.put(CONFIGS_SERIALIZED.get(3),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ logOffset = 5;
+
+ expectStart(existingRecords, deserialized);
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+
+ configStorage.start();
+ verify(configLog).start();
+
+ // Should see a single connector with initial state paused
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(TargetState.STARTED,
configState.targetState(CONNECTOR_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(new
ConnectorTaskId(CONNECTOR_IDS.get(0), 0)));
+ assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(new
ConnectorTaskId(CONNECTOR_IDS.get(0), 1)));
+ assertEquals(2, configState.taskCount(CONNECTOR_IDS.get(0)));
+
+ LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
+ serializedData.put(CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
+ serializedData.put(TARGET_STATE_KEYS.get(0),
CONFIGS_SERIALIZED.get(1));
+ doAnswer(expectReadToEnd(serializedData)).when(configLog).readToEnd();
+
+ Map<String, Struct> deserializedData = new HashMap<>();
+ deserializedData.put(CONNECTOR_CONFIG_KEYS.get(0), null);
+ deserializedData.put(TARGET_STATE_KEYS.get(0), null);
+ expectRead(serializedData, deserializedData);
+
+ configStorage.refresh(0, TimeUnit.SECONDS);
+
verify(configUpdateListener).onConnectorConfigRemove(CONNECTOR_IDS.get(0));
+
+ configState = configStorage.snapshot();
+ // Connector should now be removed from the snapshot
+ assertFalse(configState.contains(CONNECTOR_IDS.get(0)));
+ assertEquals(0, configState.taskCount(CONNECTOR_IDS.get(0)));
+ // Ensure that the deleted connector's deferred task updates have been
cleaned up
+ // in order to prevent unbounded growth of the map
+ assertEquals(Collections.emptyMap(),
configStorage.deferredTaskUpdates);
+
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws
Exception {
+ // Test a case where a failure and compaction has left us in an
inconsistent state when reading the log.
+ // We start out by loading an initial configuration where we started
to write a task update, and then
+ // compaction cleaned up the earlier record.
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ // This is the record that has been compacted:
+ //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1)),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 5, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(5), new RecordHeaders(),
Optional.empty()));
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(4),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ deserialized.put(CONFIGS_SERIALIZED.get(5),
TASK_CONFIG_STRUCTS.get(1));
+ logOffset = 6;
+ expectStart(existingRecords, deserialized);
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+
+ // After reading the log, it should have been in an inconsistent state
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(6, configState.offset()); // Should always be next to be
read, not last committed
+ assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new
ArrayList<>(configState.connectors()));
+ // Inconsistent data should leave us with no tasks listed for the
connector and an entry in the inconsistent list
+ assertEquals(Collections.emptyList(),
configState.tasks(CONNECTOR_IDS.get(0)));
+ // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+ assertNull(configState.taskConfig(TASK_IDS.get(0)));
+ assertNull(configState.taskConfig(TASK_IDS.get(1)));
+ assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)),
configState.inconsistentConnectors());
+
+ // Records to be read by consumer as it reads to the end of the log
+ LinkedHashMap<String, byte[]> serializedConfigs = new
LinkedHashMap<>();
+ serializedConfigs.put(TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
+ serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(2));
+ // Successful attempt to write new task config
+ doAnswer(expectReadToEnd(new LinkedHashMap<>()))
+ .doAnswer(expectReadToEnd(new LinkedHashMap<>()))
+ .doAnswer(expectReadToEnd(serializedConfigs))
+ .when(configLog).readToEnd();
+ expectConvertWriteRead(
+ TASK_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+ "properties", SAMPLE_CONFIGS.get(0));
+ expectConvertWriteRead(
+ COMMIT_TASKS_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+ "tasks", 1); // Updated to just 1 task
+
+ // Next, issue a write that has everything that is needed and it
should be accepted. Note that in this case
+ // we are going to shrink the number of tasks to 1
+ configStorage.putTaskConfigs("connector1",
Collections.singletonList(SAMPLE_CONFIGS.get(0)));
+
+ // Validate updated config
+ configState = configStorage.snapshot();
+ // This is only two more ahead of the last one because multiple calls
fail, and so their configs are not written
+ // to the topic. Only the last call with 1 task config + 1 commit
actually gets written.
+ assertEquals(8, configState.offset());
+ assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new
ArrayList<>(configState.connectors()));
+ assertEquals(Collections.singletonList(TASK_IDS.get(0)),
configState.tasks(CONNECTOR_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.taskConfig(TASK_IDS.get(0)));
+ assertEquals(Collections.EMPTY_SET,
configState.inconsistentConnectors());
+
+ // As soon as root is rewritten, we should see a callback notifying us
that we reconfigured some tasks
+
verify(configUpdateListener).onTaskConfigUpdate(Collections.singletonList(TASK_IDS.get(0)));
+
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testPutRestartRequestOnlyFailed() throws Exception {
+ RestartRequest restartRequest = new
RestartRequest(CONNECTOR_IDS.get(0), true, false);
+ testPutRestartRequest(restartRequest);
+ }
+
+ @Test
+ public void testPutRestartRequestOnlyFailedIncludingTasks() throws
Exception {
+ RestartRequest restartRequest = new
RestartRequest(CONNECTOR_IDS.get(0), true, true);
+ testPutRestartRequest(restartRequest);
+ }
+
+ private void testPutRestartRequest(RestartRequest restartRequest) throws
Exception {
+ expectStart(Collections.emptyList(), Collections.emptyMap());
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+
+ configStorage.start();
+ verify(configLog).start();
+
+ LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
+ recordsToRead.put(RESTART_CONNECTOR_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
+ doAnswer(expectReadToEnd(recordsToRead)).when(configLog).readToEnd();
+
+ expectConvertWriteRead2(
+ RESTART_CONNECTOR_KEYS.get(0),
KafkaConfigBackingStore.RESTART_REQUEST_V0, CONFIGS_SERIALIZED.get(0),
+ new
Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(ONLY_FAILED_FIELD_NAME,
restartRequest.onlyFailed()).put(INCLUDE_TASKS_FIELD_NAME,
restartRequest.includeTasks()));
+
+ // Writing should block until it is written and read back from Kafka
+ configStorage.putRestartRequest(restartRequest);
+
+ final ArgumentCaptor<RestartRequest> restartRequestCaptor =
ArgumentCaptor.forClass(RestartRequest.class);
+
verify(configUpdateListener).onRestartRequest(restartRequestCaptor.capture());
+
+ assertEquals(restartRequest.connectorName(),
restartRequestCaptor.getValue().connectorName());
+ assertEquals(restartRequest.onlyFailed(),
restartRequestCaptor.getValue().onlyFailed());
+ assertEquals(restartRequest.includeTasks(),
restartRequestCaptor.getValue().includeTasks());
+
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testRestoreRestartRequestInconsistentState() {
+ // Restoring data should notify only of the latest values after
loading is complete. This also validates
+ // that inconsistent state doesn't prevent startup.
+ // Overwrite each type at least once to ensure we see the latest data
after loading
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()));
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0),
RESTART_REQUEST_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(1),
RESTART_REQUEST_STRUCTS.get(1));
+ deserialized.put(CONFIGS_SERIALIZED.get(2),
RESTART_REQUEST_STRUCTS.get(2));
+ deserialized.put(CONFIGS_SERIALIZED.get(3), null);
+ logOffset = 4;
+ expectStart(existingRecords, deserialized);
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+
+ configStorage.start();
+ verify(configLog).start();
+
+ // Shouldn't see any callbacks since this is during startup
+ verify(configUpdateListener,
never()).onConnectorConfigRemove(anyString());
+ verify(configUpdateListener,
never()).onConnectorConfigUpdate(anyString());
+ verify(configUpdateListener,
never()).onTaskConfigUpdate(anyCollection());
+ verify(configUpdateListener,
never()).onConnectorTargetStateChange(anyString());
+ verify(configUpdateListener,
never()).onSessionKeyUpdate(any(SessionKey.class));
+ verify(configUpdateListener,
never()).onRestartRequest(any(RestartRequest.class));
+ verify(configUpdateListener,
never()).onLoggingLevelUpdate(anyString(), anyString());
+
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testPutTaskConfigsZeroTasks() throws Exception {
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+ verify(configLog).start();
+
+ // Records to be read by consumer as it reads to the end of the log
+ doAnswer(expectReadToEnd(new LinkedHashMap<>())).
+
doAnswer(expectReadToEnd(Collections.singletonMap(COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0))))
+ .when(configLog).readToEnd();
+
+ expectConvertWriteRead(
+ COMMIT_TASKS_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
+ "tasks", 0); // We have 0 tasks
+
+ // Bootstrap as if we had already added the connector, but no tasks
had been added yet
+ addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0),
Collections.emptyList());
+
+
+ // Null before writing
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(-1, configState.offset());
+
+ // Writing task configs should block until all the writes have been
performed and the root record update
+ // has completed
+ List<Map<String, String>> taskConfigs = Collections.emptyList();
+ configStorage.putTaskConfigs("connector1", taskConfigs);
+
+ // Validate root config by listing all connectors and tasks
+ configState = configStorage.snapshot();
+ assertEquals(1, configState.offset());
+ String connectorName = CONNECTOR_IDS.get(0);
+ assertEquals(Collections.singletonList(connectorName), new
ArrayList<>(configState.connectors()));
+ assertEquals(Collections.emptyList(),
configState.tasks(connectorName));
+ assertEquals(Collections.EMPTY_SET,
configState.inconsistentConnectors());
+
+ // As soon as root is rewritten, we should see a callback notifying us
that we reconfigured some tasks
+
verify(configUpdateListener).onTaskConfigUpdate(Collections.emptyList());
+
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testBackgroundUpdateTargetState() throws Exception {
+ // verify that we handle target state changes correctly when they come
up through the log
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()));
+ LinkedHashMap<byte[], Struct> deserializedOnStartup = new
LinkedHashMap<>();
+ deserializedOnStartup.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserializedOnStartup.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
+ deserializedOnStartup.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ deserializedOnStartup.put(CONFIGS_SERIALIZED.get(3),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ logOffset = 5;
+
+ expectStart(existingRecords, deserializedOnStartup);
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+ verify(configLog).start();
+
+ // Should see a single connector with initial state started
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)),
configStorage.connectorTargetStates.keySet());
+ assertEquals(TargetState.STARTED,
configState.targetState(CONNECTOR_IDS.get(0)));
+
+ LinkedHashMap<String, byte[]> serializedAfterStartup = new
LinkedHashMap<>();
+ serializedAfterStartup.put(TARGET_STATE_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
+ serializedAfterStartup.put(TARGET_STATE_KEYS.get(1),
CONFIGS_SERIALIZED.get(1));
+
doAnswer(expectReadToEnd(serializedAfterStartup)).when(configLog).readToEnd();
+
+ Map<String, Struct> deserializedAfterStartup = new HashMap<>();
+ deserializedAfterStartup.put(TARGET_STATE_KEYS.get(0),
TARGET_STATE_PAUSED);
+ deserializedAfterStartup.put(TARGET_STATE_KEYS.get(1),
TARGET_STATE_STOPPED);
+ expectRead(serializedAfterStartup, deserializedAfterStartup);
+
+ // Should see two connectors now, one paused and one stopped
+ configStorage.refresh(0, TimeUnit.SECONDS);
+
verify(configUpdateListener).onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
+ configState = configStorage.snapshot();
+
+ assertEquals(new HashSet<>(CONNECTOR_IDS),
configStorage.connectorTargetStates.keySet());
+ assertEquals(TargetState.PAUSED,
configState.targetState(CONNECTOR_IDS.get(0)));
+ assertEquals(TargetState.STOPPED,
configState.targetState(CONNECTOR_IDS.get(1)));
+
+ configStorage.stop();
+ verify(configStorage).stop();
+ }
+
+ @Test
+ public void testSameTargetState() throws Exception {
+ // verify that we handle target state changes correctly when they come
up through the log
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()));
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(3),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ logOffset = 5;
+
+ expectStart(existingRecords, deserialized);
+
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+ verify(configLog).start();
+
+ ClusterConfigState configState = configStorage.snapshot();
+ expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0),
TARGET_STATE_STARTED);
+ // Should see a single connector with initial state paused
+ assertEquals(TargetState.STARTED,
configState.targetState(CONNECTOR_IDS.get(0)));
+
+ expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0),
TARGET_STATE_STARTED);
+ // on resume update listener shouldn't be called
+ verify(configUpdateListener,
never()).onConnectorTargetStateChange(anyString());
+
+ configStorage.stop();
+ verify(configStorage).stop();
+ }
+
+
+ @Test
+ public void testPutLogLevel() throws Exception {
+ final String logger1 = "org.apache.zookeeper";
+ final String logger2 = "org.apache.cassandra";
+ final String logger3 = "org.apache.kafka.clients";
+ final String logger4 = "org.apache.kafka.connect";
+ final String level1 = "ERROR";
+ final String level3 = "WARN";
+ final String level4 = "DEBUG";
+
+ final Struct existingLogLevel = new
Struct(KafkaConfigBackingStore.LOGGER_LEVEL_V0)
+ .put("level", level1);
+
+ // Pre-populate the config topic with a couple of logger level
records; these should be ignored (i.e.,
+ // not reported to the update listener)
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, "logger-cluster-" + logger1,
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()
+ ),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, "logger-cluster-" + logger2,
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()
+ )
+ );
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0), existingLogLevel);
+ // Make sure we gracefully handle tombstones
+ deserialized.put(CONFIGS_SERIALIZED.get(1), null);
+ logOffset = 2;
+
+ expectStart(existingRecords, deserialized);
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+
+ configStorage.start();
+ verify(configLog).start();
+
+ expectConvertWriteRead(
+ "logger-cluster-" + logger3,
KafkaConfigBackingStore.LOGGER_LEVEL_V0, CONFIGS_SERIALIZED.get(2),
+ "level", level3);
+ configStorage.putLoggerLevel(logger3, level3);
+
+ expectConvertWriteRead(
+ "logger-cluster-" + logger4,
KafkaConfigBackingStore.LOGGER_LEVEL_V0, CONFIGS_SERIALIZED.get(3),
+ "level", level4);
+ configStorage.putLoggerLevel(logger4, level4);
+
+ LinkedHashMap<String, byte[]> newRecords = new LinkedHashMap<>();
+ newRecords.put("logger-cluster-" + logger3, CONFIGS_SERIALIZED.get(2));
+ newRecords.put("logger-cluster-" + logger4, CONFIGS_SERIALIZED.get(3));
+ doAnswer(expectReadToEnd(newRecords)).when(configLog).readToEnd();
+
+ configStorage.refresh(0, TimeUnit.SECONDS);
+ verify(configUpdateListener).onLoggingLevelUpdate(logger3, level3);
+ verify(configUpdateListener).onLoggingLevelUpdate(logger4, level4);
+
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testTaskCountRecordsAndGenerations() throws Exception {
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+ verify(configLog).start();
+
+ // Records to be read by consumer as it reads to the end of the log
+ LinkedHashMap<String, byte[]> serializedConfigs = new
LinkedHashMap<>();
+ serializedConfigs.put(TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
+ serializedConfigs.put(TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(1));
+ serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(2));
+
+
+ // Records to be read by consumer as it reads to the end of the log
+ doAnswer(expectReadToEnd(new LinkedHashMap<>()))
+ .doAnswer(expectReadToEnd(new LinkedHashMap<>()))
+ .doAnswer(expectReadToEnd(serializedConfigs))
+ .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>()
{{
+ put(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0),
CONFIGS_SERIALIZED.get(3));
+ }})
+ )
+ .when(configLog).readToEnd();
+
+ // Task configs should read to end, write to the log, read to end,
write root, then read to end again
+ expectConvertWriteRead2(TASK_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+ new
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties",
SAMPLE_CONFIGS.get(0)));
+
+ expectConvertWriteRead2(
+ TASK_CONFIG_KEYS.get(1),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+ new
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties",
SAMPLE_CONFIGS.get(1)));
+ expectConvertWriteRead2(
+ COMMIT_TASKS_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+ new
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2));
+ // Starts with 0 tasks, after update has 2
+
+ // Task count records are read back after writing as well
+ expectConvertWriteRead2(
+ CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0),
KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONFIGS_SERIALIZED.get(3),
+ new
Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0).put("task-count", 4));
+
+ addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0),
Collections.emptyList());
+
+ // Before anything is written
+ String connectorName = CONNECTOR_IDS.get(0);
+ ClusterConfigState configState = configStorage.snapshot();
+ assertFalse(configState.pendingFencing(connectorName));
+ assertNull(configState.taskCountRecord(connectorName));
+ assertNull(configState.taskConfigGeneration(connectorName));
+
+ // Writing task configs should block until all the writes have been
performed and the root record update
+ // has completed
+ List<Map<String, String>> taskConfigs =
Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
+ configStorage.putTaskConfigs("connector1", taskConfigs);
+
+ configState = configStorage.snapshot();
+ assertEquals(3, configState.offset());
+ assertTrue(configState.pendingFencing(connectorName));
+ assertNull(configState.taskCountRecord(connectorName));
+ assertEquals(0, (long)
configState.taskConfigGeneration(connectorName));
+
+ configStorage.putTaskCountRecord(connectorName, 4);
+
+ configState = configStorage.snapshot();
+ assertEquals(4, configState.offset());
+ assertFalse(configState.pendingFencing(connectorName));
+ assertEquals(4, (long) configState.taskCountRecord(connectorName));
+ assertEquals(0, (long)
configState.taskConfigGeneration(connectorName));
+
+ // As soon as root is rewritten, we should see a callback notifying us
that we reconfigured some tasks
+
verify(configUpdateListener).onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0),
TASK_IDS.get(1)));
+
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testPutTaskConfigs() throws Exception {
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+ verify(configLog).start();
+
+ doAnswer(expectReadToEnd(new LinkedHashMap<>()))
+ .doAnswer(expectReadToEnd(new LinkedHashMap<>()))
+ .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>()
{{
+ put(TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
+ put(TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(1));
+ put(COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(2));
+ }})
+ )
+ .when(configLog).readToEnd();
+
+ // Task configs should read to end, write to the log, read to end,
write root, then read to end again
+ expectConvertWriteRead2(
+ TASK_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+ new
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties",
SAMPLE_CONFIGS.get(0)));
+ expectConvertWriteRead2(
+ TASK_CONFIG_KEYS.get(1),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+ new
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties",
SAMPLE_CONFIGS.get(1)));
+ expectConvertWriteRead2(
+ COMMIT_TASKS_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+ new
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2)); //
Starts with 0 tasks, after update has 2
+ // Bootstrap as if we had already added the connector, but no tasks
had been added yet
+ addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0),
Collections.emptyList());
+
+ // Null before writing
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(-1, configState.offset());
+ assertNull(configState.taskConfig(TASK_IDS.get(0)));
+ assertNull(configState.taskConfig(TASK_IDS.get(1)));
+
+ // Writing task configs should block until all the writes have been
performed and the root record update
+ // has completed
+ List<Map<String, String>> taskConfigs =
Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
+ configStorage.putTaskConfigs("connector1", taskConfigs);
+
+ // Validate root config by listing all connectors and tasks
+ configState = configStorage.snapshot();
+ assertEquals(3, configState.offset());
+ String connectorName = CONNECTOR_IDS.get(0);
+ assertEquals(Collections.singletonList(connectorName), new
ArrayList<>(configState.connectors()));
+ assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)),
configState.tasks(connectorName));
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.taskConfig(TASK_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(1),
configState.taskConfig(TASK_IDS.get(1)));
+ assertEquals(Collections.EMPTY_SET,
configState.inconsistentConnectors());
+
+ // As soon as root is rewritten, we should see a callback notifying us
that we reconfigured some tasks
+
verify(configUpdateListener).onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0),
TASK_IDS.get(1)));
+
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testPutTaskConfigsStartsOnlyReconfiguredTasks() throws
Exception {
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+ verify(configLog).start();
+
+ doAnswer(expectReadToEnd(new LinkedHashMap<>()))
+ .doAnswer(expectReadToEnd(new LinkedHashMap<>()))
+ .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>()
{{
+ put(TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
+ put(TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(1));
+ put(COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(2));
+ }})
+ )
+ .doAnswer(expectReadToEnd(new LinkedHashMap<>()))
+ .doAnswer(expectReadToEnd(new LinkedHashMap<>()))
+ .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>()
{{
+ put(TASK_CONFIG_KEYS.get(2),
CONFIGS_SERIALIZED.get(3));
+ put(COMMIT_TASKS_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(4));
+ }})
+ )
+ .when(configLog).readToEnd();
+
+ expectConvertWriteRead2(
+ TASK_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+ new
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties",
SAMPLE_CONFIGS.get(0)));
+ expectConvertWriteRead2(
+ TASK_CONFIG_KEYS.get(1),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+ new
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties",
SAMPLE_CONFIGS.get(1)));
+ expectConvertWriteRead2(
+ COMMIT_TASKS_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+ new
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2)); //
Starts with 0 tasks, after update has 2
+
+ // Null before writing
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(-1, configState.offset());
+ assertNull(configState.taskConfig(TASK_IDS.get(0)));
+ assertNull(configState.taskConfig(TASK_IDS.get(1)));
+
+ // Bootstrap as if we had already added the connector, but no tasks
had been added yet
+ addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0),
Collections.emptyList());
+ List<Map<String, String>> taskConfigs =
Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
+ configStorage.putTaskConfigs("connector1", taskConfigs);
+
+ expectConvertWriteRead2(
+ TASK_CONFIG_KEYS.get(2),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(3),
+ new
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties",
SAMPLE_CONFIGS.get(2)));
+ expectConvertWriteRead2(
+ COMMIT_TASKS_CONFIG_KEYS.get(1),
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(4),
+ new
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 1)); //
Starts with 2 tasks, after update has 3
+
+ addConnector(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1),
Collections.emptyList());
+ taskConfigs = Collections.singletonList(SAMPLE_CONFIGS.get(2));
+ configStorage.putTaskConfigs("connector2", taskConfigs);
+
+ // Validate root config by listing all connectors and tasks
+ configState = configStorage.snapshot();
+ assertEquals(5, configState.offset());
+ String connectorName1 = CONNECTOR_IDS.get(0);
+ String connectorName2 = CONNECTOR_IDS.get(1);
+ assertEquals(Arrays.asList(connectorName1, connectorName2), new
ArrayList<>(configState.connectors()));
+ assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)),
configState.tasks(connectorName1));
+ assertEquals(Collections.singletonList(TASK_IDS.get(2)),
configState.tasks(connectorName2));
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.taskConfig(TASK_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(1),
configState.taskConfig(TASK_IDS.get(1)));
+ assertEquals(SAMPLE_CONFIGS.get(2),
configState.taskConfig(TASK_IDS.get(2)));
+ assertEquals(Collections.EMPTY_SET,
configState.inconsistentConnectors());
+
+ // As soon as root is rewritten, we should see a callback notifying us
that we reconfigured some tasks
+ final ArgumentCaptor<Collection<ConnectorTaskId>> capturedRecord =
ArgumentCaptor.captor();
Review Comment:
Yes, you are right — I got errors when I tried to test it like this; the reason was that the
number of invocations was more than 2, so I switched to
`verify(configUpdateListener, times)` and it passed. I think there was a
mocking issue before; I tried testing with the expected arguments again and it
passed this time.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]