showuon commented on code in PR #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r871177514
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -232,11 +240,10 @@ private void refreshIdleConsumerGroupOffset() {
ConsumerGroupState consumerGroupState =
consumerGroupDesc.state();
// sync offset to the target cluster only if the state of
current consumer group is:
// (1) idle: because the consumer at target is not actively
consuming the mirrored topic
- // (2) dead: the new consumer that is recently created at
source and never exist at target
- if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
+ // (2) dead: the new consumer that is recently created at
source and never existed at target
+ if (consumerGroupState == ConsumerGroupState.EMPTY) {
idleConsumerGroupsOffset.put(group,
targetAdminClient.listConsumerGroupOffsets(group)
-
.partitionsToOffsetAndMetadata().get().entrySet().stream().collect(
- Collectors.toMap(Entry::getKey, Entry::getValue)));
+ .partitionsToOffsetAndMetadata().get());
Review Comment:
Nice clean up.
##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -526,6 +520,65 @@ public void testOffsetSyncsTopicsOnTarget() throws
Exception {
assertFalse(primaryTopics.contains("mm2-offset-syncs." +
BACKUP_CLUSTER_ALIAS + ".internal"));
}
+ @Test
+ public void testNoCheckpointsIfNoRecordsAreMirrored() throws
InterruptedException {
+ String consumerGroupName = "consumer-group-no-checkpoints";
+ Map<String, Object> consumerProps =
Collections.singletonMap("group.id", consumerGroupName);
+
+ // ensure there are some records in the topic on the source cluster
+ produceMessages(primary, "test-topic-1");
+
+ // warm up consumers before starting the connectors, so we don't need
to wait for discovery
+ warmUpConsumer(consumerProps);
+
+ // one way replication from primary to backup
+ mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS +
".enabled", "false");
+ mm2Config = new MirrorMakerConfig(mm2Props);
+ waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config,
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+ // make sure the topics are created in the backup cluster
+ waitForTopicCreated(backup, remoteTopicName("test-topic-1",
PRIMARY_CLUSTER_ALIAS));
+ waitForTopicCreated(backup, remoteTopicName("test-topic-no-records",
PRIMARY_CLUSTER_ALIAS));
+
+ // commit some offsets for both topics in the source cluster
+ TopicPartition tp1 = new TopicPartition("test-topic-1", 0);
+ TopicPartition tp2 = new TopicPartition("test-topic-no-records", 0);
+ //Map<String, Object> consumerProps =
Collections.singletonMap("group.id", consumerGroupName);
+ try (Consumer<byte[], byte[]> consumer =
primary.kafka().createConsumer(consumerProps)) {
+ Collection<TopicPartition> tps = Arrays.asList(tp1, tp2);
+ Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
endOffsets.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> new OffsetAndMetadata(e.getValue())
+ ));
+ consumer.commitSync(offsetsToCommit);
+ }
+
+ // Only test-topic-1 should have translated offsets because we've not
yet mirrored any records for topic-no-records
Review Comment:
// ... because we've not yet mirrored any records for [test-]topic-no-records
##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -526,6 +520,65 @@ public void testOffsetSyncsTopicsOnTarget() throws
Exception {
assertFalse(primaryTopics.contains("mm2-offset-syncs." +
BACKUP_CLUSTER_ALIAS + ".internal"));
}
+ @Test
+ public void testNoCheckpointsIfNoRecordsAreMirrored() throws
InterruptedException {
+ String consumerGroupName = "consumer-group-no-checkpoints";
+ Map<String, Object> consumerProps =
Collections.singletonMap("group.id", consumerGroupName);
+
+ // ensure there are some records in the topic on the source cluster
+ produceMessages(primary, "test-topic-1");
+
+ // warm up consumers before starting the connectors, so we don't need
to wait for discovery
+ warmUpConsumer(consumerProps);
+
+ // one way replication from primary to backup
+ mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS +
".enabled", "false");
+ mm2Config = new MirrorMakerConfig(mm2Props);
+ waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config,
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+ // make sure the topics are created in the backup cluster
+ waitForTopicCreated(backup, remoteTopicName("test-topic-1",
PRIMARY_CLUSTER_ALIAS));
+ waitForTopicCreated(backup, remoteTopicName("test-topic-no-records",
PRIMARY_CLUSTER_ALIAS));
+
+ // commit some offsets for both topics in the source cluster
+ TopicPartition tp1 = new TopicPartition("test-topic-1", 0);
+ TopicPartition tp2 = new TopicPartition("test-topic-no-records", 0);
+ //Map<String, Object> consumerProps =
Collections.singletonMap("group.id", consumerGroupName);
+ try (Consumer<byte[], byte[]> consumer =
primary.kafka().createConsumer(consumerProps)) {
+ Collection<TopicPartition> tps = Arrays.asList(tp1, tp2);
+ Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
endOffsets.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> new OffsetAndMetadata(e.getValue())
+ ));
+ consumer.commitSync(offsetsToCommit);
+ }
+
+ // Only test-topic-1 should have translated offsets because we've not
yet mirrored any records for topic-no-records
+ MirrorClient backupClient = new
MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
+ waitForCondition(() -> {
+ Map<TopicPartition, OffsetAndMetadata> translatedOffsets =
backupClient.remoteConsumerOffsets(
+ consumerGroupName, PRIMARY_CLUSTER_ALIAS,
Duration.ofSeconds(30L));
+ return translatedOffsets.containsKey(remoteTopicPartition(tp1,
PRIMARY_CLUSTER_ALIAS)) &&
+ !translatedOffsets.containsKey(remoteTopicPartition(tp2,
PRIMARY_CLUSTER_ALIAS));
+ }, OFFSET_SYNC_DURATION_MS, "Checkpoints were not emitted correctly to
backup cluster");
+
+ // Send some records to topic-no-records in the source cluster
Review Comment:
I don't think the topic name `test-topic-no-records` makes sense, because we do eventually send records to it at the end of the test. WDYT?
##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -319,27 +319,21 @@ public void testReplication() throws Exception {
waitForCondition(() ->
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new
TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not
translated downstream to primary cluster.");
- waitForCondition(() ->
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
- Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new
TopicPartition("test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not
translated upstream to primary cluster.");
-
Map<TopicPartition, OffsetAndMetadata> primaryOffsets =
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
Duration.ofMillis(CHECKPOINT_DURATION_MS));
primaryClient.close();
backupClient.close();
// Failback consumer group to primary cluster
- try (Consumer<byte[], byte[]> backupConsumer =
primary.kafka().createConsumer(Collections.singletonMap("group.id",
consumerGroupName))) {
- backupConsumer.assign(primaryOffsets.keySet());
- primaryOffsets.forEach(backupConsumer::seek);
- backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
- backupConsumer.commitAsync();
-
- assertTrue(backupConsumer.position(new
TopicPartition("test-topic-1", 0)) > 0, "Consumer failedback to zero upstream
offset.");
- assertTrue(backupConsumer.position(new
TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero
downstream offset.");
- assertTrue(backupConsumer.position(
- new TopicPartition("test-topic-1", 0)) <=
NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected upstream offset.");
- assertTrue(backupConsumer.position(
+ try (Consumer<byte[], byte[]> primaryConsumer =
primary.kafka().createConsumer(Collections.singletonMap("group.id",
consumerGroupName))) {
+ primaryConsumer.assign(primaryOffsets.keySet());
+ primaryOffsets.forEach(primaryConsumer::seek);
+ primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ primaryConsumer.commitAsync();
+
+ assertTrue(primaryConsumer.position(new
TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero
downstream offset.");
Review Comment:
Why do we no longer verify the offset of `test-topic-1` here (the assertion was removed)? Could you help me understand the reason? Thanks.
##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -526,6 +520,65 @@ public void testOffsetSyncsTopicsOnTarget() throws
Exception {
assertFalse(primaryTopics.contains("mm2-offset-syncs." +
BACKUP_CLUSTER_ALIAS + ".internal"));
}
+ @Test
+ public void testNoCheckpointsIfNoRecordsAreMirrored() throws
InterruptedException {
+ String consumerGroupName = "consumer-group-no-checkpoints";
+ Map<String, Object> consumerProps =
Collections.singletonMap("group.id", consumerGroupName);
+
+ // ensure there are some records in the topic on the source cluster
+ produceMessages(primary, "test-topic-1");
+
+ // warm up consumers before starting the connectors, so we don't need
to wait for discovery
+ warmUpConsumer(consumerProps);
+
+ // one way replication from primary to backup
+ mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS +
".enabled", "false");
+ mm2Config = new MirrorMakerConfig(mm2Props);
+ waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config,
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+ // make sure the topics are created in the backup cluster
+ waitForTopicCreated(backup, remoteTopicName("test-topic-1",
PRIMARY_CLUSTER_ALIAS));
+ waitForTopicCreated(backup, remoteTopicName("test-topic-no-records",
PRIMARY_CLUSTER_ALIAS));
+
+ // commit some offsets for both topics in the source cluster
+ TopicPartition tp1 = new TopicPartition("test-topic-1", 0);
+ TopicPartition tp2 = new TopicPartition("test-topic-no-records", 0);
+ //Map<String, Object> consumerProps =
Collections.singletonMap("group.id", consumerGroupName);
Review Comment:
should be removed?
##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -319,27 +319,21 @@ public void testReplication() throws Exception {
waitForCondition(() ->
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new
TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not
translated downstream to primary cluster.");
- waitForCondition(() ->
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
- Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new
TopicPartition("test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not
translated upstream to primary cluster.");
Review Comment:
Also, why is this wait condition being removed? Could you please help me understand? Thanks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]