kgeisz commented on code in PR #6717:
URL: https://github.com/apache/hbase/pull/6717#discussion_r2065030821


##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java:
##########
@@ -770,4 +789,156 @@ public static String findMostRecentBackupId(String[] backupIds) {
     return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
   }
 
+  /**
+   * Calculates the replication checkpoint timestamp used for continuous backup.
+   * <p>
+   * A replication checkpoint is the earliest timestamp across all region servers such that every
+   * WAL entry before that point is known to be replicated to the target system. This is essential
+   * for features like Point-in-Time Restore (PITR) and incremental backups, where we want to
+   * confidently restore data to a consistent state without missing updates.
+   * <p>
+   * The checkpoint is calculated using a combination of:
+   * <ul>
+   * <li>The start timestamps of WAL files currently being replicated for each server.</li>
+   * <li>The latest successfully replicated timestamp recorded by the replication marker chore.</li>
+   * </ul>
+   * <p>
+   * We combine these two sources to handle the following challenges:
+   * <ul>
+   * <li><b>Stale WAL start times:</b> If replication traffic is low or WALs are long-lived, the
+   * replication offset may point to the same WAL for a long time, resulting in stale timestamps
+   * that underestimate progress. This could delay PITR unnecessarily.</li>
+   * <li><b>Limitations of marker-only tracking:</b> The replication marker chore stores the last
+   * successfully replicated timestamp per region server in a system table. However, this data may
+   * become stale if the server goes offline or region ownership changes. For example, if a region
+   * initially belonged to rs1 and was later moved to rs4 due to re-balancing, rs1’s marker would
+   * persist even though it no longer holds any regions. Relying solely on these stale markers could
+   * lead to incorrect or outdated checkpoints.</li>
+   * </ul>
+   * <p>
+   * To handle these limitations, the method:
+   * <ol>
+   * <li>Verifies that the continuous backup peer exists to ensure replication is enabled.</li>
+   * <li>Retrieves WAL replication queue information for the peer, collecting WAL start times per
+   * region server. This gives us a lower bound for replication progress.</li>
+   * <li>Reads the marker chore's replicated timestamps from the backup system table.</li>
+   * <li>For servers found in both sources, if the marker timestamp is more recent than the WAL's
+   * start timestamp, we use the marker (since replication has progressed beyond the WAL).</li>
+   * <li>We discard marker entries for region servers that are not present in WAL queues, assuming
+   * those servers are no longer relevant (e.g., decommissioned or reassigned).</li>
+   * <li>The checkpoint is the minimum of all chosen timestamps — i.e., the 
slowest replicating
+   * region server.</li>
+   * <li>Finally, we persist the updated marker information to include any newly participating
+   * region servers.</li>
+   * </ol>
+   * <p>
+   * Note: If the replication marker chore is disabled, we fall back to using only the WAL start
+   * times. This ensures correctness but may lead to conservative checkpoint estimates during idle
+   * periods.
+   * @param conn the HBase connection
+   * @return the calculated replication checkpoint timestamp
+   * @throws IOException if reading replication queues or updating the backup system table fails
+   */
+  public static long getReplicationCheckpoint(Connection conn) throws IOException {
+    Configuration conf = conn.getConfiguration();
+    long checkpoint = EnvironmentEdgeManager.getDelegate().currentTime();
+
+    // Step 1: Ensure the continuous backup replication peer exists
+    if (!continuousBackupReplicationPeerExists(conn.getAdmin())) {
+      String msg = "Replication peer '" + CONTINUOUS_BACKUP_REPLICATION_PEER
+        + "' not found. Continuous backup not enabled.";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+
+    // Step 2: Get all replication queues for the continuous backup peer
+    ReplicationQueueStorage queueStorage =
+      ReplicationStorageFactory.getReplicationQueueStorage(conn, conf);
+
+    List<ReplicationQueueId> queueIds;
+    try {
+      queueIds = queueStorage.listAllQueueIds(CONTINUOUS_BACKUP_REPLICATION_PEER);
+    } catch (ReplicationException e) {
+      String msg = "Failed to retrieve replication queue IDs for peer '"
+        + CONTINUOUS_BACKUP_REPLICATION_PEER + "'";
+      LOG.error(msg, e);
+      throw new IOException(msg, e);
+    }
+
+    if (queueIds.isEmpty()) {
+      String msg = "Replication peer '" + CONTINUOUS_BACKUP_REPLICATION_PEER + "' has no queues. "
+        + "This may indicate that continuous backup replication is not initialized correctly.";
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+
+    // Step 3: Build a map of ServerName -> WAL start timestamp (lowest seen per server)
+    Map<ServerName, Long> serverToCheckpoint = new HashMap<>();
+    for (ReplicationQueueId queueId : queueIds) {
+      Map<String, ReplicationGroupOffset> offsets;
+      try {
+        offsets = queueStorage.getOffsets(queueId);
+      } catch (ReplicationException e) {
+        String msg = "Failed to fetch WAL offsets for replication queue: " + queueId;
+        LOG.error(msg, e);
+        throw new IOException(msg, e);
+      }
+
+      for (ReplicationGroupOffset offset : offsets.values()) {
+        String walFile = offset.getWal();
+        long ts = AbstractFSWALProvider.getTimestamp(walFile); // WAL creation time
+        ServerName server = queueId.getServerName();
+        // Store the minimum timestamp per server (ts - 1 to avoid edge boundary issues)
+        serverToCheckpoint.merge(server, ts - 1, Math::min);
+      }
+    }
+
+    // Step 4: If replication markers are enabled, overlay fresher timestamps from backup system
+    // table
+    boolean replicationMarkerEnabled =
+      conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
+    if (replicationMarkerEnabled) {
+      try (BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) {
+        Map<ServerName, Long> markerTimestamps = backupSystemTable.getBackupCheckpointTimestamps();
+
+        for (Map.Entry<ServerName, Long> entry : markerTimestamps.entrySet()) {
+          ServerName server = entry.getKey();
+          long markerTs = entry.getValue();
+
+          // If marker timestamp is newer, override
+          if (serverToCheckpoint.containsKey(server)) {
+            long current = serverToCheckpoint.get(server);
+            if (markerTs > current) {
+              serverToCheckpoint.put(server, markerTs);
+            }
+          } else {
+            // This server is no longer active (e.g., RS moved or removed); skip
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Skipping replication marker timestamp for invalid server: {}", server);

Review Comment:
   Is the `LOG.isDebugEnabled()` check necessary here? Since the call already uses a `{}`
   placeholder, shouldn't the logger simply skip this message when the log level is set to INFO or higher?
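
   To illustrate the reasoning, here is a minimal, self-contained sketch. `SketchLogger` and `CountingServer` are hypothetical stand-ins written for this comment, not the real logging API or HBase's `ServerName`. It assumes the logger checks the level before substituting `{}` arguments, which is how parameterized logging is generally expected to behave. Under that assumption the argument's `toString()` is never invoked when DEBUG is off, so an explicit guard would only pay off if building the argument itself were expensive.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Entry point for the sketch; all names here are hypothetical, for illustration only. */
class GuardSketch {
  public static void main(String[] args) {
    CountingServer server = new CountingServer();

    // DEBUG disabled: the unguarded call returns before formatting the argument.
    SketchLogger infoLevel = new SketchLogger(false);
    infoLevel.debug("Skipping replication marker timestamp for invalid server: {}", server);
    System.out.println("toString calls with DEBUG off: " + server.toStringCalls.get());

    // DEBUG enabled: the argument is rendered once and the message is recorded.
    SketchLogger debugLevel = new SketchLogger(true);
    debugLevel.debug("Skipping replication marker timestamp for invalid server: {}", server);
    System.out.println("toString calls with DEBUG on: " + server.toStringCalls.get());
    System.out.print(debugLevel.output());
  }
}

/** Hypothetical stand-in for a parameterized logger; not the real logging API. */
final class SketchLogger {
  private final boolean debugEnabled;
  private final StringBuilder out = new StringBuilder();

  SketchLogger(boolean debugEnabled) {
    this.debugEnabled = debugEnabled;
  }

  boolean isDebugEnabled() {
    return debugEnabled;
  }

  /** Substitutes the argument into the message only when DEBUG is enabled. */
  void debug(String format, Object arg) {
    if (!debugEnabled) {
      return; // level check happens first, so arg.toString() is never called
    }
    out.append(format.replace("{}", String.valueOf(arg))).append('\n');
  }

  String output() {
    return out.toString();
  }
}

/** Hypothetical argument type that counts how often it is rendered to a string. */
final class CountingServer {
  final AtomicInteger toStringCalls = new AtomicInteger();

  @Override
  public String toString() {
    toStringCalls.incrementAndGet();
    return "rs1.example.org,16020,1";
  }
}
```

   If that assumption holds for the logger used in this class, the guard could be dropped and the `LOG.debug(...)` call left on its own.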


