chia7712 commented on code in PR #16126:
URL: https://github.com/apache/kafka/pull/16126#discussion_r1626542796
##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java:
##########
@@ -194,121 +234,1293 @@ public void setUp() {
}
@Test
- public void testTaskCountRecordsAndGenerations() throws Exception {
- expectConfigure();
- expectStart(Collections.emptyList(), Collections.emptyMap());
+ public void testStartStop() {
+ props.put("config.storage.min.insync.replicas", "3");
+ props.put("config.storage.max.message.bytes", "1001");
+ createStore();
- // Task configs should read to end, write to the log, read to end,
write root, then read to end again
- expectReadToEnd(new LinkedHashMap<>());
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+
+ verifyConfigure();
+ assertEquals(TOPIC, capturedTopic.getValue());
+ assertEquals("org.apache.kafka.common.serialization.StringSerializer",
capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+
assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer",
capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+
assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+
assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer",
capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+
+ assertEquals(TOPIC, capturedNewTopic.getValue().name());
+ assertEquals(1, capturedNewTopic.getValue().numPartitions());
+ assertEquals(TOPIC_REPLICATION_FACTOR,
capturedNewTopic.getValue().replicationFactor());
+ assertEquals("3",
capturedNewTopic.getValue().configs().get("min.insync.replicas"));
+ assertEquals("1001",
capturedNewTopic.getValue().configs().get("max.message.bytes"));
+
+ configStorage.start();
+ configStorage.stop();
+
+ verify(configLog).start();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testSnapshotCannotMutateInternalState() {
+ props.put("config.storage.min.insync.replicas", "3");
+ props.put("config.storage.max.message.bytes", "1001");
+ createStore();
+
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+
+ configStorage.start();
+ ClusterConfigState snapshot = configStorage.snapshot();
+ assertNotSame(snapshot.connectorTaskCounts,
configStorage.connectorTaskCounts);
+ assertNotSame(snapshot.connectorConfigs,
configStorage.connectorConfigs);
+ assertNotSame(snapshot.connectorTargetStates,
configStorage.connectorTargetStates);
+ assertNotSame(snapshot.taskConfigs, configStorage.taskConfigs);
+ assertNotSame(snapshot.connectorTaskCountRecords,
configStorage.connectorTaskCountRecords);
+ assertNotSame(snapshot.connectorTaskConfigGenerations,
configStorage.connectorTaskConfigGenerations);
+ assertNotSame(snapshot.connectorsPendingFencing,
configStorage.connectorsPendingFencing);
+ assertNotSame(snapshot.inconsistentConnectors,
configStorage.inconsistent);
+ }
+
+ @Test
+ public void testPutConnectorConfig() throws Exception {
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+
+ // Null before writing
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(-1, configState.offset());
+ assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+ String configKey = CONNECTOR_CONFIG_KEYS.get(1);
+ String targetStateKey = TARGET_STATE_KEYS.get(1);
+
+
doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0))))
+
.doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(1))))
+ // Config deletion
+ .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>()
{{
+ put(configKey, null);
+ put(targetStateKey, null);
+ }})
+ ).when(configLog).readToEnd();
+
+ // Writing should block until it is written and read back from Kafka
expectConvertWriteRead(
- TASK_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+ CONNECTOR_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
"properties", SAMPLE_CONFIGS.get(0));
+
+ configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0), null);
+ configState = configStorage.snapshot();
+
+ assertEquals(1, configState.offset());
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(0));
+
+ // Second should also block and all configs should still be available
expectConvertWriteRead(
- TASK_CONFIG_KEYS.get(1),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+ CONNECTOR_CONFIG_KEYS.get(1),
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
"properties", SAMPLE_CONFIGS.get(1));
- expectReadToEnd(new LinkedHashMap<>());
+
+ configStorage.putConnectorConfig(CONNECTOR_IDS.get(1),
SAMPLE_CONFIGS.get(1), null);
+ configState = configStorage.snapshot();
+
+ assertEquals(2, configState.offset());
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(1),
configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(1));
+
+ // Config deletion
+ when(producerFuture.get(anyLong(),
any(TimeUnit.class))).thenReturn(null);
+ when(converter.toConnectData(TOPIC, null)).thenReturn(new
SchemaAndValue(null, null));
+
when(configLog.sendWithReceipt(AdditionalMatchers.or(Mockito.eq(configKey),
Mockito.eq(targetStateKey)),
+ Mockito.isNull())).thenReturn(producerFuture);
+
+ // Deletion should remove the second one we added
+ configStorage.removeConnectorConfig(CONNECTOR_IDS.get(1));
+ configState = configStorage.snapshot();
+
+ assertEquals(4, configState.offset());
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+ assertNull(configState.targetState(CONNECTOR_IDS.get(1)));
+
verify(configUpdateListener).onConnectorConfigRemove(CONNECTOR_IDS.get(1));
+
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testPutConnectorConfigWithTargetState() throws Exception {
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+
+ // Null before writing
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(-1, configState.offset());
+ assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ assertNull(configState.targetState(CONNECTOR_IDS.get(0)));
+
+ doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>() {{
+ put(TARGET_STATE_KEYS.get(0),
TARGET_STATES_SERIALIZED.get(2));
+ put(CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
+ }})
+ ).when(configLog).readToEnd();
+
+ // We expect to write the target state first, followed by the config
write and then a read to end
expectConvertWriteRead(
- COMMIT_TASKS_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
- "tasks", 2); // Starts with 0 tasks, after update has 2
- // As soon as root is rewritten, we should see a callback notifying us
that we reconfigured some tasks
- configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0),
TASK_IDS.get(1)));
- EasyMock.expectLastCall();
+ TARGET_STATE_KEYS.get(0),
KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATES_SERIALIZED.get(2),
+ "state.v2", TargetState.STOPPED.name());
+
+ expectConvertWriteRead(
+ CONNECTOR_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+ "properties", SAMPLE_CONFIGS.get(0));
+
+ // Writing should block until it is written and read back from Kafka
+ configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0), TargetState.STOPPED);
+ configState = configStorage.snapshot();
+ assertEquals(2, configState.offset());
+ assertEquals(TargetState.STOPPED,
configState.targetState(CONNECTOR_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+
+ // We don't expect the config update listener's
onConnectorTargetStateChange hook to be invoked
+ verify(configUpdateListener,
never()).onConnectorTargetStateChange(anyString());
+
+
verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(0));
+
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testPutConnectorConfigProducerError() throws Exception {
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+
+ when(converter.fromConnectData(TOPIC,
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0,
CONNECTOR_CONFIG_STRUCTS.get(0)))
+ .thenReturn(CONFIGS_SERIALIZED.get(0));
+ when(configLog.sendWithReceipt(anyString(),
any(byte[].class))).thenReturn(producerFuture);
+
+ // Verify initial state
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(-1, configState.offset());
+ assertEquals(0, configState.connectors().size());
+
+ Exception thrownException = new ExecutionException(new
TopicAuthorizationException(Collections.singleton("test")));
+ when(producerFuture.get(anyLong(),
any(TimeUnit.class))).thenThrow(thrownException);
+
+ // verify that the producer exception from KafkaBasedLog::send is
propagated
+ ConnectException e = assertThrows(ConnectException.class, () ->
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
+ SAMPLE_CONFIGS.get(0), null));
+ assertTrue(e.getMessage().contains("Error writing connector
configuration to Kafka"));
+ assertEquals(thrownException, e.getCause());
+
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testRemoveConnectorConfigSlowProducer() throws Exception {
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+
+ @SuppressWarnings("unchecked")
+ Future<RecordMetadata> connectorConfigProducerFuture =
mock(Future.class);
+
+ @SuppressWarnings("unchecked")
+ Future<RecordMetadata> targetStateProducerFuture = mock(Future.class);
+
+ when(configLog.sendWithReceipt(anyString(), isNull()))
+ // tombstone for the connector config
+ .thenReturn(connectorConfigProducerFuture)
+ // tombstone for the connector target state
+ .thenReturn(targetStateProducerFuture);
+
+
when(connectorConfigProducerFuture.get(eq(READ_WRITE_TOTAL_TIMEOUT_MS),
any(TimeUnit.class)))
+ .thenAnswer((Answer<RecordMetadata>) invocation -> {
+ time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000);
+ return null;
+ });
+
+ // the future get timeout is expected to be reduced according to how
long the previous Future::get took
+ when(targetStateProducerFuture.get(eq(1000L), any(TimeUnit.class)))
+ .thenAnswer((Answer<RecordMetadata>) invocation -> {
+ time.sleep(1000);
+ return null;
+ });
+
+ @SuppressWarnings("unchecked")
+ Future<Void> future = mock(Future.class);
+ when(configLog.readToEnd()).thenReturn(future);
+
+ // the Future::get calls on the previous two producer futures
exhausted the overall timeout; so expect the
+ // timeout on the log read future to be 0
+ when(future.get(eq(0L), any(TimeUnit.class))).thenReturn(null);
+
+ configStorage.removeConnectorConfig("test-connector");
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testWritePrivileges() throws Exception {
+ // With exactly.once.source.support = preparing (or also, "enabled"),
we need to use a transactional producer
+ // to write some types of messages to the config topic
+ props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+ createStore();
+
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+
+ // Try and fail to write a task count record to the config topic
without write privileges
+ when(converter.fromConnectData(TOPIC,
KafkaConfigBackingStore.TASK_COUNT_RECORD_V0,
CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0)))
+ .thenReturn(CONFIGS_SERIALIZED.get(0));
+
+ // Should fail the first time since we haven't claimed write privileges
+ assertThrows(IllegalStateException.class, () ->
configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6));
+
+ // Claim write privileges
+
doReturn(fencableProducer).when(configStorage).createFencableProducer();
+ // And write the task count record successfully
+
when(fencableProducer.send(any(ProducerRecord.class))).thenReturn(null);
+
doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0))))
+
.doAnswer(expectReadToEnd(Collections.singletonMap(CONNECTOR_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(2))))
+ .when(configLog).readToEnd();
+ when(converter.toConnectData(TOPIC, CONFIGS_SERIALIZED.get(0)))
+ .thenReturn(new SchemaAndValue(null,
structToMap(CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0))));
+
+ // Should succeed now
+ configStorage.claimWritePrivileges();
+ configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6);
+
+ verify(fencableProducer).beginTransaction();
+ verify(fencableProducer).commitTransaction();
+
+ // Try to write a connector config
+ when(converter.fromConnectData(TOPIC,
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0,
CONNECTOR_CONFIG_STRUCTS.get(0)))
+ .thenReturn(CONFIGS_SERIALIZED.get(1));
+ // Get fenced out
+ doThrow(new ProducerFencedException("Better luck next time"))
+ .doNothing()
+ .when(fencableProducer).commitTransaction();
+
+ // Should fail again when we get fenced out
+ assertThrows(PrivilegedWriteException.class, () ->
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0),
null));
+
+ verify(fencableProducer, times(2)).beginTransaction();
+ verify(fencableProducer).close(Duration.ZERO);
+
+ // Should fail if we retry without reclaiming write privileges
+ assertThrows(IllegalStateException.class, () ->
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0),
null));
+
+ // In the meantime, write a target state (which doesn't require write
privileges)
+ when(converter.fromConnectData(TOPIC,
KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATE_PAUSED))
+ .thenReturn(CONFIGS_SERIALIZED.get(1));
+ when(configLog.sendWithReceipt("target-state-" + CONNECTOR_IDS.get(1),
CONFIGS_SERIALIZED.get(1)))
+ .thenReturn(producerFuture);
+ when(producerFuture.get(anyLong(),
any(TimeUnit.class))).thenReturn(null);
+
+ // Should succeed even without write privileges (target states can be
written by anyone)
+ configStorage.putTargetState(CONNECTOR_IDS.get(1), TargetState.PAUSED);
+
+ // Reclaim write privileges and successfully write the config
+ when(converter.toConnectData(TOPIC, CONFIGS_SERIALIZED.get(2)))
+ .thenReturn(new SchemaAndValue(null,
structToMap(CONNECTOR_CONFIG_STRUCTS.get(0))));
+
+ // Should succeed if we re-claim write privileges
+ configStorage.claimWritePrivileges();
+ configStorage.putConnectorConfig(CONNECTOR_IDS.get(1),
SAMPLE_CONFIGS.get(0), null);
+
+ verify(fencableProducer, times(3)).beginTransaction();
+ verify(fencableProducer, times(3)).commitTransaction();
+
verify(configUpdateListener).onConnectorConfigUpdate(CONNECTOR_IDS.get(1));
+
+ configStorage.stop();
+ verify(configLog).stop();
+ verify(configStorage, times(2)).createFencableProducer();
+ verify(fencableProducer, times(2)).close(Duration.ZERO);
+ }
+
+ @Test
+ public void testRestoreTargetStateUnexpectedDeletion() {
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(),
Optional.empty()));
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(3), null);
+ deserialized.put(CONFIGS_SERIALIZED.get(4),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ logOffset = 5;
+
+ expectStart(existingRecords, deserialized);
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+
+ // The target state deletion should reset the state to STARTED
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(5, configState.offset()); // Should always be next to be
read, even if uncommitted
+ assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new
ArrayList<>(configState.connectors()));
+ assertEquals(TargetState.STARTED,
configState.targetState(CONNECTOR_IDS.get(0)));
+
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testRestoreTargetState() {
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 5, 0L,
TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(5), new RecordHeaders(),
Optional.empty()));
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ // A worker running an older version wrote this target state; make
sure we can handle it correctly
+ deserialized.put(CONFIGS_SERIALIZED.get(3),
TARGET_STATE_PAUSED_LEGACY);
+ deserialized.put(CONFIGS_SERIALIZED.get(4),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ deserialized.put(CONFIGS_SERIALIZED.get(5), TARGET_STATE_STOPPED);
+ logOffset = 6;
+
+ expectStart(existingRecords, deserialized);
+
+ // Shouldn't see any callbacks since this is during startup
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+
+ // Should see a single connector with initial state paused
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(6, configState.offset()); // Should always be next to be
read, even if uncommitted
+ assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new
ArrayList<>(configState.connectors()));
+ assertEquals(TargetState.PAUSED,
configState.targetState(CONNECTOR_IDS.get(0)));
+ assertEquals(TargetState.STOPPED,
configState.targetState(CONNECTOR_IDS.get(1)));
+
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testRestore() {
+ // Restoring data should notify only of the latest values after
loading is complete. This also validates
+ // that inconsistent state is ignored.
+
+ // Overwrite each type at least once to ensure we see the latest data
after loading
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 5, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(5), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 6, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_TASK_COUNT_RECORD_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(6), new RecordHeaders(),
Optional.empty()),
+ // Connector after root update should make it through, task
update shouldn't
+ new ConsumerRecord<>(TOPIC, 0, 7, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(7), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 8, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(8), new RecordHeaders(),
Optional.empty()));
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(1),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(3),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(4),
CONNECTOR_CONFIG_STRUCTS.get(1));
+ deserialized.put(CONFIGS_SERIALIZED.get(5),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ deserialized.put(CONFIGS_SERIALIZED.get(6),
CONNECTOR_TASK_COUNT_RECORD_STRUCTS.get(1));
+ deserialized.put(CONFIGS_SERIALIZED.get(7),
CONNECTOR_CONFIG_STRUCTS.get(2));
+ deserialized.put(CONFIGS_SERIALIZED.get(8),
TASK_CONFIG_STRUCTS.get(1));
+ logOffset = 9;
+
+ expectStart(existingRecords, deserialized);
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+
+ // Should see a single connector and its config should be the last one
seen anywhere in the log
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(logOffset, configState.offset()); // Should always be
next to be read, even if uncommitted
+ assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new
ArrayList<>(configState.connectors()));
+ assertEquals(TargetState.STARTED,
configState.targetState(CONNECTOR_IDS.get(0)));
+ // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
+ assertEquals(SAMPLE_CONFIGS.get(2),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ // Should see 2 tasks for that connector. Only config updates before
the root key update should be reflected
+ assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)),
configState.tasks(CONNECTOR_IDS.get(0)));
+ // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.taskConfig(TASK_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(0),
configState.taskConfig(TASK_IDS.get(1)));
+ assertEquals(9, (int)
configState.taskCountRecord(CONNECTOR_IDS.get(1)));
+ assertEquals(Collections.EMPTY_SET,
configState.inconsistentConnectors());
+ assertEquals(Collections.singleton("connector1"),
configState.connectorsPendingFencing);
+
+ // Shouldn't see any callbacks since this is during startup
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testRestoreConnectorDeletion() {
+ // Restoring data should notify only of the latest values after
loading is complete. This also validates
+ // that inconsistent state is ignored.
+
+ // Overwrite each type at least once to ensure we see the latest data
after loading
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L,
TimestampType.CREATE_TIME, 0, 0, TARGET_STATE_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 5, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(5), new RecordHeaders(),
Optional.empty()));
+
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(3), null);
+ deserialized.put(CONFIGS_SERIALIZED.get(4), null);
+ deserialized.put(CONFIGS_SERIALIZED.get(5),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+
+ logOffset = 6;
+ expectStart(existingRecords, deserialized);
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+
+ // Should see a single connector and its config should be the last one
seen anywhere in the log
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(6, configState.offset()); // Should always be next to be
read, even if uncommitted
+ assertTrue(configState.connectors().isEmpty());
+
+ // Shouldn't see any callbacks since this is during startup
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testRestoreZeroTasks() {
+ // Restoring data should notify only of the latest values after
loading is complete. This also validates
+ // that inconsistent state is ignored.
+
+ // Overwrite each type at least once to ensure we see the latest data
after loading
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 4, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(4), new RecordHeaders(),
Optional.empty()),
+ // Connector after root update should make it through, task
update shouldn't
+ new ConsumerRecord<>(TOPIC, 0, 5, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(5), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 6, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(6), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 7, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(7), new RecordHeaders(),
Optional.empty()));
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(3),
CONNECTOR_CONFIG_STRUCTS.get(1));
+ deserialized.put(CONFIGS_SERIALIZED.get(4),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ deserialized.put(CONFIGS_SERIALIZED.get(5),
CONNECTOR_CONFIG_STRUCTS.get(2));
+ deserialized.put(CONFIGS_SERIALIZED.get(6),
TASK_CONFIG_STRUCTS.get(1));
+ deserialized.put(CONFIGS_SERIALIZED.get(7),
TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
+ logOffset = 8;
+ expectStart(existingRecords, deserialized);
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+
+ // Should see a single connector and its config should be the last one
seen anywhere in the log
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(8, configState.offset()); // Should always be next to be
read, even if uncommitted
+ assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new
ArrayList<>(configState.connectors()));
+ // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
+ assertEquals(SAMPLE_CONFIGS.get(2),
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ // Should see 0 tasks for that connector.
+ assertEquals(Collections.emptyList(),
configState.tasks(CONNECTOR_IDS.get(0)));
+ // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+ assertEquals(Collections.EMPTY_SET,
configState.inconsistentConnectors());
+
+ // Shouldn't see any callbacks since this is during startup
+ configStorage.stop();
+ verify(configLog).stop();
+ }
+
+ @Test
+ public void testRecordToRestartRequest() {
+ ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0,
0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty());
+ Struct struct = RESTART_REQUEST_STRUCTS.get(0);
+ SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(),
structToMap(struct));
+ RestartRequest restartRequest =
configStorage.recordToRestartRequest(record, schemaAndValue);
+ assertEquals(CONNECTOR_1_NAME, restartRequest.connectorName());
+ assertEquals(struct.getBoolean(INCLUDE_TASKS_FIELD_NAME),
restartRequest.includeTasks());
+ assertEquals(struct.getBoolean(ONLY_FAILED_FIELD_NAME),
restartRequest.onlyFailed());
+ }
+
+ @Test
+ public void testRecordToRestartRequestOnlyFailedInconsistent() {
+ ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0,
0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty());
+ Struct struct = ONLY_FAILED_MISSING_STRUCT;
+ SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(),
structToMap(struct));
+ RestartRequest restartRequest =
configStorage.recordToRestartRequest(record, schemaAndValue);
+ assertEquals(CONNECTOR_1_NAME, restartRequest.connectorName());
+ assertEquals(struct.getBoolean(INCLUDE_TASKS_FIELD_NAME),
restartRequest.includeTasks());
+ assertFalse(restartRequest.onlyFailed());
+ }
+
+ @Test
+ public void testRecordToRestartRequestIncludeTasksInconsistent() {
+ ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0,
0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty());
+ Struct struct = INCLUDE_TASKS_MISSING_STRUCT;
+ SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(),
structToMap(struct));
+ RestartRequest restartRequest =
configStorage.recordToRestartRequest(record, schemaAndValue);
+ assertEquals(CONNECTOR_1_NAME, restartRequest.connectorName());
+ assertFalse(restartRequest.includeTasks());
+ assertEquals(struct.getBoolean(ONLY_FAILED_FIELD_NAME),
restartRequest.onlyFailed());
+ }
+
+ @Test
+ public void testFencableProducerPropertiesOverrideUserSuppliedValues() {
+ props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+ String groupId = "my-other-connect-cluster";
+ props.put(GROUP_ID_CONFIG, groupId);
+ props.put(TRANSACTIONAL_ID_CONFIG, "my-custom-transactional-id");
+ props.put(ENABLE_IDEMPOTENCE_CONFIG, "false");
+ createStore();
+
+ Map<String, Object> fencableProducerProperties =
configStorage.fencableProducerProps(config);
+ assertEquals("connect-cluster-" + groupId,
fencableProducerProperties.get(TRANSACTIONAL_ID_CONFIG));
+ assertEquals("true",
fencableProducerProperties.get(ENABLE_IDEMPOTENCE_CONFIG));
+ }
+
+ @Test
+ public void
testConsumerPropertiesDoNotOverrideUserSuppliedValuesWithoutExactlyOnceSourceEnabled()
{
+ props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+ props.put(ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_UNCOMMITTED.toString());
+ createStore();
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+
+ assertEquals(
+ IsolationLevel.READ_UNCOMMITTED.toString(),
+ capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
+ );
+ }
+
+ @Test
+ public void testClientIds() {
+ props = new HashMap<>(DEFAULT_CONFIG_STORAGE_PROPS);
+ props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+ createStore();
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+
+ Map<String, Object> fencableProducerProps =
configStorage.fencableProducerProps(config);
+
+ final String expectedClientId = CLIENT_ID_BASE + "configs";
+ assertEquals(expectedClientId,
capturedProducerProps.getValue().get(CLIENT_ID_CONFIG));
+ assertEquals(expectedClientId,
capturedConsumerProps.getValue().get(CLIENT_ID_CONFIG));
+ assertEquals(expectedClientId + "-leader",
fencableProducerProps.get(CLIENT_ID_CONFIG));
+ }
+
+ @Test
+ public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() {
+ when(configLog.partitionCount()).thenReturn(2);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ ConfigException e = assertThrows(ConfigException.class, () ->
configStorage.start());
+ assertTrue(e.getMessage().contains("required to have a single
partition"));
+ }
+
+ @Test
+ public void testFencableProducerPropertiesInsertedByDefault() {
+ props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+ String groupId = "my-connect-cluster";
+ props.put(GROUP_ID_CONFIG, groupId);
+ props.remove(TRANSACTIONAL_ID_CONFIG);
+ props.remove(ENABLE_IDEMPOTENCE_CONFIG);
+ createStore();
+
+ Map<String, Object> fencableProducerProperties =
configStorage.fencableProducerProps(config);
+ assertEquals("connect-cluster-" + groupId,
fencableProducerProperties.get(TRANSACTIONAL_ID_CONFIG));
+ assertEquals("true",
fencableProducerProperties.get(ENABLE_IDEMPOTENCE_CONFIG));
+ }
+
+ @Test
+ public void
testConsumerPropertiesInsertedByDefaultWithExactlyOnceSourceEnabled() {
+ props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+ props.remove(ISOLATION_LEVEL_CONFIG);
+ createStore();
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+
+ assertEquals(
+ IsolationLevel.READ_COMMITTED.toString(),
+ capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
+ );
+ }
+
+ @Test
+ public void
testConsumerPropertiesOverrideUserSuppliedValuesWithExactlyOnceSourceEnabled() {
+ props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+ props.put(ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_UNCOMMITTED.toString());
+ createStore();
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+
+ assertEquals(
+ IsolationLevel.READ_COMMITTED.toString(),
+ capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
+ );
+ }
+
+ @Test
+ public void
testConsumerPropertiesNotInsertedByDefaultWithoutExactlyOnceSourceEnabled() {
+ props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+ props.remove(ISOLATION_LEVEL_CONFIG);
+ createStore();
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+
+
assertNull(capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG));
+ }
+
@Test
public void testBackgroundConnectorDeletion() throws Exception {
    // verify that we handle connector deletions correctly when they come up through the log
    // Seed the log with one connector, two task configs, and a task-commit record.
    List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
            new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
                    CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
                    CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
                    CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
                    CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
    LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
    deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
    deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
    deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(1));
    deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
    logOffset = 5;

    expectStart(existingRecords, deserialized);
    when(configLog.partitionCount()).thenReturn(1);

    configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
    verifyConfigure();

    configStorage.start();
    verify(configLog).start();

    // Should see a single connector in target state STARTED with both task configs in place
    // (NOTE(review): a previous comment here said "paused", but the assertion checks STARTED)
    ClusterConfigState configState = configStorage.snapshot();
    assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
    assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
    assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(new ConnectorTaskId(CONNECTOR_IDS.get(0), 0)));
    assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(new ConnectorTaskId(CONNECTOR_IDS.get(0), 1)));
    assertEquals(2, configState.taskCount(CONNECTOR_IDS.get(0)));

    LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
    serializedData.put(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
    serializedData.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(1));
    doAnswer(expectReadToEnd(serializedData)).when(configLog).readToEnd();

    // Null structs stand in for tombstone records: the connector config and its target state deleted
    Map<String, Struct> deserializedData = new HashMap<>();
    deserializedData.put(CONNECTOR_CONFIG_KEYS.get(0), null);
    deserializedData.put(TARGET_STATE_KEYS.get(0), null);
    expectRead(serializedData, deserializedData);

    configStorage.refresh(0, TimeUnit.SECONDS);
    verify(configUpdateListener).onConnectorConfigRemove(CONNECTOR_IDS.get(0));

    configState = configStorage.snapshot();
    // Connector should now be removed from the snapshot
    assertFalse(configState.contains(CONNECTOR_IDS.get(0)));
    assertEquals(0, configState.taskCount(CONNECTOR_IDS.get(0)));
    // Ensure that the deleted connector's deferred task updates have been cleaned up
    // in order to prevent unbounded growth of the map
    assertEquals(Collections.emptyMap(), configStorage.deferredTaskUpdates);

    configStorage.stop();
    verify(configLog).stop();
}
+
@Test
public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception {
    // Test a case where a failure and compaction has left us in an inconsistent state when reading the log.
    // We start out by loading an initial configuration where we started to write a task update, and then
    // compaction cleaned up the earlier record.
    List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
            new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
                    CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
            // This is the record that has been compacted:
            //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
            new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
                    CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
                    CONFIGS_SERIALIZED.get(4), new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
                    CONFIGS_SERIALIZED.get(5), new RecordHeaders(), Optional.empty()));
    LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
    deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
    deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
    deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
    deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
    logOffset = 6;
    expectStart(existingRecords, deserialized);
    when(configLog.partitionCount()).thenReturn(1);

    configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
    verifyConfigure();
    configStorage.start();

    // After reading the log, it should have been in an inconsistent state
    ClusterConfigState configState = configStorage.snapshot();
    assertEquals(6, configState.offset()); // Should always be next to be read, not last committed
    assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
    // Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list
    assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0)));
    // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
    assertNull(configState.taskConfig(TASK_IDS.get(0)));
    assertNull(configState.taskConfig(TASK_IDS.get(1)));
    assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configState.inconsistentConnectors());

    // Records to be read by consumer as it reads to the end of the log
    LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
    serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
    serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
    // Successful attempt to write new task config: two read-to-ends see nothing new,
    // the third surfaces the records written below
    doAnswer(expectReadToEnd(new LinkedHashMap<>()))
            .doAnswer(expectReadToEnd(new LinkedHashMap<>()))
            .doAnswer(expectReadToEnd(serializedConfigs))
            .when(configLog).readToEnd();
    expectConvertWriteRead(
            TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
            "properties", SAMPLE_CONFIGS.get(0));
    expectConvertWriteRead(
            COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
            "tasks", 1); // Updated to just 1 task

    // Next, issue a write that has everything that is needed and it should be accepted. Note that in this case
    // we are going to shrink the number of tasks to 1
    // NOTE(review): assumes CONNECTOR_IDS.get(0) equals "connector1" — confirm against the fixture constants
    configStorage.putTaskConfigs("connector1", Collections.singletonList(SAMPLE_CONFIGS.get(0)));

    // Validate updated config
    configState = configStorage.snapshot();
    // This is only two more ahead of the last one because multiple calls fail, and so their configs are not written
    // to the topic. Only the last call with 1 task config + 1 commit actually gets written.
    assertEquals(8, configState.offset());
    assertEquals(Collections.singletonList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
    assertEquals(Collections.singletonList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0)));
    assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
    assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());

    // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
    verify(configUpdateListener).onTaskConfigUpdate(Collections.singletonList(TASK_IDS.get(0)));

    configStorage.stop();
    verify(configLog).stop();
}
+
+ @Test
+ public void testPutRestartRequestOnlyFailed() throws Exception {
+ RestartRequest restartRequest = new
RestartRequest(CONNECTOR_IDS.get(0), true, false);
+ testPutRestartRequest(restartRequest);
+ }
+
+ @Test
+ public void testPutRestartRequestOnlyFailedIncludingTasks() throws
Exception {
+ RestartRequest restartRequest = new
RestartRequest(CONNECTOR_IDS.get(0), true, true);
+ testPutRestartRequest(restartRequest);
+ }
+
// Round-trips a restart request through the (mocked) config log and asserts the update
// listener observes a request with identical fields.
private void testPutRestartRequest(RestartRequest restartRequest) throws Exception {
    expectStart(Collections.emptyList(), Collections.emptyMap());
    when(configLog.partitionCount()).thenReturn(1);

    configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
    verifyConfigure();

    configStorage.start();
    verify(configLog).start();

    // Expect the request to be serialized under the restart-request schema and written to the log
    expectConvertWriteRead(
            RESTART_CONNECTOR_KEYS.get(0), KafkaConfigBackingStore.RESTART_REQUEST_V0, CONFIGS_SERIALIZED.get(0),
            ONLY_FAILED_FIELD_NAME, restartRequest.onlyFailed());

    LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
    recordsToRead.put(RESTART_CONNECTOR_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
    doAnswer(expectReadToEnd(recordsToRead)).when(configLog).readToEnd();

    // Writing should block until it is written and read back from Kafka
    configStorage.putRestartRequest(restartRequest);

    // The listener must receive a request equal to the one submitted, field by field
    final ArgumentCaptor<RestartRequest> restartRequestCaptor = ArgumentCaptor.forClass(RestartRequest.class);
    verify(configUpdateListener).onRestartRequest(restartRequestCaptor.capture());
    assertEquals(restartRequest.connectorName(), restartRequestCaptor.getValue().connectorName());
    assertEquals(restartRequest.onlyFailed(), restartRequestCaptor.getValue().onlyFailed());
    assertEquals(restartRequest.includeTasks(), restartRequestCaptor.getValue().includeTasks());

    configStorage.stop();
    verify(configLog).stop();
}
+
@Test
public void testRestoreRestartRequestInconsistentState() {
    // Restoring data should notify only of the latest values after loading is complete. This also validates
    // that inconsistent state doesn't prevent startup.
    // Overwrite each type at least once to ensure we see the latest data after loading
    List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
            new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
                    CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1),
                    CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1),
                    CONFIGS_SERIALIZED.get(2), new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(1),
                    CONFIGS_SERIALIZED.get(3), new RecordHeaders(), Optional.empty()));
    LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
    deserialized.put(CONFIGS_SERIALIZED.get(0), RESTART_REQUEST_STRUCTS.get(0));
    deserialized.put(CONFIGS_SERIALIZED.get(1), RESTART_REQUEST_STRUCTS.get(1));
    deserialized.put(CONFIGS_SERIALIZED.get(2), RESTART_REQUEST_STRUCTS.get(2));
    // A trailing tombstone: startup must tolerate it without failing
    deserialized.put(CONFIGS_SERIALIZED.get(3), null);
    logOffset = 4;
    expectStart(existingRecords, deserialized);
    when(configLog.partitionCount()).thenReturn(1);
    configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
    verifyConfigure();

    configStorage.start();
    verify(configLog).start();

    // Shouldn't see any callbacks since this is during startup
    verify(configUpdateListener, never()).onConnectorConfigRemove(anyString());
    verify(configUpdateListener, never()).onConnectorConfigUpdate(anyString());
    verify(configUpdateListener, never()).onTaskConfigUpdate(anyCollection());
    verify(configUpdateListener, never()).onConnectorTargetStateChange(anyString());
    verify(configUpdateListener, never()).onSessionKeyUpdate(any(SessionKey.class));
    verify(configUpdateListener, never()).onRestartRequest(any(RestartRequest.class));
    verify(configUpdateListener, never()).onLoggingLevelUpdate(anyString(), anyString());

    configStorage.stop();
    verify(configLog).stop();
}
+
@Test
public void testPutTaskConfigsZeroTasks() throws Exception {
    configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
    verifyConfigure();
    configStorage.start();
    verify(configLog).start();

    // Records to be read by consumer as it reads to the end of the log:
    // first read sees nothing, second read surfaces the commit record written below
    doAnswer(expectReadToEnd(new LinkedHashMap<>())).
            doAnswer(expectReadToEnd(Collections.singletonMap(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0))))
            .when(configLog).readToEnd();

    expectConvertWriteRead(
            COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
            "tasks", 0); // We have 0 tasks
    // Bootstrap as if we had already added the connector, but no tasks had been added yet
    addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());

    // Null before writing
    ClusterConfigState configState = configStorage.snapshot();
    assertEquals(-1, configState.offset());

    // Writing task configs should block until all the writes have been performed and the root record update
    // has completed
    // NOTE(review): assumes CONNECTOR_IDS.get(0) equals "connector1" — confirm against the fixture constants
    List<Map<String, String>> taskConfigs = Collections.emptyList();
    configStorage.putTaskConfigs("connector1", taskConfigs);

    // Validate root config by listing all connectors and tasks
    configState = configStorage.snapshot();
    assertEquals(1, configState.offset());
    String connectorName = CONNECTOR_IDS.get(0);
    assertEquals(Collections.singletonList(connectorName), new ArrayList<>(configState.connectors()));
    assertEquals(Collections.emptyList(), configState.tasks(connectorName));
    assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());

    // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
    verify(configUpdateListener).onTaskConfigUpdate(Collections.emptyList());

    configStorage.stop();
    verify(configLog).stop();
}
- configStorage.putTaskCountRecord(connectorName, 4);
+ @Test
+ public void testBackgroundUpdateTargetState() throws Exception {
+ // verify that we handle target state changes correctly when they come
up through the log
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()));
+ LinkedHashMap<byte[], Struct> deserializedOnStartup = new
LinkedHashMap<>();
+ deserializedOnStartup.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserializedOnStartup.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
+ deserializedOnStartup.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ deserializedOnStartup.put(CONFIGS_SERIALIZED.get(3),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ logOffset = 5;
+
+ expectStart(existingRecords, deserializedOnStartup);
+ when(configLog.partitionCount()).thenReturn(1);
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+ verify(configLog).start();
+
+ // Should see a single connector with initial state started
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)),
configStorage.connectorTargetStates.keySet());
+ assertEquals(TargetState.STARTED,
configState.targetState(CONNECTOR_IDS.get(0)));
+
+ LinkedHashMap<String, byte[]> serializedAfterStartup = new
LinkedHashMap<>();
+ serializedAfterStartup.put(TARGET_STATE_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
+ serializedAfterStartup.put(TARGET_STATE_KEYS.get(1),
CONFIGS_SERIALIZED.get(1));
+
doAnswer(expectReadToEnd(serializedAfterStartup)).when(configLog).readToEnd();
+
+ Map<String, Struct> deserializedAfterStartup = new HashMap<>();
+ deserializedAfterStartup.put(TARGET_STATE_KEYS.get(0),
TARGET_STATE_PAUSED);
+ deserializedAfterStartup.put(TARGET_STATE_KEYS.get(1),
TARGET_STATE_STOPPED);
+ expectRead(serializedAfterStartup, deserializedAfterStartup);
+
+ // Should see two connectors now, one paused and one stopped
+ configStorage.refresh(0, TimeUnit.SECONDS);
+
verify(configUpdateListener).onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
configState = configStorage.snapshot();
- assertEquals(4, configState.offset());
- assertFalse(configState.pendingFencing(connectorName));
- assertEquals(4, (long) configState.taskCountRecord(connectorName));
- assertEquals(0, (long)
configState.taskConfigGeneration(connectorName));
+
+ assertEquals(new HashSet<>(CONNECTOR_IDS),
configStorage.connectorTargetStates.keySet());
+ assertEquals(TargetState.PAUSED,
configState.targetState(CONNECTOR_IDS.get(0)));
+ assertEquals(TargetState.STOPPED,
configState.targetState(CONNECTOR_IDS.get(1)));
+
+ configStorage.stop();
+ verify(configStorage).stop();
+ }
+
+ @Test
+ public void testSameTargetState() throws Exception {
+ // verify that we handle target state changes correctly when they come
up through the log
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, 0L,
TimestampType.CREATE_TIME, 0, 0, CONNECTOR_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(0), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 1, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(1), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 2, 0L,
TimestampType.CREATE_TIME, 0, 0, TASK_CONFIG_KEYS.get(1),
+ CONFIGS_SERIALIZED.get(2), new RecordHeaders(),
Optional.empty()),
+ new ConsumerRecord<>(TOPIC, 0, 3, 0L,
TimestampType.CREATE_TIME, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0),
+ CONFIGS_SERIALIZED.get(3), new RecordHeaders(),
Optional.empty()));
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0),
CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(1),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2),
TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(3),
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ logOffset = 5;
+
+ expectStart(existingRecords, deserialized);
+
+ when(configLog.partitionCount()).thenReturn(1);
+
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+ verify(configLog).start();
+
+ ClusterConfigState configState = configStorage.snapshot();
+ expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0),
TARGET_STATE_STARTED);
+ // Should see a single connector with initial state paused
+ assertEquals(TargetState.STARTED,
configState.targetState(CONNECTOR_IDS.get(0)));
+
+ expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0),
TARGET_STATE_STARTED);
+ // on resume update listener shouldn't be called
+ verify(configUpdateListener,
never()).onConnectorTargetStateChange(anyString());
+
+ configStorage.stop();
+ verify(configStorage).stop();
+ }
+
+
@Test
public void testPutLogLevel() throws Exception {
    final String logger1 = "org.apache.zookeeper";
    final String logger2 = "org.apache.cassandra";
    final String logger3 = "org.apache.kafka.clients";
    final String logger4 = "org.apache.kafka.connect";
    final String level1 = "ERROR";
    final String level3 = "WARN";
    final String level4 = "DEBUG";

    final Struct existingLogLevel = new Struct(KafkaConfigBackingStore.LOGGER_LEVEL_V0)
            .put("level", level1);

    // Pre-populate the config topic with a couple of logger level records; these should be ignored (i.e.,
    // not reported to the update listener)
    List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
            new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, "logger-cluster-" + logger1,
                    CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty()
            ),
            new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, "logger-cluster-" + logger2,
                    CONFIGS_SERIALIZED.get(1), new RecordHeaders(), Optional.empty()
            )
    );
    LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
    deserialized.put(CONFIGS_SERIALIZED.get(0), existingLogLevel);
    // Make sure we gracefully handle tombstones
    deserialized.put(CONFIGS_SERIALIZED.get(1), null);
    logOffset = 2;

    expectStart(existingRecords, deserialized);
    when(configLog.partitionCount()).thenReturn(1);

    configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
    verifyConfigure();

    configStorage.start();
    verify(configLog).start();

    // Write two new logger levels; each is serialized under the logger-level schema
    expectConvertWriteRead(
            "logger-cluster-" + logger3, KafkaConfigBackingStore.LOGGER_LEVEL_V0, CONFIGS_SERIALIZED.get(2),
            "level", level3);
    configStorage.putLoggerLevel(logger3, level3);

    expectConvertWriteRead(
            "logger-cluster-" + logger4, KafkaConfigBackingStore.LOGGER_LEVEL_V0, CONFIGS_SERIALIZED.get(3),
            "level", level4);
    configStorage.putLoggerLevel(logger4, level4);

    LinkedHashMap<String, byte[]> newRecords = new LinkedHashMap<>();
    newRecords.put("logger-cluster-" + logger3, CONFIGS_SERIALIZED.get(2));
    newRecords.put("logger-cluster-" + logger4, CONFIGS_SERIALIZED.get(3));
    doAnswer(expectReadToEnd(newRecords)).when(configLog).readToEnd();

    // Reading to the end of the log should surface both new logger levels to the listener
    configStorage.refresh(0, TimeUnit.SECONDS);
    verify(configUpdateListener).onLoggingLevelUpdate(logger3, level3);
    verify(configUpdateListener).onLoggingLevelUpdate(logger4, level4);
    configStorage.stop();
    verify(configLog).stop();
}
- PowerMock.verifyAll();
+ @Test
+ public void testTaskCountRecordsAndGenerations() throws Exception {
+ configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+ verifyConfigure();
+ configStorage.start();
+ verify(configLog).start();
+
+ // Records to be read by consumer as it reads to the end of the log
+ doAnswer(expectReadToEnd(new LinkedHashMap<>()))
+ .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>()
{{
+ put(TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
+ put(TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(0));
+ put(COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1));
+ }})
+ )
+ .doAnswer(expectReadToEnd(new LinkedHashMap<String, byte[]>()
{{
+ put(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0),
CONFIGS_SERIALIZED.get(2));
+ }})
+ )
+ .when(configLog).readToEnd();
+
+ // Task configs should read to end, write to the log, read to end,
write root, then read to end again
+// expectConvertWriteRead(TASK_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+// "properties", SAMPLE_CONFIGS.get(0));
+// expectConvertWriteRead(
+// TASK_CONFIG_KEYS.get(1),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+// "properties", SAMPLE_CONFIGS.get(0));
+// expectConvertWriteRead(
+// COMMIT_TASKS_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
+// "tasks", 2); // Starts with 0 tasks, after update has 2
+//
+// // Task count records are read back after writing as well
+// expectConvertWriteRead(
+// CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0),
KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, CONFIGS_SERIALIZED.get(0),
+// "task-count", 4);
+
+ when(configLog.partitionCount()).thenReturn(1);
+
+ addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0),
Collections.emptyList());
+
+ // Before anything is written
+ String connectorName = CONNECTOR_IDS.get(0);
+ ClusterConfigState configState = configStorage.snapshot();
+ assertFalse(configState.pendingFencing(connectorName));
+ assertNull(configState.taskCountRecord(connectorName));
+ assertNull(configState.taskConfigGeneration(connectorName));
+
+ // Writing task configs should block until all the writes have been
performed and the root record update
+ // has completed
+ List<Map<String, String>> taskConfigs =
Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(0));
+
+ Struct connectConfig = new
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0);
+ connectConfig.put("properties", SAMPLE_CONFIGS.get(0));
+ Struct taskCountRecord = new
Struct(KafkaConfigBackingStore.TASK_COUNT_RECORD_V0);
+ taskCountRecord.put("task-count", 4);
+ Struct connectTaskCommitConfig = new
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0);
+ connectTaskCommitConfig.put("tasks", taskConfigs.size());
+
+
doReturn(CONFIGS_SERIALIZED.get(0)).when(converter).fromConnectData(TOPIC,
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, connectConfig);
+
doReturn(CONFIGS_SERIALIZED.get(0)).when(converter).fromConnectData(TOPIC,
KafkaConfigBackingStore.TASK_COUNT_RECORD_V0, taskCountRecord);
+
doReturn(CONFIGS_SERIALIZED.get(0)).when(converter).fromConnectData(TOPIC,
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, connectTaskCommitConfig);
+
+
doReturn(producerFuture).when(configLog).sendWithReceipt(TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
+
doReturn(producerFuture).when(configLog).sendWithReceipt(TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(0));
+
doReturn(producerFuture).when(configLog).sendWithReceipt(COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(1));
+
doReturn(producerFuture).when(configLog).sendWithReceipt(CONNECTOR_TASK_COUNT_RECORD_KEYS.get(0),
CONFIGS_SERIALIZED.get(2));
+ doAnswer(invocation -> {
+ // Note null schema because default settings for internal
serialization are schema-less
+ return new SchemaAndValue(null, structToMap(connectConfig));
+ }).when(converter).toConnectData(TOPIC, CONFIGS_SERIALIZED.get(0));
+ doAnswer(invocation -> {
+ // Note null schema because default settings for internal
serialization are schema-less
+ return new SchemaAndValue(null,
structToMap(connectTaskCommitConfig));
+ }).when(converter).toConnectData(TOPIC, CONFIGS_SERIALIZED.get(1));
+ doAnswer(invocation -> {
+ // Note null schema because default settings for internal
serialization are schema-less
+ return new SchemaAndValue(null, structToMap(taskCountRecord));
+ }).when(converter).toConnectData(TOPIC, CONFIGS_SERIALIZED.get(2));
+ doReturn(null).when(producerFuture).get(anyLong(),
any(TimeUnit.class));
+ configStorage.putTaskConfigs("connector1", taskConfigs);
+
+// configState = configStorage.snapshot();
+// assertEquals(3, configState.offset());
+// assertTrue(configState.pendingFencing(connectorName));
+// assertNull(configState.taskCountRecord(connectorName));
+// assertEquals(0, (long)
configState.taskConfigGeneration(connectorName));
+//
+// configStorage.putTaskCountRecord(connectorName, 4);
+//
+// configState = configStorage.snapshot();
+// assertEquals(4, configState.offset());
+// assertFalse(configState.pendingFencing(connectorName));
+// assertEquals(4, (long) configState.taskCountRecord(connectorName));
+// assertEquals(0, (long)
configState.taskConfigGeneration(connectorName));
+//
+// // As soon as root is rewritten, we should see a callback notifying
us that we reconfigured some tasks
+//
verify(configUpdateListener).onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0),
TASK_IDS.get(1)));
+
+ configStorage.stop();
+ verify(configLog).stop();
}
@Test
public void testPutTaskConfigs() throws Exception {
Review Comment:
Please take a look at the following example:
```java
@Test
public void testPutTaskConfigs() throws Exception {
// Records to be read by consumer as it reads to the end of the log
LinkedHashMap<String, byte[]> serializedConfigs = new
LinkedHashMap<>();
serializedConfigs.put(TASK_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(0));
serializedConfigs.put(TASK_CONFIG_KEYS.get(1),
CONFIGS_SERIALIZED.get(1));
serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0),
CONFIGS_SERIALIZED.get(2));
// Task configs should read to end, write to the log, read to end,
write root, then read to end again
doAnswer(expectReadToEnd(new LinkedHashMap<>()))
.doAnswer(expectReadToEnd(new LinkedHashMap<>()))
.doAnswer(expectReadToEnd(serializedConfigs))
.when(configLog).readToEnd();
when(configLog.partitionCount()).thenReturn(1);
expectConvertWriteRead(TASK_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
new
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties",
SAMPLE_CONFIGS.get(0)));
expectConvertWriteRead(TASK_CONFIG_KEYS.get(1),
KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
new
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties",
SAMPLE_CONFIGS.get(1)));
expectConvertWriteRead(COMMIT_TASKS_CONFIG_KEYS.get(0),
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
new
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2));
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
verifyConfigure();
// Bootstrap as if we had already added the connector, but no tasks
had been added yet
addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0),
Collections.emptyList());
// Null before writing
ClusterConfigState configState = configStorage.snapshot();
assertEquals(-1, configState.offset());
assertNull(configState.taskConfig(TASK_IDS.get(0)));
assertNull(configState.taskConfig(TASK_IDS.get(1)));
// Writing task configs should block until all the writes have been
performed and the root record update
// has completed
List<Map<String, String>> taskConfigs =
Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
configStorage.putTaskConfigs("connector1", taskConfigs);
// Validate root config by listing all connectors and tasks
configState = configStorage.snapshot();
assertEquals(3, configState.offset());
String connectorName = CONNECTOR_IDS.get(0);
assertEquals(Collections.singletonList(connectorName), new
ArrayList<>(configState.connectors()));
assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)),
configState.tasks(connectorName));
assertEquals(SAMPLE_CONFIGS.get(0),
configState.taskConfig(TASK_IDS.get(0)));
assertEquals(SAMPLE_CONFIGS.get(1),
configState.taskConfig(TASK_IDS.get(1)));
assertEquals(Collections.EMPTY_SET,
configState.inconsistentConnectors());
// As soon as root is rewritten, we should see a callback notifying
us that we reconfigured some tasks
verify(configUpdateListener).onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0),
TASK_IDS.get(1)));
configStorage.stop();
verify(configLog).stop();
}
private void expectConvertWriteRead(final String configKey, final Schema
valueSchema, final byte[] serialized,
final Struct value)
throws Exception {
doReturn(serialized).when(converter).fromConnectData(eq(TOPIC),
eq(valueSchema), eq(value));
doReturn(producerFuture).when(configLog).sendWithReceipt(eq(configKey),
eq(serialized));
doReturn(null).when(producerFuture).get(anyLong(),
any(TimeUnit.class));
doReturn(new SchemaAndValue(null,
structToMap(value))).when(converter).toConnectData(eq(TOPIC), eq(serialized));
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]