showuon commented on code in PR #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r871177514


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -232,11 +240,10 @@ private void refreshIdleConsumerGroupOffset() {
                 ConsumerGroupState consumerGroupState = 
consumerGroupDesc.state();
                 // sync offset to the target cluster only if the state of 
current consumer group is:
                 // (1) idle: because the consumer at target is not actively 
consuming the mirrored topic
-                // (2) dead: the new consumer that is recently created at 
source and never exist at target
-                if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
+                // (2) dead: the new consumer that is recently created at 
source and never existed at target
+                if (consumerGroupState == ConsumerGroupState.EMPTY) {
                     idleConsumerGroupsOffset.put(group, 
targetAdminClient.listConsumerGroupOffsets(group)
-                        
.partitionsToOffsetAndMetadata().get().entrySet().stream().collect(
-                            Collectors.toMap(Entry::getKey, Entry::getValue)));
+                        .partitionsToOffsetAndMetadata().get());

Review Comment:
   Nice clean up.



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -526,6 +520,65 @@ public void testOffsetSyncsTopicsOnTarget() throws 
Exception {
         assertFalse(primaryTopics.contains("mm2-offset-syncs." + 
BACKUP_CLUSTER_ALIAS + ".internal"));
     }
 
+    @Test
+    public void testNoCheckpointsIfNoRecordsAreMirrored() throws 
InterruptedException {
+        String consumerGroupName = "consumer-group-no-checkpoints";
+        Map<String, Object> consumerProps = 
Collections.singletonMap("group.id", consumerGroupName);
+
+        // ensure there are some records in the topic on the source cluster
+        produceMessages(primary, "test-topic-1");
+
+        // warm up consumers before starting the connectors, so we don't need 
to wait for discovery
+        warmUpConsumer(consumerProps);
+
+        // one way replication from primary to backup
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + 
".enabled", "false");
+        mm2Config = new MirrorMakerConfig(mm2Props);
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // make sure the topics  are created in the backup cluster
+        waitForTopicCreated(backup, remoteTopicName("test-topic-1", 
PRIMARY_CLUSTER_ALIAS));
+        waitForTopicCreated(backup, remoteTopicName("test-topic-no-records", 
PRIMARY_CLUSTER_ALIAS));
+
+        // commit some offsets for both topics in the source cluster
+        TopicPartition tp1 = new TopicPartition("test-topic-1", 0);
+        TopicPartition tp2 = new TopicPartition("test-topic-no-records", 0);
+        //Map<String, Object> consumerProps  = 
Collections.singletonMap("group.id", consumerGroupName);
+        try (Consumer<byte[], byte[]> consumer = 
primary.kafka().createConsumer(consumerProps)) {
+            Collection<TopicPartition> tps = Arrays.asList(tp1, tp2);
+            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
+            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
endOffsets.entrySet().stream()
+                            .collect(Collectors.toMap(
+                                    Map.Entry::getKey,
+                                    e -> new OffsetAndMetadata(e.getValue())
+                            ));
+            consumer.commitSync(offsetsToCommit);
+        }
+
+        // Only test-topic-1 should have translated offsets because we've not 
yet mirrored any records for topic-no-records

Review Comment:
   // ... because we've not yet mirrored any records for [test-]topic-no-records



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -526,6 +520,65 @@ public void testOffsetSyncsTopicsOnTarget() throws 
Exception {
         assertFalse(primaryTopics.contains("mm2-offset-syncs." + 
BACKUP_CLUSTER_ALIAS + ".internal"));
     }
 
+    @Test
+    public void testNoCheckpointsIfNoRecordsAreMirrored() throws 
InterruptedException {
+        String consumerGroupName = "consumer-group-no-checkpoints";
+        Map<String, Object> consumerProps = 
Collections.singletonMap("group.id", consumerGroupName);
+
+        // ensure there are some records in the topic on the source cluster
+        produceMessages(primary, "test-topic-1");
+
+        // warm up consumers before starting the connectors, so we don't need 
to wait for discovery
+        warmUpConsumer(consumerProps);
+
+        // one way replication from primary to backup
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + 
".enabled", "false");
+        mm2Config = new MirrorMakerConfig(mm2Props);
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // make sure the topics  are created in the backup cluster
+        waitForTopicCreated(backup, remoteTopicName("test-topic-1", 
PRIMARY_CLUSTER_ALIAS));
+        waitForTopicCreated(backup, remoteTopicName("test-topic-no-records", 
PRIMARY_CLUSTER_ALIAS));
+
+        // commit some offsets for both topics in the source cluster
+        TopicPartition tp1 = new TopicPartition("test-topic-1", 0);
+        TopicPartition tp2 = new TopicPartition("test-topic-no-records", 0);
+        //Map<String, Object> consumerProps  = 
Collections.singletonMap("group.id", consumerGroupName);
+        try (Consumer<byte[], byte[]> consumer = 
primary.kafka().createConsumer(consumerProps)) {
+            Collection<TopicPartition> tps = Arrays.asList(tp1, tp2);
+            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
+            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
endOffsets.entrySet().stream()
+                            .collect(Collectors.toMap(
+                                    Map.Entry::getKey,
+                                    e -> new OffsetAndMetadata(e.getValue())
+                            ));
+            consumer.commitSync(offsetsToCommit);
+        }
+
+        // Only test-topic-1 should have translated offsets because we've not 
yet mirrored any records for topic-no-records
+        MirrorClient backupClient = new 
MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
+        waitForCondition(() -> {
+            Map<TopicPartition, OffsetAndMetadata> translatedOffsets = 
backupClient.remoteConsumerOffsets(
+                    consumerGroupName, PRIMARY_CLUSTER_ALIAS, 
Duration.ofSeconds(30L));
+            return translatedOffsets.containsKey(remoteTopicPartition(tp1, 
PRIMARY_CLUSTER_ALIAS)) &&
+                   !translatedOffsets.containsKey(remoteTopicPartition(tp2, 
PRIMARY_CLUSTER_ALIAS));
+        }, OFFSET_SYNC_DURATION_MS, "Checkpoints were not emitted correctly to 
backup cluster");
+
+        // Send some records to topic-no-records in the source cluster

Review Comment:
   I don't think the topic name `test-topic-no-records` makes sense, because we 
do end up sending records to it at the end of the test. WDYT?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -319,27 +319,21 @@ public void testReplication() throws Exception {
         waitForCondition(() -> 
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
             Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not 
translated downstream to primary cluster.");
 
-        waitForCondition(() -> 
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
-            Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not 
translated upstream to primary cluster.");
-
         Map<TopicPartition, OffsetAndMetadata> primaryOffsets = 
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
                 Duration.ofMillis(CHECKPOINT_DURATION_MS));
  
         primaryClient.close();
         backupClient.close();
         
         // Failback consumer group to primary cluster
-        try (Consumer<byte[], byte[]> backupConsumer = 
primary.kafka().createConsumer(Collections.singletonMap("group.id", 
consumerGroupName))) {
-            backupConsumer.assign(primaryOffsets.keySet());
-            primaryOffsets.forEach(backupConsumer::seek);
-            backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            backupConsumer.commitAsync();
-        
-            assertTrue(backupConsumer.position(new 
TopicPartition("test-topic-1", 0)) > 0, "Consumer failedback to zero upstream 
offset.");
-            assertTrue(backupConsumer.position(new 
TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero 
downstream offset.");
-            assertTrue(backupConsumer.position(
-                new TopicPartition("test-topic-1", 0)) <= 
NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected upstream offset.");
-            assertTrue(backupConsumer.position(
+        try (Consumer<byte[], byte[]> primaryConsumer = 
primary.kafka().createConsumer(Collections.singletonMap("group.id", 
consumerGroupName))) {
+            primaryConsumer.assign(primaryOffsets.keySet());
+            primaryOffsets.forEach(primaryConsumer::seek);
+            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            primaryConsumer.commitAsync();
+
+            assertTrue(primaryConsumer.position(new 
TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero 
downstream offset.");

Review Comment:
   Why don't we verify the offset of `test-topic-1` now? Could you help me 
understand it? Thanks.



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -526,6 +520,65 @@ public void testOffsetSyncsTopicsOnTarget() throws 
Exception {
         assertFalse(primaryTopics.contains("mm2-offset-syncs." + 
BACKUP_CLUSTER_ALIAS + ".internal"));
     }
 
+    @Test
+    public void testNoCheckpointsIfNoRecordsAreMirrored() throws 
InterruptedException {
+        String consumerGroupName = "consumer-group-no-checkpoints";
+        Map<String, Object> consumerProps = 
Collections.singletonMap("group.id", consumerGroupName);
+
+        // ensure there are some records in the topic on the source cluster
+        produceMessages(primary, "test-topic-1");
+
+        // warm up consumers before starting the connectors, so we don't need 
to wait for discovery
+        warmUpConsumer(consumerProps);
+
+        // one way replication from primary to backup
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + 
".enabled", "false");
+        mm2Config = new MirrorMakerConfig(mm2Props);
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // make sure the topics  are created in the backup cluster
+        waitForTopicCreated(backup, remoteTopicName("test-topic-1", 
PRIMARY_CLUSTER_ALIAS));
+        waitForTopicCreated(backup, remoteTopicName("test-topic-no-records", 
PRIMARY_CLUSTER_ALIAS));
+
+        // commit some offsets for both topics in the source cluster
+        TopicPartition tp1 = new TopicPartition("test-topic-1", 0);
+        TopicPartition tp2 = new TopicPartition("test-topic-no-records", 0);
+        //Map<String, Object> consumerProps  = 
Collections.singletonMap("group.id", consumerGroupName);

Review Comment:
   Should this commented-out line be removed? It duplicates the `consumerProps` 
declaration at the top of the test.



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -319,27 +319,21 @@ public void testReplication() throws Exception {
         waitForCondition(() -> 
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
             Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not 
translated downstream to primary cluster.");
 
-        waitForCondition(() -> 
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
-            Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not 
translated upstream to primary cluster.");

Review Comment:
   And why should we remove this wait condition? Please help me understand it. 
Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to