vinayakphegde commented on code in PR #6788:
URL: https://github.com/apache/hbase/pull/6788#discussion_r2056431707
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java:
##########
@@ -434,4 +434,15 @@ public void addContinuousBackupTableSet(Set<TableName> tables, long startTimestamp)
     throws IOException {
     systemTable.addContinuousBackupTableSet(tables, startTimestamp);
   }
+
+  /**
+   * Retrieves the current set of tables covered by continuous backup along with the timestamp
+   * indicating when continuous backup started for each table.
+   * @return a map where the key is the table name and the value is the timestamp representing the
+   *         start time of continuous backup for that table.
+   * @throws IOException if an I/O error occurs while accessing the backup system table.
+   */
+  public Map<TableName, Long> getContinuousBackupTableSet() throws IOException {
Review Comment:
where are we using this method?
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java:
##########
@@ -565,7 +584,7 @@ public String backupTables(BackupRequest request) throws IOException {
       }
     }
     if (nonExistingTableList != null) {
-      if (type == BackupType.INCREMENTAL) {
+      if (type == BackupType.INCREMENTAL && !request.isContinuousBackupEnabled()) {
Review Comment:
Maybe we can add a comment to explain what is happening here? Specifically: why is it okay for a normal incremental backup to proceed even when some tables do not exist, and why is that not okay for a continuous-backup-enabled incremental backup?
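For illustration, a minimal sketch of the kind of comment that could go here. The rationale stated in the comment is an assumption to be verified, not something established by this PR:
```java
if (nonExistingTableList != null) {
  // Assumption: a plain incremental backup can silently skip tables that no
  // longer exist, because they are simply dropped from the request below.
  // With continuous backup enabled, the requested tables drive which WALs are
  // replayed, so a missing table should fail fast instead of being skipped.
  if (type == BackupType.INCREMENTAL && !request.isContinuousBackupEnabled()) {
```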
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -262,9 +271,13 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
     // case PREPARE_INCREMENTAL:
     beginBackup(backupManager, backupInfo);
     backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
-    LOG.debug("For incremental backup, current table set is "
-      + backupManager.getIncrementalBackupTableSet());
-    newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
+    if (backupInfo.isContinuousBackupEnabled()) {
+      LOG.debug("For incremental backup, current table set is " + backupInfo.getTables());
+    } else {
+      LOG.debug("For incremental backup, current table set is "
+        + backupManager.getIncrementalBackupTableSet());
+      newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
+    }
Review Comment:
We should clarify this point in a comment or elsewhere in the code:
- When continuous backup is enabled, incremental backups only consider the tables explicitly specified by the user.
- For regular incremental backups, we consider all tables marked for incremental backup under the given backup root.

We should also investigate the reasoning behind this behavior so we understand it clearly.
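For illustration, a minimal sketch of inline comments capturing the two behaviors described above; the "why" in the first comment is an assumption to confirm, not established by the PR:
```java
if (backupInfo.isContinuousBackupEnabled()) {
  // Continuous backup enabled: only the tables explicitly requested by the
  // user are backed up (assumed rationale: their WALs are already being
  // shipped continuously, so no incremental log-file map is needed).
  LOG.debug("For incremental backup, current table set is " + backupInfo.getTables());
} else {
  // Regular incremental backup: every table registered for incremental backup
  // under this backup root is included, not just the requested tables.
  LOG.debug("For incremental backup, current table set is "
    + backupManager.getIncrementalBackupTableSet());
  newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
}
```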
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java:
##########
@@ -124,6 +124,11 @@ public enum BackupPhase {
    */
   private long completeTs;
+  /**
+   * Committed WAL timestamp for incremental backup
+   */
+  private long incrCommittedWalTs;
Review Comment:
what will be the value in case of normal incremental backup?
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java:
##########
@@ -112,7 +113,9 @@ protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo)
     backupManager.setBackupInfo(backupInfo);
     // set the start timestamp of the overall backup
     long startTs = EnvironmentEdgeManager.currentTime();
+    long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn);
Review Comment:
this will not be applicable in normal incremental backup, right?
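A minimal sketch of one way to make this conditional, assuming the checkpoint only matters when continuous backup is enabled; the guard and the `0L` default are hypothetical, not part of the PR:
```java
long startTs = EnvironmentEdgeManager.currentTime();
// Only look up the replication checkpoint for continuous-backup-enabled
// backups; a normal backup leaves the committed WAL timestamp unset (0L).
long committedWALsTs =
  backupInfo.isContinuousBackupEnabled() ? BackupUtils.getReplicationCheckpoint(conn) : 0L;
```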
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java:
##########
@@ -282,8 +283,9 @@ private void backupWalEntries(long day, List<WAL.Entry> walEntries) throws IOException {
   private FSHLogProvider.Writer createWalWriter(long dayInMillis) {
     // Convert dayInMillis to "yyyy-MM-dd" format
-    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
Review Comment:
won't just `dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));` work?
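For reference, a minimal sketch of what this suggestion would look like inside `createWalWriter` (the `dayDirName` variable is illustrative):
```java
// Format dayInMillis as "yyyy-MM-dd" in UTC so every component computes the
// same day boundary regardless of the server's default time zone.
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
String dayDirName = dateFormat.format(new Date(dayInMillis));
```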
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java:
##########
@@ -148,13 +148,16 @@ protected void snapshotCopy(BackupInfo backupInfo) throws IOException {
   public void execute() throws IOException {
     try (Admin admin = conn.getAdmin()) {
       beginBackup(backupManager, backupInfo);
+      initializeBackupStartCode(backupManager);
+      performLogRoll(admin);
Review Comment:
Why are we doing these steps again here?
is it also needed for continuous backup enabled full backup?
also, for normal full backup, these things are getting executed twice. once
here and once in the `handleNonContinuousBackup` method. is that intentional?
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java:
##########
@@ -517,28 +517,47 @@ public String backupTables(BackupRequest request) throws IOException {
     String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
     if (type == BackupType.INCREMENTAL) {
-      Set<TableName> incrTableSet;
-      try (BackupSystemTable table = new BackupSystemTable(conn)) {
-        incrTableSet = table.getIncrementalBackupTableSet(targetRootDir);
-      }
+      if (request.isContinuousBackupEnabled()) {
+        Set<TableName> continuousBackupTableSet;
+        try (BackupSystemTable table = new BackupSystemTable(conn)) {
+          continuousBackupTableSet = table.getContinuousBackupTableSet().keySet();
+        }
+        if (continuousBackupTableSet.isEmpty()) {
+          String msg = "Continuous backup table set contains no tables. "
+            + "You need to run Continuous backup first "
+            + (tableList != null ? "on " + StringUtils.join(tableList, ",") : "");
+          throw new IOException(msg);
+        }
+        if (!continuousBackupTableSet.containsAll(tableList)) {
+          String extraTables = StringUtils.join(tableList, ",");
+          String msg = "Some tables (" + extraTables + ") haven't gone through Continuous backup. "
+            + "Perform Continuous backup on " + extraTables + " first, " + "then retry the command";
+          throw new IOException(msg);
+        }
+      } else {
+        Set<TableName> incrTableSet;
+        try (BackupSystemTable table = new BackupSystemTable(conn)) {
+          incrTableSet = table.getIncrementalBackupTableSet(targetRootDir);
+        }
-      if (incrTableSet.isEmpty()) {
-        String msg =
-          "Incremental backup table set contains no tables. " + "You need to run full backup first "
+        if (incrTableSet.isEmpty()) {
+          String msg = "Incremental backup table set contains no tables. "
+            + "You need to run full backup first "
             + (tableList != null ? "on " + StringUtils.join(tableList, ",") : "");
-        throw new IOException(msg);
-      }
-      if (tableList != null) {
-        tableList.removeAll(incrTableSet);
-        if (!tableList.isEmpty()) {
-          String extraTables = StringUtils.join(tableList, ",");
-          String msg = "Some tables (" + extraTables + ") haven't gone through full backup. "
-            + "Perform full backup on " + extraTables + " first, " + "then retry the command";
           throw new IOException(msg);
         }
+        if (tableList != null) {
+          tableList.removeAll(incrTableSet);
+          if (!tableList.isEmpty()) {
+            String extraTables = StringUtils.join(tableList, ",");
+            String msg = "Some tables (" + extraTables + ") haven't gone through full backup. "
+              + "Perform full backup on " + extraTables + " first, " + "then retry the command";
+            throw new IOException(msg);
+          }
+        }
+        tableList = Lists.newArrayList(incrTableSet);
Review Comment:
Hi @rmdmattingly, I had a quick question for clarification — this is not
related to the changes in this PR, but rather about the current incremental
backup process.
It seems like we're using `incrTableSet` to determine the tables for incremental backup. Could you help us understand why we consider all tables marked for incremental backup instead of just the user-specified tables?
Understanding the reasoning behind this will help us better integrate the
continuous backup functionality with the existing logic. Appreciate your
insights — thank you!
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -362,23 +378,86 @@ protected void deleteBulkLoadDirectory() throws IOException {
   }
   protected void convertWALsToHFiles() throws IOException {
-    // get incremental backup file list and prepare parameters for DistCp
-    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
-    // Get list of tables in incremental backup set
-    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
-    // filter missing files out (they have been copied by previous backups)
-    incrBackupFileList = filterMissingFiles(incrBackupFileList);
-    List<String> tableList = new ArrayList<String>();
-    for (TableName table : tableSet) {
-      // Check if table exists
-      if (tableExists(table, conn)) {
-        tableList.add(table.getNameAsString());
-      } else {
-        LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
+    if (backupInfo.isContinuousBackupEnabled()) {
+      Set<TableName> tableSet = backupInfo.getTables();
+      List<BackupInfo> backupInfos = backupManager.getBackupHistory(true);
+      for (TableName table : tableSet) {
+        for (BackupInfo backup : backupInfos) {
+          // find previous backup for this table
+          if (backup.getTables().contains(table)) {
+            LOG.info("Found previous backup of type {} with id {} for table {}", backup.getType(),
+              backup.getBackupId(), table.getNameAsString());
+            List<String> walBackupFileList;
+            if (backup.getType() == BackupType.FULL) {
+              walBackupFileList = getBackupLogs(backup.getStartTs());
+            } else {
+              walBackupFileList = getBackupLogs(backup.getIncrCommittedWalTs());
+            }
+            walToHFiles(walBackupFileList, Arrays.asList(table.getNameAsString()));
+            break;
+          }
+        }
+      }
+    } else {
+      // get incremental backup file list and prepare parameters for DistCp
+      List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
+      // Get list of tables in incremental backup set
+      Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
+      // filter missing files out (they have been copied by previous backups)
+      incrBackupFileList = filterMissingFiles(incrBackupFileList);
+      List<String> tableList = new ArrayList<String>();
+      for (TableName table : tableSet) {
+        // Check if table exists
+        if (tableExists(table, conn)) {
+          tableList.add(table.getNameAsString());
+        } else {
+          LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
+        }
       }
+      walToHFiles(incrBackupFileList, tableList);
     }
-    walToHFiles(incrBackupFileList, tableList);
+  }
+
+  private List<String> getBackupLogs(long startTs) throws IOException {
+    // get log files from backup dir
+    String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    if (walBackupDir == null || walBackupDir.isEmpty()) {
+      throw new IOException(
+        "Incremental backup requires the WAL backup directory " + CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    }
+    List<String> resultLogFiles = new ArrayList<>();
+    Path walBackupPath = new Path(walBackupDir);
+    FileSystem backupFs = FileSystem.get(walBackupPath.toUri(), conf);
+    FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
Review Comment:
We need to take care of the time-zone concern here as well (use a common/default zone across all operations to avoid inconsistencies).
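A minimal sketch of one way to centralize this; the helper class, its name, and its location are hypothetical, not from the PR:
```java
import java.text.SimpleDateFormat;
import java.util.TimeZone;

// Hypothetical helper: one place that decides the zone used for day-boundary
// math, so the replication endpoint and the backup clients cannot disagree.
final class BackupDateFormats {
  private BackupDateFormats() {
  }

  // SimpleDateFormat is not thread-safe, so return a fresh instance per call.
  static SimpleDateFormat utcFormat(String pattern) {
    SimpleDateFormat format = new SimpleDateFormat(pattern);
    format.setTimeZone(TimeZone.getTimeZone("UTC"));
    return format;
  }
}
```
Both `createWalWriter` and `getBackupLogs` could then call `BackupDateFormats.utcFormat(DATE_FORMAT)` instead of constructing their own formatter.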
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java:
##########
@@ -148,13 +148,16 @@ protected void snapshotCopy(BackupInfo backupInfo) throws IOException {
   public void execute() throws IOException {
     try (Admin admin = conn.getAdmin()) {
       beginBackup(backupManager, backupInfo);
+      initializeBackupStartCode(backupManager);
+      performLogRoll(admin);
Review Comment:
If these steps are required in both situations, then we can remove them from `handleNonContinuousBackup` and keep them here.
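For illustration, a rough sketch of the deduplicated flow; `handleContinuousBackup` is a hypothetical name and the surrounding method is abbreviated:
```java
public void execute() throws IOException {
  try (Admin admin = conn.getAdmin()) {
    beginBackup(backupManager, backupInfo);
    // Shared setup, executed exactly once for both backup flavors.
    initializeBackupStartCode(backupManager);
    performLogRoll(admin);
    if (backupInfo.isContinuousBackupEnabled()) {
      handleContinuousBackup(admin); // hypothetical
    } else {
      // No longer repeats initializeBackupStartCode/performLogRoll internally.
      handleNonContinuousBackup(admin);
    }
    // ... remainder of the full-backup flow
  }
}
```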
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -362,23 +378,86 @@ protected void deleteBulkLoadDirectory() throws IOException {
   }
   protected void convertWALsToHFiles() throws IOException {
-    // get incremental backup file list and prepare parameters for DistCp
-    List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
-    // Get list of tables in incremental backup set
-    Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
-    // filter missing files out (they have been copied by previous backups)
-    incrBackupFileList = filterMissingFiles(incrBackupFileList);
-    List<String> tableList = new ArrayList<String>();
-    for (TableName table : tableSet) {
-      // Check if table exists
-      if (tableExists(table, conn)) {
-        tableList.add(table.getNameAsString());
-      } else {
-        LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
+    if (backupInfo.isContinuousBackupEnabled()) {
+      Set<TableName> tableSet = backupInfo.getTables();
+      List<BackupInfo> backupInfos = backupManager.getBackupHistory(true);
+      for (TableName table : tableSet) {
+        for (BackupInfo backup : backupInfos) {
+          // find previous backup for this table
+          if (backup.getTables().contains(table)) {
+            LOG.info("Found previous backup of type {} with id {} for table {}", backup.getType(),
+              backup.getBackupId(), table.getNameAsString());
+            List<String> walBackupFileList;
+            if (backup.getType() == BackupType.FULL) {
+              walBackupFileList = getBackupLogs(backup.getStartTs());
+            } else {
+              walBackupFileList = getBackupLogs(backup.getIncrCommittedWalTs());
+            }
+            walToHFiles(walBackupFileList, Arrays.asList(table.getNameAsString()));
+            break;
+          }
+        }
+      }
+    } else {
+      // get incremental backup file list and prepare parameters for DistCp
+      List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
+      // Get list of tables in incremental backup set
+      Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
+      // filter missing files out (they have been copied by previous backups)
+      incrBackupFileList = filterMissingFiles(incrBackupFileList);
+      List<String> tableList = new ArrayList<String>();
+      for (TableName table : tableSet) {
+        // Check if table exists
+        if (tableExists(table, conn)) {
+          tableList.add(table.getNameAsString());
+        } else {
+          LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
+        }
       }
+      walToHFiles(incrBackupFileList, tableList);
     }
-    walToHFiles(incrBackupFileList, tableList);
+  }
+
+  private List<String> getBackupLogs(long startTs) throws IOException {
+    // get log files from backup dir
+    String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    if (walBackupDir == null || walBackupDir.isEmpty()) {
+      throw new IOException(
+        "Incremental backup requires the WAL backup directory " + CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    }
+    List<String> resultLogFiles = new ArrayList<>();
+    Path walBackupPath = new Path(walBackupDir);
+    FileSystem backupFs = FileSystem.get(walBackupPath.toUri(), conf);
+    FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+    for (FileStatus dayDir : dayDirs) {
+      if (!dayDir.isDirectory()) {
+        continue; // Skip files, only process directories
+      }
+      String dirName = dayDir.getPath().getName();
+      try {
+        Date dirDate = dateFormat.parse(dirName);
+        long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
+        long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of the day
+        // (23:59:59)
+
+        if (dirEndTime >= startTs) {
+          Path dirPath = dayDir.getPath();
+          FileStatus[] logs = backupFs.listStatus(dirPath);
+          ;
+          for (FileStatus log : logs) {
+            String filepath = log.getPath().toString();
+            LOG.debug("currentLogFile: " + filepath);
+            resultLogFiles.add(filepath);
+          }
+        }
Review Comment:
This part currently considers only the **day boundary**. If the requested
start time falls in the middle of a day, the logic still includes the **entire
day’s data**. As far as I can tell, we are not filtering based on time in
`WALPlayer` either, even though `WALPlayer` does support specifying both start
and end times.
Regarding the **start time**, we should verify whether continuous backup
(i.e., WALs) is actually available from that point onward. For example, if the
requested start time is 18:30 but WALs for the table are only available
starting from 20:00, this could cause issues. While this scenario ideally
shouldn't happen, it’s better to handle it defensively.
As for the **end time**, we should explicitly pass it to `WALPlayer`.
Currently, we are only passing the directories, which is problematic because
continuous backup might still be writing new WAL files to the current day's
folder. This can cause inconsistency between the last WAL processed by
`WALPlayer` and the committed timestamp we record.
For example:
- At the beginning of the incremental backup, we record a committed
timestamp (`t1`) as the replication checkpoint.
- While processing, the checkpoint advances to a later time (`t2`), meaning
more data is backed up than what was originally captured at `t1`.
- As a result, the actual committed timestamp is `t2`, but we report `t1`,
leading to **extra data being backed up** beyond what the timestamp reflects.
```
t1 t2
-----------------------|----------------------|--> Time
```
In both cases—start and end time—this can result in inconsistency.
Please take a look and share your thoughts!
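For reference, a minimal sketch of passing an explicit time window to `WALPlayer`. The `wal.start.time` / `wal.end.time` keys are the ones read by `WALInputFormat`; the variables `previousCommittedWalTs`, `walDirsCsv`, and `tableNamesCsv` are illustrative, and exception handling is elided:
```java
// Bound WALPlayer to [previous backup's committed ts, this backup's committed
// ts (t1)], so WALs still being written into the current day's folder after
// t1 are excluded and the recorded timestamp matches the data actually copied.
Configuration playerConf = HBaseConfiguration.create(conf);
playerConf.setLong("wal.start.time", previousCommittedWalTs);
playerConf.setLong("wal.end.time", backupInfo.getIncrCommittedWalTs());
WALPlayer player = new WALPlayer();
player.setConf(playerConf);
int exitCode = player.run(new String[] { walDirsCsv, tableNamesCsv });
```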