kgeisz commented on code in PR #6788:
URL: https://github.com/apache/hbase/pull/6788#discussion_r2112770649
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java:
##########
@@ -517,28 +517,47 @@ public String backupTables(BackupRequest request) throws IOException {
String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
if (type == BackupType.INCREMENTAL) {
- Set<TableName> incrTableSet;
- try (BackupSystemTable table = new BackupSystemTable(conn)) {
- incrTableSet = table.getIncrementalBackupTableSet(targetRootDir);
- }
+ if (request.isContinuousBackupEnabled()) {
+ Set<TableName> continuousBackupTableSet;
+ try (BackupSystemTable table = new BackupSystemTable(conn)) {
+ continuousBackupTableSet = table.getContinuousBackupTableSet().keySet();
+ }
+ if (continuousBackupTableSet.isEmpty()) {
+ String msg = "Continuous backup table set contains no tables. "
+ + "You need to run Continuous backup first "
+ + (tableList != null ? "on " + StringUtils.join(tableList, ",") : "");
+ throw new IOException(msg);
+ }
+ if (!continuousBackupTableSet.containsAll(tableList)) {
+ String extraTables = StringUtils.join(tableList, ",");
+ String msg = "Some tables (" + extraTables + ") haven't gone through Continuous backup. "
+ + "Perform Continuous backup on " + extraTables + " first, " + "then retry the command";
Review Comment:
nit
```suggestion
+ "Perform Continuous backup on " + extraTables + " first, then
retry the command";
```
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java:
##########
@@ -517,28 +517,47 @@ public String backupTables(BackupRequest request) throws IOException {
String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
if (type == BackupType.INCREMENTAL) {
- Set<TableName> incrTableSet;
- try (BackupSystemTable table = new BackupSystemTable(conn)) {
- incrTableSet = table.getIncrementalBackupTableSet(targetRootDir);
- }
+ if (request.isContinuousBackupEnabled()) {
+ Set<TableName> continuousBackupTableSet;
+ try (BackupSystemTable table = new BackupSystemTable(conn)) {
+ continuousBackupTableSet = table.getContinuousBackupTableSet().keySet();
+ }
+ if (continuousBackupTableSet.isEmpty()) {
+ String msg = "Continuous backup table set contains no tables. "
+ + "You need to run Continuous backup first "
+ + (tableList != null ? "on " + StringUtils.join(tableList, ",") : "");
+ throw new IOException(msg);
+ }
+ if (!continuousBackupTableSet.containsAll(tableList)) {
+ String extraTables = StringUtils.join(tableList, ",");
+ String msg = "Some tables (" + extraTables + ") haven't gone through Continuous backup. "
+ + "Perform Continuous backup on " + extraTables + " first, " + "then retry the command";
+ throw new IOException(msg);
+ }
+ } else {
+ Set<TableName> incrTableSet;
+ try (BackupSystemTable table = new BackupSystemTable(conn)) {
+ incrTableSet = table.getIncrementalBackupTableSet(targetRootDir);
+ }
- if (incrTableSet.isEmpty()) {
- String msg =
- "Incremental backup table set contains no tables. " + "You need to run full backup first "
+ if (incrTableSet.isEmpty()) {
+ String msg = "Incremental backup table set contains no tables. "
+ + "You need to run full backup first "
+ (tableList != null ? "on " + StringUtils.join(tableList, ",") : "");
- throw new IOException(msg);
- }
- if (tableList != null) {
- tableList.removeAll(incrTableSet);
- if (!tableList.isEmpty()) {
- String extraTables = StringUtils.join(tableList, ",");
- String msg = "Some tables (" + extraTables + ") haven't gone through full backup. "
- + "Perform full backup on " + extraTables + " first, " + "then retry the command";
throw new IOException(msg);
}
+ if (tableList != null) {
+ tableList.removeAll(incrTableSet);
+ if (!tableList.isEmpty()) {
+ String extraTables = StringUtils.join(tableList, ",");
+ String msg = "Some tables (" + extraTables + ") haven't gone through full backup. "
+ + "Perform full backup on " + extraTables + " first, " + "then retry the command";
Review Comment:
nit
```suggestion
+ "Perform full backup on " + extraTables + " first, then
retry the command";
```
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -262,9 +273,18 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
// case PREPARE_INCREMENTAL:
beginBackup(backupManager, backupInfo);
backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
- LOG.debug("For incremental backup, current table set is "
- + backupManager.getIncrementalBackupTableSet());
- newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
+ // Non-continuous Backup incremental backup is controlled by 'incremental backup table set'
+ // and not by user provided backup table list. This is an optimization to avoid copying
+ // the same set of WALs for incremental backups of different tables at different time
Review Comment:
nit
```suggestion
// the same set of WALs for incremental backups of different tables at different times
```
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -262,9 +273,18 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
// case PREPARE_INCREMENTAL:
beginBackup(backupManager, backupInfo);
backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
- LOG.debug("For incremental backup, current table set is "
- + backupManager.getIncrementalBackupTableSet());
- newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
+ // Non-continuous Backup incremental backup is controlled by 'incremental backup table set'
+ // and not by user provided backup table list. This is an optimization to avoid copying
+ // the same set of WALs for incremental backups of different tables at different time
+ // HBASE-14038
+ // Continuous-incremental backup backs up user provided table list/set
+ if (backupInfo.isContinuousBackupEnabled()) {
+ LOG.debug("For incremental backup, current table set is " + backupInfo.getTables());
+ } else {
+ LOG.debug("For incremental backup, current table set is "
+ + backupManager.getIncrementalBackupTableSet());
+ newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
+ }
Review Comment:
nit
Should the first log message say "For continuous backup..."? I see it is in the `backupInfo.isContinuousBackupEnabled()` block. If not, here is a small suggestion that eliminates the repeated log message and keeps IntelliJ from complaining:
```suggestion
Set<TableName> currentTableSet;
if (backupInfo.isContinuousBackupEnabled()) {
  currentTableSet = backupInfo.getTables();
} else {
  currentTableSet = backupManager.getIncrementalBackupTableSet();
  newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
}
LOG.debug("For incremental backup, the current table set is {}", currentTableSet);
```
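As a side benefit, the parameterized `{}` form only renders the table set when debug logging is actually enabled.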
##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.*;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.impl.BulkLoad;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@Category(LargeTests.class)
+public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestIncrementalBackupWithContinuous.class);
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class);
+
+ private byte[] ROW = Bytes.toBytes("row1");
+ private final byte[] FAMILY = Bytes.toBytes("family");
+ private final byte[] COLUMN = Bytes.toBytes("col");
+ String backupWalDirName = "TestContinuousBackupWalDir";
+ private static final int ROWS_IN_BULK_LOAD = 100;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_UTIL = new HBaseTestingUtil();
+ conf1 = TEST_UTIL.getConfiguration();
+ autoRestoreOnFailure = true;
+ useSecondCluster = false;
+ conf1.setInt(CONF_STAGED_WAL_FLUSH_INTERVAL, 1);
+ conf1.setInt(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, 1);
+ setUpHelper();
+ }
+
+ @Before
+ public void beforeTest() throws IOException {
+ super.beforeTest();
+ }
+
+ @After
+ public void afterTest() throws IOException {
+ super.afterTest();
+ }
+
+ @Test
+ public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception {
+ LOG.info("Testing incremental backup with continuous backup");
+ String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ TableName tableName = TableName.valueOf("table_" + methodName);
+ Table t1 = TEST_UTIL.createTable(tableName, FAMILY);
+
+ try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
+ int before = table.getBackupHistory().size();
+
+ // Run continuous backup
+ String[] args = buildBackupArgs("full", new TableName[] { tableName },
true);
+ int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+ assertEquals("Full Backup should succeed", 0, ret);
+
+ // Verify backup history increased and all the backups are succeeded
+ LOG.info("Verify backup history increased and all the backups are
succeeded");
+ List<BackupInfo> backups = table.getBackupHistory();
+ assertEquals("Backup history should increase", before + 1,
backups.size());
+ for (BackupInfo data : List.of(backups.get(0))) {
+ String backupId = data.getBackupId();
+ assertTrue(checkSucceeded(backupId));
+ }
+
+ // Verify backup manifest contains the correct tables
+ LOG.info("Verify backup manifest contains the correct tables");
+ BackupManifest manifest = getLatestBackupManifest(backups);
+ assertEquals("Backup should contain the expected tables",
Sets.newHashSet(tableName),
+ new HashSet<>(manifest.getTableList()));
+
+ Put p = new Put(ROW);
+ p.addColumn(FAMILY, COLUMN, COLUMN);
+ t1.put(p);
+ // Thread.sleep(5000);
Review Comment:
Delete commented code?
##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.*;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.impl.BulkLoad;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@Category(LargeTests.class)
+public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestIncrementalBackupWithContinuous.class);
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class);
+
+ private byte[] ROW = Bytes.toBytes("row1");
+ private final byte[] FAMILY = Bytes.toBytes("family");
+ private final byte[] COLUMN = Bytes.toBytes("col");
+ String backupWalDirName = "TestContinuousBackupWalDir";
+ private static final int ROWS_IN_BULK_LOAD = 100;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_UTIL = new HBaseTestingUtil();
+ conf1 = TEST_UTIL.getConfiguration();
+ autoRestoreOnFailure = true;
+ useSecondCluster = false;
+ conf1.setInt(CONF_STAGED_WAL_FLUSH_INTERVAL, 1);
+ conf1.setInt(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, 1);
+ setUpHelper();
+ }
+
+ @Before
+ public void beforeTest() throws IOException {
+ super.beforeTest();
+ }
+
+ @After
+ public void afterTest() throws IOException {
+ super.afterTest();
+ }
+
+ @Test
+ public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception {
+ LOG.info("Testing incremental backup with continuous backup");
+ String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ TableName tableName = TableName.valueOf("table_" + methodName);
+ Table t1 = TEST_UTIL.createTable(tableName, FAMILY);
+
+ try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
+ int before = table.getBackupHistory().size();
+
+ // Run continuous backup
+ String[] args = buildBackupArgs("full", new TableName[] { tableName },
true);
+ int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+ assertEquals("Full Backup should succeed", 0, ret);
+
+ // Verify backup history increased and all the backups are succeeded
+ LOG.info("Verify backup history increased and all the backups are
succeeded");
+ List<BackupInfo> backups = table.getBackupHistory();
+ assertEquals("Backup history should increase", before + 1,
backups.size());
+ for (BackupInfo data : List.of(backups.get(0))) {
+ String backupId = data.getBackupId();
+ assertTrue(checkSucceeded(backupId));
+ }
+
+ // Verify backup manifest contains the correct tables
+ LOG.info("Verify backup manifest contains the correct tables");
+ BackupManifest manifest = getLatestBackupManifest(backups);
+ assertEquals("Backup should contain the expected tables",
Sets.newHashSet(tableName),
+ new HashSet<>(manifest.getTableList()));
+
+ Put p = new Put(ROW);
+ p.addColumn(FAMILY, COLUMN, COLUMN);
+ t1.put(p);
+ // Thread.sleep(5000);
+
+ // Run incremental backup
+ LOG.info("Run incremental backup now");
+ before = table.getBackupHistory().size();
+ args = buildBackupArgs("incremental", new TableName[] { tableName },
false);
+ ret = ToolRunner.run(conf1, new BackupDriver(), args);
+ assertEquals("Incremental Backup should succeed", 0, ret);
+
+ // Verify backup history increased and all the backups are succeeded
+ backups = table.getBackupHistory();
+ String incrementalBackupid = null;
+ assertEquals("Backup history should increase", before + 1,
backups.size());
+ for (BackupInfo data : List.of(backups.get(0))) {
+ String backupId = data.getBackupId();
+ incrementalBackupid = backupId;
+ assertTrue(checkSucceeded(backupId));
+ }
+
+ TEST_UTIL.truncateTable(tableName);
+ // Restore incremental backup
+ TableName[] tables = new TableName[] { tableName };
+ BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection());
+ client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupid, false,
+ tables, tables, true));
+
+ verifyTable(t1);
+ }
+ }
+
+ @Test
+ public void testContinuousBackupWithIncrementalBackupAndBulkloadSuccess() throws Exception {
+ String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+ // The test starts with some data, and no bulk loaded rows.
+ int expectedRowCount = NB_ROWS_IN_BATCH;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ assertTrue(systemTable.readBulkloadRows(List.of(table1)).isEmpty());
+
+ // Bulk loads aren't tracked if the table isn't backed up yet
+ performBulkLoad("bulk1", methodName);
+ expectedRowCount += ROWS_IN_BULK_LOAD;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
+
+ // Create a backup, bulk loads are now being tracked
+ String backup1 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR, true);
+ assertTrue(checkSucceeded(backup1));
+
+ loadTable(TEST_UTIL.getConnection().getTable(table1));
+ // expectedRowCount += ROWS_IN_BULK_LOAD;
Review Comment:
Delete commented code?
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -362,23 +385,89 @@ protected void deleteBulkLoadDirectory() throws IOException {
}
protected void convertWALsToHFiles() throws IOException {
- // get incremental backup file list and prepare parameters for DistCp
- List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
- // Get list of tables in incremental backup set
- Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
- // filter missing files out (they have been copied by previous backups)
- incrBackupFileList = filterMissingFiles(incrBackupFileList);
- List<String> tableList = new ArrayList<String>();
- for (TableName table : tableSet) {
- // Check if table exists
- if (tableExists(table, conn)) {
- tableList.add(table.getNameAsString());
- } else {
- LOG.warn("Table " + table + " does not exists. Skipping in WAL
converter");
+ long previousBackupTs = 0L;
+ if (backupInfo.isContinuousBackupEnabled()) {
+ Set<TableName> tableSet = backupInfo.getTables();
+ List<BackupInfo> backupInfos = backupManager.getBackupHistory(true);
+ for (TableName table : tableSet) {
+ for (BackupInfo backup : backupInfos) {
+ // find previous backup for this table
+ if (backup.getTables().contains(table)) {
+ LOG.info("Found previous backup of type {} with id {} for table
{}", backup.getType(),
+ backup.getBackupId(), table.getNameAsString());
+ List<String> walBackupFileList;
+ if (backup.getType() == BackupType.FULL) {
+ previousBackupTs = backup.getStartTs();
+ } else {
+ previousBackupTs = backup.getIncrCommittedWalTs();
+ }
+ walBackupFileList = getBackupLogs(previousBackupTs);
+ walToHFiles(walBackupFileList, Arrays.asList(table.getNameAsString()),
+ previousBackupTs);
+ break;
+ }
+ }
}
+ } else {
+ // get incremental backup file list and prepare parameters for DistCp
+ List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
+ // Get list of tables in incremental backup set
+ Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
+ // filter missing files out (they have been copied by previous backups)
+ incrBackupFileList = filterMissingFiles(incrBackupFileList);
+ List<String> tableList = new ArrayList<String>();
+ for (TableName table : tableSet) {
+ // Check if table exists
+ if (tableExists(table, conn)) {
+ tableList.add(table.getNameAsString());
+ } else {
+ LOG.warn("Table " + table + " does not exists. Skipping in WAL
converter");
+ }
+ }
+ walToHFiles(incrBackupFileList, tableList, previousBackupTs);
}
- walToHFiles(incrBackupFileList, tableList);
+ }
+ private List<String> getBackupLogs(long startTs) throws IOException {
+ // get log files from backup dir
+ String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ if (walBackupDir == null || walBackupDir.isEmpty()) {
Review Comment:
nit - You can use `Strings.isNullOrEmpty()` from
`org.apache.hbase.thirdparty.com.google.common.base`:
```suggestion
if (Strings.isNullOrEmpty(walBackupDir)) {
```
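For reference, a minimal sketch of how that could look (the import is the relocated Guava class from hbase-thirdparty; the surrounding lines just mirror the existing check in the diff):
```java
import org.apache.hbase.thirdparty.com.google.common.base.Strings;

// inside getBackupLogs(long startTs)
String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
if (Strings.isNullOrEmpty(walBackupDir)) {
  throw new IOException(
    "Incremental backup requires the WAL backup directory " + CONF_CONTINUOUS_BACKUP_WAL_DIR);
}
```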
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java:
##########
@@ -362,23 +385,89 @@ protected void deleteBulkLoadDirectory() throws IOException {
}
protected void convertWALsToHFiles() throws IOException {
- // get incremental backup file list and prepare parameters for DistCp
- List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
- // Get list of tables in incremental backup set
- Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
- // filter missing files out (they have been copied by previous backups)
- incrBackupFileList = filterMissingFiles(incrBackupFileList);
- List<String> tableList = new ArrayList<String>();
- for (TableName table : tableSet) {
- // Check if table exists
- if (tableExists(table, conn)) {
- tableList.add(table.getNameAsString());
- } else {
- LOG.warn("Table " + table + " does not exists. Skipping in WAL
converter");
+ long previousBackupTs = 0L;
+ if (backupInfo.isContinuousBackupEnabled()) {
+ Set<TableName> tableSet = backupInfo.getTables();
+ List<BackupInfo> backupInfos = backupManager.getBackupHistory(true);
+ for (TableName table : tableSet) {
+ for (BackupInfo backup : backupInfos) {
+ // find previous backup for this table
+ if (backup.getTables().contains(table)) {
+ LOG.info("Found previous backup of type {} with id {} for table
{}", backup.getType(),
+ backup.getBackupId(), table.getNameAsString());
+ List<String> walBackupFileList;
+ if (backup.getType() == BackupType.FULL) {
+ previousBackupTs = backup.getStartTs();
+ } else {
+ previousBackupTs = backup.getIncrCommittedWalTs();
+ }
+ walBackupFileList = getBackupLogs(previousBackupTs);
+ walToHFiles(walBackupFileList, Arrays.asList(table.getNameAsString()),
+ previousBackupTs);
+ break;
+ }
+ }
}
+ } else {
+ // get incremental backup file list and prepare parameters for DistCp
+ List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
+ // Get list of tables in incremental backup set
+ Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
+ // filter missing files out (they have been copied by previous backups)
+ incrBackupFileList = filterMissingFiles(incrBackupFileList);
+ List<String> tableList = new ArrayList<String>();
+ for (TableName table : tableSet) {
+ // Check if table exists
+ if (tableExists(table, conn)) {
+ tableList.add(table.getNameAsString());
+ } else {
+ LOG.warn("Table " + table + " does not exists. Skipping in WAL
converter");
+ }
+ }
+ walToHFiles(incrBackupFileList, tableList, previousBackupTs);
}
- walToHFiles(incrBackupFileList, tableList);
+ }
+ private List<String> getBackupLogs(long startTs) throws IOException {
+ // get log files from backup dir
+ String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ if (walBackupDir == null || walBackupDir.isEmpty()) {
+ throw new IOException(
+ "Incremental backup requires the WAL backup directory " +
CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ }
+ List<String> resultLogFiles = new ArrayList<>();
+ Path walBackupPath = new Path(walBackupDir);
+ FileSystem backupFs = FileSystem.get(walBackupPath.toUri(), conf);
+ FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
+ SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ for (FileStatus dayDir : dayDirs) {
+ if (!dayDir.isDirectory()) {
+ continue; // Skip files, only process directories
+ }
+
+ String dirName = dayDir.getPath().getName();
+ try {
+ Date dirDate = dateFormat.parse(dirName);
+ long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
+ long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of the day
+ // (23:59:59)
Review Comment:
This comment is a little odd. I imagine `mvn spotless:apply` is what did this. What if you shorten the comment a little?
```suggestion
long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
```
##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java:
##########
@@ -565,7 +584,11 @@ public String backupTables(BackupRequest request) throws IOException {
}
}
if (nonExistingTableList != null) {
- if (type == BackupType.INCREMENTAL) {
+ // Non-continuous Backup incremental backup is controlled by 'incremental backup table set'
+ // and not by user provided backup table list. This is an optimization to avoid copying
+ // the same set of WALs for incremental backups of different tables at different time
Review Comment:
nit
```suggestion
// the same set of WALs for incremental backups of different tables at different times
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]