gharris1727 commented on code in PR #16080:
URL: https://github.com/apache/kafka/pull/16080#discussion_r1622763556
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
public static final String OFFSET_LAG_MAX = "offset.lag.max";
- private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
+ private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" +
+ "\n" +
+ "When the lag for a remote partition exceeds the <code>offset.lag.max</code> value, MirrorMaker will initiate a resync operation to catch up the remote partition with the source partition. This involves reading records from the source partition starting from the last committed offset in the remote partition and writing them to the remote partition.\n" +
Review Comment:
> This involves reading records from the source partition starting from the last committed offset in the remote partition and writing them to the remote partition.
This is incorrect, and I don't know where this information came from.
When the lag for a remote partition exceeds offset.lag.max, it will emit an
offset sync to the offset sync topic, which can then be used by the
MirrorCheckpointTask to translate upstream and downstream offsets.
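To make the translation step concrete, here is a minimal sketch of how a checkpoint-style task could use emitted offset syncs to translate a consumer group's committed upstream offset. The class and method names (`OffsetTranslationSketch`, `recordSync`, `translate`) are hypothetical, and the "latest sync at or before the committed offset, plus at most one" rule is a simplified model assumed for illustration, not the actual MirrorCheckpointTask code.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, simplified model of offset translation (not MirrorMaker code):
 * each offset sync says "upstream offset U was written downstream at offset D".
 */
final class OffsetTranslationSketch {
    // upstream offset -> downstream offset, ordered so we can find the nearest earlier sync
    private final TreeMap<Long, Long> syncs = new TreeMap<>();

    /** Records an offset sync emitted by the source side. */
    void recordSync(long upstreamOffset, long downstreamOffset) {
        syncs.put(upstreamOffset, downstreamOffset);
    }

    /**
     * Translates a committed upstream offset, or returns -1 if no sync at or before it is known.
     * Offsets past the sync are translated to at most one record past the synced downstream
     * offset: a consumer may re-read some records downstream, but never skips any.
     */
    long translate(long committedUpstreamOffset) {
        Map.Entry<Long, Long> sync = syncs.floorEntry(committedUpstreamOffset);
        if (sync == null) {
            return -1L;
        }
        long step = committedUpstreamOffset == sync.getKey() ? 0L : 1L;
        return sync.getValue() + step;
    }

    public static void main(String[] args) {
        OffsetTranslationSketch translator = new OffsetTranslationSketch();
        translator.recordSync(100L, 250L);
        System.out.println(translator.translate(100L)); // 250
        System.out.println(translator.translate(130L)); // 251
        System.out.println(translator.translate(90L));  // -1
    }
}
```

The modeled design choice is to err toward re-delivering records downstream rather than skipping them when the committed offset lies past the latest sync.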
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
public static final String OFFSET_LAG_MAX = "offset.lag.max";
- private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
+ private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" +
+ "\n" +
+ "When the lag for a remote partition exceeds the <code>offset.lag.max</code> value, MirrorMaker will initiate a resync operation to catch up the remote partition with the source partition. This involves reading records from the source partition starting from the last committed offset in the remote partition and writing them to the remote partition.\n" +
+ "\n" +
+ "Setting <code>offset.lag.max</code> to a lower value can be beneficial in scenarios where records may not flow constantly or at a consistent rate, as it ensures the remote partitions stay more closely in sync with the source partitions during periods of low throughput or inactivity. On the other hand, setting it to a higher value can be useful when the source topic has high throughput and the remote partitions can tolerate a larger lag.\n" +
Review Comment:
> Setting <code>offset.lag.max</code> to a lower value can be beneficial in scenarios where records may not flow constantly or at a consistent rate
Lowering offset.lag.max to a non-zero value doesn't help when the flow
is inconsistent. If a pause in the flow happens between syncs, the records
after the last sync aren't translated. That was addressed in a separate
improvement: https://issues.apache.org/jira/browse/KAFKA-15906
Lowering offset.lag.max to a non-zero value is only necessary when the
partition has a consistently low throughput and a fixed sync every
offset.flush.interval.ms isn't acceptable, which is a very contrived scenario.
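A back-of-the-envelope sketch of that trade-off, assuming a steady record rate, one lag-triggered sync per `offset.lag.max + 1` records, and a periodic sync every `offset.flush.interval.ms` while records keep arriving. `SyncCadenceSketch` and its methods are hypothetical, and the sample rate and interval values are illustrative only.

```java
/**
 * Back-of-the-envelope model (an assumption, not MirrorMaker code) of how often a
 * steadily-fed partition gets an offset sync when either trigger can fire.
 */
final class SyncCadenceSketch {
    /** Seconds between lag-triggered syncs, assuming one sync per (offsetLagMax + 1) records. */
    static double lagTriggeredIntervalSeconds(long offsetLagMax, double recordsPerSecond) {
        return (offsetLagMax + 1) / recordsPerSecond;
    }

    /** Effective interval: whichever trigger (lag threshold or periodic flush) fires first. */
    static double effectiveIntervalSeconds(long offsetLagMax, double recordsPerSecond, long flushIntervalMs) {
        return Math.min(lagTriggeredIntervalSeconds(offsetLagMax, recordsPerSecond), flushIntervalMs / 1000.0);
    }

    public static void main(String[] args) {
        // Illustrative values: 0.5 records/s and a 60 s flush interval.
        System.out.println(lagTriggeredIntervalSeconds(100, 0.5));       // 202.0
        System.out.println(effectiveIntervalSeconds(100, 0.5, 60_000));  // 60.0
        System.out.println(effectiveIntervalSeconds(9, 0.5, 60_000));    // 20.0
    }
}
```

Under these assumptions, at 0.5 records/s with offset.lag.max = 100 the periodic sync dominates; only a much smaller threshold (9 here) produces syncs more often than the flush interval.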
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
public static final String OFFSET_LAG_MAX = "offset.lag.max";
- private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
+ private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" +
Review Comment:
> The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.
The lag is the difference between the latest offset in the source partition
and the last sync emitted to the offset syncs topic.
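A minimal sketch of the sync-emission rule described above, assuming lag is measured from the upstream offset of the last emitted sync, the first replicated record always emits a sync, and a sync is emitted once the lag strictly exceeds `offset.lag.max`. `OffsetSyncTriggerSketch` is hypothetical; the real MirrorSourceTask has additional triggers that this model ignores.

```java
/**
 * Hypothetical, simplified model (not MirrorMaker code) of when one partition
 * emits an offset sync, with lag = latest upstream offset - last synced upstream offset.
 */
final class OffsetSyncTriggerSketch {
    private final long offsetLagMax;
    private long lastSyncUpstreamOffset = -1L; // -1 means no sync emitted yet

    OffsetSyncTriggerSketch(long offsetLagMax) {
        this.offsetLagMax = offsetLagMax;
    }

    /** Returns true if replicating the record at upstreamOffset should emit an offset sync. */
    boolean onRecord(long upstreamOffset) {
        if (lastSyncUpstreamOffset < 0 || upstreamOffset - lastSyncUpstreamOffset > offsetLagMax) {
            lastSyncUpstreamOffset = upstreamOffset;
            return true;
        }
        return false;
    }

    /** Counts the syncs emitted while replicating upstream offsets 0..recordCount-1. */
    static int countSyncs(long offsetLagMax, int recordCount) {
        OffsetSyncTriggerSketch trigger = new OffsetSyncTriggerSketch(offsetLagMax);
        int syncs = 0;
        for (long offset = 0; offset < recordCount; offset++) {
            if (trigger.onRecord(offset)) {
                syncs++;
            }
        }
        return syncs;
    }

    public static void main(String[] args) {
        System.out.println(countSyncs(100, 1000)); // 10
        System.out.println(countSyncs(0, 1000));   // 1000
    }
}
```

In this model, `offset.lag.max = 0` makes every record trigger a sync, which matches the "emit an offset sync for every source record" wording used later in this review.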
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
public static final String OFFSET_LAG_MAX = "offset.lag.max";
- private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
+ private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" +
+ "\n" +
Review Comment:
Not sure about these double newlines and how they render. How are you
previewing these changes?
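One way to reason about the rendering, assuming the doc string is emitted as text into generated HTML (the `<code>` tags in the string suggest HTML output): under the default CSS `white-space: normal`, browsers collapse runs of whitespace, including `\n\n`, into a single space. The sketch below approximates that collapsing with a regex; `HtmlWhitespaceSketch` is hypothetical and only a rough model of browser behavior.

```java
/**
 * Rough model (an assumption, not Kafka code) of how a browser displays plain text
 * under white-space: normal: runs of whitespace, newlines included, become one space.
 */
final class HtmlWhitespaceSketch {
    static String collapseLikeHtml(String text) {
        // \s+ approximates HTML's ASCII whitespace (space, tab, CR, LF, form feed).
        return text.replaceAll("\\s+", " ").trim();
    }

    public static void main(String[] args) {
        String doc = "First paragraph.\n\nSecond paragraph.";
        System.out.println(collapseLikeHtml(doc)); // First paragraph. Second paragraph.
    }
}
```

If that assumption holds, visible paragraph breaks would need HTML markup rather than newline characters.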
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
public static final String OFFSET_LAG_MAX = "offset.lag.max";
- private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
+ private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" +
+ "\n" +
+ "When the lag for a remote partition exceeds the <code>offset.lag.max</code> value, MirrorMaker will initiate a resync operation to catch up the remote partition with the source partition. This involves reading records from the source partition starting from the last committed offset in the remote partition and writing them to the remote partition.\n" +
+ "\n" +
+ "Setting <code>offset.lag.max</code> to a lower value can be beneficial in scenarios where records may not flow constantly or at a consistent rate, as it ensures the remote partitions stay more closely in sync with the source partitions during periods of low throughput or inactivity. On the other hand, setting it to a higher value can be useful when the source topic has high throughput and the remote partitions can tolerate a larger lag.\n" +
+ "\n" +
+ "It's also possible to set <code>offset.lag.max</code> to 0, which will cause MirrorMaker to initiate a resync operation for a remote partition as soon as it falls behind the source partition. This can be useful for strict synchronization requirements but may increase the load on the source cluster due to frequent resync operations.";
Review Comment:
Instead of saying "initiate a resync operation", we can say "emit an offset
sync for every source record", and mention the additional load and extra
throughput on the offset sync topic specifically.
The comment about increased load on the source cluster may be inaccurate,
because the syncs topic may be on either the source or the target cluster,
depending on the value of the offset-syncs.topic.location configuration.
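A rough sketch of where the extra traffic from `offset.lag.max = 0` lands, assuming one sync per `offset.lag.max + 1` replicated records (no other triggers) and that `offset-syncs.topic.location` is either `source` or `target`. `OffsetSyncLoadSketch` and `describeLoad` are hypothetical names; the numbers are estimates from that simplified model, not measurements.

```java
/**
 * Back-of-the-envelope model (an assumption, not MirrorMaker code) of how many
 * offset-sync records a partition produces and which cluster receives them.
 */
final class OffsetSyncLoadSketch {
    /** Integer ceiling of replicatedRecords / (offsetLagMax + 1). */
    static long estimatedSyncs(long offsetLagMax, long replicatedRecords) {
        long recordsPerSync = offsetLagMax + 1;
        return (replicatedRecords + recordsPerSync - 1) / recordsPerSync;
    }

    /** Describes the estimated sync writes and the cluster hosting the offset-syncs topic. */
    static String describeLoad(long offsetLagMax, long replicatedRecords, String offsetSyncsTopicLocation) {
        if (!offsetSyncsTopicLocation.equals("source") && !offsetSyncsTopicLocation.equals("target")) {
            throw new IllegalArgumentException("expected \"source\" or \"target\", got " + offsetSyncsTopicLocation);
        }
        return estimatedSyncs(offsetLagMax, replicatedRecords)
                + " offset syncs written to the " + offsetSyncsTopicLocation + " cluster";
    }

    public static void main(String[] args) {
        System.out.println(describeLoad(0, 1000, "target"));   // 1000 offset syncs written to the target cluster
        System.out.println(describeLoad(100, 1000, "source")); // 10 offset syncs written to the source cluster
    }
}
```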