vinayakphegde commented on code in PR #6788:
URL: https://github.com/apache/hbase/pull/6788#discussion_r2056431707


##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java:
##########
@@ -434,4 +434,15 @@ public void addContinuousBackupTableSet(Set<TableName> tables, long startTimesta
     throws IOException {
     systemTable.addContinuousBackupTableSet(tables, startTimestamp);
   }
+
+  /**
+   * Retrieves the current set of tables covered by continuous backup along with the timestamp
+   * indicating when continuous backup started for each table.
+   * @return a map where the key is the table name and the value is the timestamp representing the
+   *         start time of continuous backup for that table.
+   * @throws IOException if an I/O error occurs while accessing the backup system table.
+   */
+  public Map<TableName, Long> getContinuousBackupTableSet() throws IOException {

Review Comment:
   where are we using this method? 



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java:
##########
@@ -565,7 +584,7 @@ public String backupTables(BackupRequest request) throws IOException {
         }
       }
       if (nonExistingTableList != null) {
-        if (type == BackupType.INCREMENTAL) {
+        if (type == BackupType.INCREMENTAL && !request.isContinuousBackupEnabled()) {

Review Comment:
   Maybe we can add a comment to explain what is happening here: why it is okay for a normal incremental backup to go through the process even if some tables do not exist, and why it is not okay for a continuous-backup-enabled incremental backup.
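   
   For instance, a hedged sketch of the explanatory comment being requested; the rationale stated here is an assumption inferred from this diff, not something the PR confirms:
   
   ```java
   // Sketch (assumed rationale, to be confirmed by the PR author):
   // A regular incremental backup tolerates missing tables: they are simply
   // excluded from the table list, since a table in the incremental set may
   // have been dropped after earlier backups. With continuous backup enabled,
   // the request targets exactly the user-specified tables, so a missing
   // table invalidates the request.
   if (type == BackupType.INCREMENTAL && !request.isContinuousBackupEnabled()) {
     // ... existing handling of nonExistingTableList ...
   }
   ```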



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -262,9 +271,13 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
       // case PREPARE_INCREMENTAL:
       beginBackup(backupManager, backupInfo);
       backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
-      LOG.debug("For incremental backup, current table set is "
-        + backupManager.getIncrementalBackupTableSet());
-      newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
+      if (backupInfo.isContinuousBackupEnabled()) {
+        LOG.debug("For incremental backup, current table set is " + backupInfo.getTables());
+      } else {
+        LOG.debug("For incremental backup, current table set is "
+          + backupManager.getIncrementalBackupTableSet());
+        newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
+      }

Review Comment:
   We should clarify this point in the comment or elsewhere in the code:
   
   - When continuous backup is enabled, incremental backups only consider the tables explicitly specified by the user.
   - For regular incremental backups, we consider all tables marked for incremental backup under the given backup root.
   
   We should also investigate the reasoning behind this behavior to gain better clarity.



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java:
##########
@@ -124,6 +124,11 @@ public enum BackupPhase {
    */
   private long completeTs;
 
+  /**
+   * Committed WAL timestamp for incremental backup
+   */
+  private long incrCommittedWalTs;

Review Comment:
   what will be the value in case of normal incremental backup?



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java:
##########
@@ -112,7 +113,9 @@ protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo)
     backupManager.setBackupInfo(backupInfo);
     // set the start timestamp of the overall backup
     long startTs = EnvironmentEdgeManager.currentTime();
+    long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn);

Review Comment:
   this will not be applicable in normal incremental backup, right?



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java:
##########
@@ -282,8 +283,9 @@ private void backupWalEntries(long day, List<WAL.Entry> walEntries) throws IOExc
 
   private FSHLogProvider.Writer createWalWriter(long dayInMillis) {
     // Convert dayInMillis to "yyyy-MM-dd" format
-    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);

Review Comment:
   won't just `dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));` work?
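   
   For illustration, a minimal runnable sketch of that suggestion, assuming `DATE_FORMAT` is the "yyyy-MM-dd" pattern this endpoint uses:
   
   ```java
   import java.text.SimpleDateFormat;
   import java.util.Date;
   import java.util.TimeZone;
   
   public class UtcDayFormat {
     // Assumed to match the endpoint's DATE_FORMAT constant.
     private static final String DATE_FORMAT = "yyyy-MM-dd";
   
     public static void main(String[] args) {
       SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
       // Pinning the formatter to UTC makes the day-directory name
       // independent of the server's default time zone.
       dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
       long dayInMillis = System.currentTimeMillis();
       System.out.println(dateFormat.format(new Date(dayInMillis)));
     }
   }
   ```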



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java:
##########
@@ -148,13 +148,16 @@ protected void snapshotCopy(BackupInfo backupInfo) throws IOException {
   public void execute() throws IOException {
     try (Admin admin = conn.getAdmin()) {
       beginBackup(backupManager, backupInfo);
+      initializeBackupStartCode(backupManager);
+      performLogRoll(admin);

Review Comment:
   Why are we doing these steps again here? 
   is it also needed for continuous backup enabled full backup?
   also, for normal full backup, these things are getting executed twice. once 
here and once in the `handleNonContinuousBackup` method. is that intentional?



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java:
##########
@@ -517,28 +517,47 @@ public String backupTables(BackupRequest request) throws IOException {
 
     String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
     if (type == BackupType.INCREMENTAL) {
-      Set<TableName> incrTableSet;
-      try (BackupSystemTable table = new BackupSystemTable(conn)) {
-        incrTableSet = table.getIncrementalBackupTableSet(targetRootDir);
-      }
+      if (request.isContinuousBackupEnabled()) {
+        Set<TableName> continuousBackupTableSet;
+        try (BackupSystemTable table = new BackupSystemTable(conn)) {
+          continuousBackupTableSet = table.getContinuousBackupTableSet().keySet();
+        }
+        if (continuousBackupTableSet.isEmpty()) {
+          String msg = "Continuous backup table set contains no tables. "
+            + "You need to run Continuous backup first "
+            + (tableList != null ? "on " + StringUtils.join(tableList, ",") : "");
+          throw new IOException(msg);
+        }
+        if (!continuousBackupTableSet.containsAll(tableList)) {
+          String extraTables = StringUtils.join(tableList, ",");
+          String msg = "Some tables (" + extraTables + ") haven't gone through Continuous backup. "
+            + "Perform Continuous backup on " + extraTables + " first, " + "then retry the command";
+          throw new IOException(msg);
+        }
+      } else {
+        Set<TableName> incrTableSet;
+        try (BackupSystemTable table = new BackupSystemTable(conn)) {
+          incrTableSet = table.getIncrementalBackupTableSet(targetRootDir);
+        }
 
-      if (incrTableSet.isEmpty()) {
-        String msg =
-          "Incremental backup table set contains no tables. " + "You need to run full backup first "
+        if (incrTableSet.isEmpty()) {
+          String msg = "Incremental backup table set contains no tables. "
+            + "You need to run full backup first "
             + (tableList != null ? "on " + StringUtils.join(tableList, ",") : "");
 
-        throw new IOException(msg);
-      }
-      if (tableList != null) {
-        tableList.removeAll(incrTableSet);
-        if (!tableList.isEmpty()) {
-          String extraTables = StringUtils.join(tableList, ",");
-          String msg = "Some tables (" + extraTables + ") haven't gone through full backup. "
-            + "Perform full backup on " + extraTables + " first, " + "then retry the command";
           throw new IOException(msg);
         }
+        if (tableList != null) {
+          tableList.removeAll(incrTableSet);
+          if (!tableList.isEmpty()) {
+            String extraTables = StringUtils.join(tableList, ",");
+            String msg = "Some tables (" + extraTables + ") haven't gone through full backup. "
+              + "Perform full backup on " + extraTables + " first, " + "then retry the command";
+            throw new IOException(msg);
+          }
+        }
+        tableList = Lists.newArrayList(incrTableSet);

Review Comment:
   Hi @rmdmattingly, I had a quick question for clarification. This is not related to the changes in this PR, but rather to the current incremental backup process.
   
   It seems like we're using `incrTableSet` to determine the tables for incremental backup. Could you help us understand why we consider all tables marked for incremental backup instead of just the user-specified tables?
   
   Understanding the reasoning behind this will help us better integrate the continuous backup functionality with the existing logic. Appreciate your insights, thank you!



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -362,23 +378,86 @@ protected void deleteBulkLoadDirectory() throws IOException {
   }
 
   protected void convertWALsToHFiles() throws IOException {
-    // get incremental backup file list and prepare parameters for DistCp
-    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
-    // Get list of tables in incremental backup set
-    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
-    // filter missing files out (they have been copied by previous backups)
-    incrBackupFileList = filterMissingFiles(incrBackupFileList);
-    List<String> tableList = new ArrayList<String>();
-    for (TableName table : tableSet) {
-      // Check if table exists
-      if (tableExists(table, conn)) {
-        tableList.add(table.getNameAsString());
-      } else {
-        LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
+    if (backupInfo.isContinuousBackupEnabled()) {
+      Set<TableName> tableSet = backupInfo.getTables();
+      List<BackupInfo> backupInfos = backupManager.getBackupHistory(true);
+      for (TableName table : tableSet) {
+        for (BackupInfo backup : backupInfos) {
+          // find previous backup for this table
+          if (backup.getTables().contains(table)) {
+            LOG.info("Found previous backup of type {} with id {} for table {}", backup.getType(),
+              backup.getBackupId(), table.getNameAsString());
+            List<String> walBackupFileList;
+            if (backup.getType() == BackupType.FULL) {
+              walBackupFileList = getBackupLogs(backup.getStartTs());
+            } else {
+              walBackupFileList = getBackupLogs(backup.getIncrCommittedWalTs());
+            }
+            walToHFiles(walBackupFileList, Arrays.asList(table.getNameAsString()));
+            break;
+          }
+        }
+      }
+    } else {
+      // get incremental backup file list and prepare parameters for DistCp
+      List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
+      // Get list of tables in incremental backup set
+      Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
+      // filter missing files out (they have been copied by previous backups)
+      incrBackupFileList = filterMissingFiles(incrBackupFileList);
+      List<String> tableList = new ArrayList<String>();
+      for (TableName table : tableSet) {
+        // Check if table exists
+        if (tableExists(table, conn)) {
+          tableList.add(table.getNameAsString());
+        } else {
+          LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
+        }
       }
+      walToHFiles(incrBackupFileList, tableList);
     }
-    walToHFiles(incrBackupFileList, tableList);
+  }
+
+  private List<String> getBackupLogs(long startTs) throws IOException {
+    // get log files from backup dir
+    String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    if (walBackupDir == null || walBackupDir.isEmpty()) {
+      throw new IOException(
+        "Incremental backup requires the WAL backup directory " + 
CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    }
+    List<String> resultLogFiles = new ArrayList<>();
+    Path walBackupPath = new Path(walBackupDir);
+    FileSystem backupFs = FileSystem.get(walBackupPath.toUri(), conf);
+    FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);

Review Comment:
   We need to take care of that time zone issue here as well (having a common/default zone across all operations to avoid inconsistencies).
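   
   One way to read that, as a sketch: a shared helper (the class name and location are hypothetical, not part of this PR) that both the replication endpoint and `getBackupLogs` could use, so formatting and parsing of the day directories always agree on the zone.
   
   ```java
   import java.text.SimpleDateFormat;
   import java.util.TimeZone;
   
   // Hypothetical helper: one place to build the day-directory formatter.
   public final class BackupDateFormats {
     // Assumed to match the existing DATE_FORMAT constant ("yyyy-MM-dd").
     public static final String DATE_FORMAT = "yyyy-MM-dd";
   
     private BackupDateFormats() {
     }
   
     // SimpleDateFormat is not thread-safe, so callers get a fresh instance.
     public static SimpleDateFormat utcDayFormat() {
       SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
       format.setTimeZone(TimeZone.getTimeZone("UTC"));
       return format;
     }
   }
   ```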



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java:
##########
@@ -148,13 +148,16 @@ protected void snapshotCopy(BackupInfo backupInfo) throws IOException {
   public void execute() throws IOException {
     try (Admin admin = conn.getAdmin()) {
       beginBackup(backupManager, backupInfo);
+      initializeBackupStartCode(backupManager);
+      performLogRoll(admin);

Review Comment:
   If these steps are required in both situations, then we can remove them from `handleNonContinuousBackup` and keep them here.
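   
   A sketch of that restructuring, with bodies elided; it assumes `handleNonContinuousBackup` currently repeats these two calls, which should be verified against the full file:
   
   ```java
   public void execute() throws IOException {
     try (Admin admin = conn.getAdmin()) {
       beginBackup(backupManager, backupInfo);
       // Shared setup done exactly once, for both backup modes.
       initializeBackupStartCode(backupManager);
       performLogRoll(admin);
       // ... then branch into continuous / non-continuous handling,
       // with the duplicate calls removed from handleNonContinuousBackup ...
     }
   }
   ```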



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -362,23 +378,86 @@ protected void deleteBulkLoadDirectory() throws IOException {
   }
 
   protected void convertWALsToHFiles() throws IOException {
-    // get incremental backup file list and prepare parameters for DistCp
-    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
-    // Get list of tables in incremental backup set
-    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
-    // filter missing files out (they have been copied by previous backups)
-    incrBackupFileList = filterMissingFiles(incrBackupFileList);
-    List<String> tableList = new ArrayList<String>();
-    for (TableName table : tableSet) {
-      // Check if table exists
-      if (tableExists(table, conn)) {
-        tableList.add(table.getNameAsString());
-      } else {
-        LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
+    if (backupInfo.isContinuousBackupEnabled()) {
+      Set<TableName> tableSet = backupInfo.getTables();
+      List<BackupInfo> backupInfos = backupManager.getBackupHistory(true);
+      for (TableName table : tableSet) {
+        for (BackupInfo backup : backupInfos) {
+          // find previous backup for this table
+          if (backup.getTables().contains(table)) {
+            LOG.info("Found previous backup of type {} with id {} for table {}", backup.getType(),
+              backup.getBackupId(), table.getNameAsString());
+            List<String> walBackupFileList;
+            if (backup.getType() == BackupType.FULL) {
+              walBackupFileList = getBackupLogs(backup.getStartTs());
+            } else {
+              walBackupFileList = getBackupLogs(backup.getIncrCommittedWalTs());
+            }
+            walToHFiles(walBackupFileList, Arrays.asList(table.getNameAsString()));
+            break;
+          }
+        }
+      }
+    } else {
+      // get incremental backup file list and prepare parameters for DistCp
+      List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
+      // Get list of tables in incremental backup set
+      Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
+      // filter missing files out (they have been copied by previous backups)
+      incrBackupFileList = filterMissingFiles(incrBackupFileList);
+      List<String> tableList = new ArrayList<String>();
+      for (TableName table : tableSet) {
+        // Check if table exists
+        if (tableExists(table, conn)) {
+          tableList.add(table.getNameAsString());
+        } else {
+          LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
+        }
       }
+      walToHFiles(incrBackupFileList, tableList);
     }
-    walToHFiles(incrBackupFileList, tableList);
+  }
+
+  private List<String> getBackupLogs(long startTs) throws IOException {
+    // get log files from backup dir
+    String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    if (walBackupDir == null || walBackupDir.isEmpty()) {
+      throw new IOException(
+        "Incremental backup requires the WAL backup directory " + 
CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    }
+    List<String> resultLogFiles = new ArrayList<>();
+    Path walBackupPath = new Path(walBackupDir);
+    FileSystem backupFs = FileSystem.get(walBackupPath.toUri(), conf);
+    FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+    for (FileStatus dayDir : dayDirs) {
+      if (!dayDir.isDirectory()) {
+        continue; // Skip files, only process directories
+      }
 
+      String dirName = dayDir.getPath().getName();
+      try {
+        Date dirDate = dateFormat.parse(dirName);
+        long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
+        long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of the day
+        // (23:59:59)
+
+        if (dirEndTime >= startTs) {
+          Path dirPath = dayDir.getPath();
+          FileStatus[] logs = backupFs.listStatus(dirPath);
+          ;
+          for (FileStatus log : logs) {
+            String filepath = log.getPath().toString();
+            LOG.debug("currentLogFile: " + filepath);
+            resultLogFiles.add(filepath);
+          }
+        }

Review Comment:
   This part currently considers only the **day boundary**. If the requested start time falls in the middle of a day, the logic still includes the **entire day's data**. As far as I can tell, we are not filtering based on time in `WALPlayer` either, even though `WALPlayer` does support specifying both start and end times.
   
   Regarding the **start time**, we should verify whether continuous backup (i.e., WALs) is actually available from that point onward. For example, if the requested start time is 18:30 but WALs for the table are only available starting from 20:00, this could cause issues. While this scenario ideally shouldn't happen, it's better to handle it defensively.
   
   As for the **end time**, we should explicitly pass it to `WALPlayer`. Currently, we are only passing the directories, which is problematic because continuous backup might still be writing new WAL files to the current day's folder. This can cause inconsistency between the last WAL processed by `WALPlayer` and the committed timestamp we record.
   
   For example:
   - At the beginning of the incremental backup, we record a committed timestamp (`t1`) as the replication checkpoint.
   - While processing, the checkpoint advances to a later time (`t2`), meaning more data is backed up than what was originally captured at `t1`.
   - As a result, the actual committed timestamp is `t2`, but we report `t1`, leading to **extra data being backed up** beyond what the timestamp reflects.
   
   ```
                         t1                      t2
   -----------------------|----------------------|--> Time
   ```
   
   In both cases (start and end time), this can result in inconsistency.
   
   Please take a look and share your thoughts!
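   
   On the end-time point, a sketch of what explicitly bounding `WALPlayer` could look like. The key names follow `WALInputFormat`'s `wal.start.time` / `wal.end.time` properties as I understand them; please verify them against the HBase version in use, and the timestamp arguments shown are illustrative:
   
   ```java
   import org.apache.hadoop.conf.Configuration;
   
   public final class WalPlayerTimeRange {
     private WalPlayerTimeRange() {
     }
   
     // Returns a copy of the job configuration with the WAL scan bounded to
     // [startTs, endTs], so WALs written after the recorded checkpoint (t1)
     // are excluded even if continuous backup keeps appending to the
     // current day's directory.
     public static Configuration withTimeRange(Configuration base, long startTs, long endTs) {
       Configuration conf = new Configuration(base);
       conf.setLong("wal.start.time", startTs); // e.g. previous backup's committed WAL ts
       conf.setLong("wal.end.time", endTs); // e.g. t1 captured in beginBackup
       return conf;
     }
   }
   ```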



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
