kgeisz commented on code in PR #6717:
URL: https://github.com/apache/hbase/pull/6717#discussion_r2065030821
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java:
##########
@@ -770,4 +789,156 @@ public static String findMostRecentBackupId(String[] backupIds) {
return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp;
}
+ /**
+ * Calculates the replication checkpoint timestamp used for continuous backup.
+ * <p>
+ * A replication checkpoint is the earliest timestamp across all region servers such that every
+ * WAL entry before that point is known to be replicated to the target system. This is essential
+ * for features like Point-in-Time Restore (PITR) and incremental backups, where we want to
+ * confidently restore data to a consistent state without missing updates.
+ * <p>
+ * The checkpoint is calculated using a combination of:
+ * <ul>
+ * <li>The start timestamps of WAL files currently being replicated for each server.</li>
+ * <li>The latest successfully replicated timestamp recorded by the replication marker chore.</li>
+ * </ul>
+ * <p>
+ * We combine these two sources to handle the following challenges:
+ * <ul>
+ * <li><b>Stale WAL start times:</b> If replication traffic is low or WALs are long-lived, the
+ * replication offset may point to the same WAL for a long time, resulting in stale timestamps
+ * that underestimate progress. This could delay PITR unnecessarily.</li>
+ * <li><b>Limitations of marker-only tracking:</b> The replication marker chore stores the last
+ * successfully replicated timestamp per region server in a system table. However, this data may
+ * become stale if the server goes offline or region ownership changes. For example, if a region
+ * initially belonged to rs1 and was later moved to rs4 due to re-balancing, rs1’s marker would
+ * persist even though it no longer holds any regions. Relying solely on these stale markers could
+ * lead to incorrect or outdated checkpoints.</li>
+ * </ul>
+ * <p>
+ * To handle these limitations, the method:
+ * <ol>
+ * <li>Verifies that the continuous backup peer exists to ensure replication is enabled.</li>
+ * <li>Retrieves WAL replication queue information for the peer, collecting WAL start times per
+ * region server. This gives us a lower bound for replication progress.</li>
+ * <li>Reads the marker chore's replicated timestamps from the backup system table.</li>
+ * <li>For servers found in both sources, if the marker timestamp is more recent than the WAL's
+ * start timestamp, we use the marker (since replication has progressed beyond the WAL).</li>
+ * <li>We discard marker entries for region servers that are not present in WAL queues, assuming
+ * those servers are no longer relevant (e.g., decommissioned or reassigned).</li>
+ * <li>The checkpoint is the minimum of all chosen timestamps, i.e., the slowest replicating
+ * region server.</li>
+ * <li>Finally, we persist the updated marker information to include any newly participating
+ * region servers.</li>
+ * </ol>
+ * <p>
+ * Note: If the replication marker chore is disabled, we fall back to using only the WAL start
+ * times. This ensures correctness but may lead to conservative checkpoint estimates during idle
+ * periods.
+ * @param conn the HBase connection
+ * @return the calculated replication checkpoint timestamp
+ * @throws IOException if reading replication queues or updating the backup system table fails
+ */
+ public static long getReplicationCheckpoint(Connection conn) throws IOException {
+ Configuration conf = conn.getConfiguration();
+ long checkpoint = EnvironmentEdgeManager.getDelegate().currentTime();
+
+ // Step 1: Ensure the continuous backup replication peer exists
+ if (!continuousBackupReplicationPeerExists(conn.getAdmin())) {
+ String msg = "Replication peer '" + CONTINUOUS_BACKUP_REPLICATION_PEER
+ + "' not found. Continuous backup not enabled.";
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+
+ // Step 2: Get all replication queues for the continuous backup peer
+ ReplicationQueueStorage queueStorage =
+ ReplicationStorageFactory.getReplicationQueueStorage(conn, conf);
+
+ List<ReplicationQueueId> queueIds;
+ try {
+ queueIds = queueStorage.listAllQueueIds(CONTINUOUS_BACKUP_REPLICATION_PEER);
+ } catch (ReplicationException e) {
+ String msg = "Failed to retrieve replication queue IDs for peer '"
+ + CONTINUOUS_BACKUP_REPLICATION_PEER + "'";
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+
+ if (queueIds.isEmpty()) {
+ String msg = "Replication peer '" + CONTINUOUS_BACKUP_REPLICATION_PEER +
"' has no queues. "
+ + "This may indicate that continuous backup replication is not
initialized correctly.";
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+
+ // Step 3: Build a map of ServerName -> WAL start timestamp (lowest seen per server)
+ Map<ServerName, Long> serverToCheckpoint = new HashMap<>();
+ for (ReplicationQueueId queueId : queueIds) {
+ Map<String, ReplicationGroupOffset> offsets;
+ try {
+ offsets = queueStorage.getOffsets(queueId);
+ } catch (ReplicationException e) {
+ String msg = "Failed to fetch WAL offsets for replication queue: " +
queueId;
+ LOG.error(msg, e);
+ throw new IOException(msg, e);
+ }
+
+ for (ReplicationGroupOffset offset : offsets.values()) {
+ String walFile = offset.getWal();
+ long ts = AbstractFSWALProvider.getTimestamp(walFile); // WAL creation time
+ ServerName server = queueId.getServerName();
+ // Store the minimum timestamp per server (ts - 1 to avoid edge boundary issues)
+ serverToCheckpoint.merge(server, ts - 1, Math::min);
+ }
+ }
+
+ // Step 4: If replication markers are enabled, overlay fresher timestamps from backup system
+ // table
+ boolean replicationMarkerEnabled =
+ conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
+ if (replicationMarkerEnabled) {
+ try (BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) {
+ Map<ServerName, Long> markerTimestamps = backupSystemTable.getBackupCheckpointTimestamps();
+
+ for (Map.Entry<ServerName, Long> entry : markerTimestamps.entrySet()) {
+ ServerName server = entry.getKey();
+ long markerTs = entry.getValue();
+
+ // If marker timestamp is newer, override
+ if (serverToCheckpoint.containsKey(server)) {
+ long current = serverToCheckpoint.get(server);
+ if (markerTs > current) {
+ serverToCheckpoint.put(server, markerTs);
+ }
+ } else {
+ // This server is no longer active (e.g., RS moved or removed); skip
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping replication marker timestamp for invalid
server: {}", server);
Review Comment:
Is the `LOG.isDebugEnabled()` check necessary here? Shouldn't log4j just not log this message if the log level is set to INFO or higher?
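For reference, a minimal sketch (class and helper names below are hypothetical, not taken from this PR) of when such a guard actually matters with SLF4J parameterized logging: the `{}` placeholder defers message formatting until the level check inside `debug()` passes, so an explicit `isDebugEnabled()` guard only pays off when computing an argument is itself expensive.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebugGuardSketch { // hypothetical example class

  private static final Logger LOG = LoggerFactory.getLogger(DebugGuardSketch.class);

  void logCheapArgument(Object server) {
    // No guard needed: the argument is already in hand, and SLF4J only
    // formats the message when DEBUG is actually enabled.
    LOG.debug("Skipping replication marker timestamp for invalid server: {}", server);
  }

  void logExpensiveArgument() {
    // A guard is worthwhile only when building the argument is costly,
    // because the argument expression is evaluated before debug() is called.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Replication state dump: {}", buildExpensiveStateDump()); // hypothetical helper
    }
  }

  private String buildExpensiveStateDump() {
    return "..."; // stand-in for an expensive computation
  }
}
```

In the line commented on here, the only argument is the already-available `server` reference, so dropping the guard should not change behavior or cost in any meaningful way.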