kgeisz commented on code in PR #6717:
URL: https://github.com/apache/hbase/pull/6717#discussion_r2114793555


##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/AbstractRestoreDriver.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE_DESC;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET_RESTORE_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_LIST_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_RESTORE_DESC;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.logging.Log4jUtils;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
+
[email protected]
+public abstract class AbstractRestoreDriver extends AbstractHBaseTool {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractRestoreDriver.class);
+  protected CommandLine cmd;
+
+  protected static final String USAGE_FOOTER = "";
+
+  protected AbstractRestoreDriver() {
+    init();
+  }
+
+  protected void init() {
+    Log4jUtils.disableZkAndClientLoggers();
+  }
+
+  protected abstract int executeRestore(boolean check, TableName[] fromTables, 
TableName[] toTables,
+    boolean isOverwrite);
+
+  private int parseAndRun() throws IOException {
+    if (!BackupManager.isBackupEnabled(getConf())) {
+      System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
+      return -1;
+    }
+
+    if (cmd.hasOption(OPTION_DEBUG)) {
+      Log4jUtils.setLogLevel("org.apache.hadoop.hbase.backup", "DEBUG");
+    }
+
+    boolean overwrite = cmd.hasOption(OPTION_OVERWRITE);
+    if (overwrite) {
+      LOG.debug("Found -overwrite option in restore command, "
+        + "will overwrite to existing table if any in the restore target");
+    }
+
+    boolean check = cmd.hasOption(OPTION_CHECK);
+    if (check) {
+      LOG.debug(
+        "Found -check option in restore command, " + "will check and verify 
the dependencies");
+    }
+
+    if (cmd.hasOption(OPTION_SET) && cmd.hasOption(OPTION_TABLE)) {
+      System.err.println(
+        "Options -s and -t are mutually exclusive," + " you can not specify 
both of them.");

Review Comment:
   Maybe add a little info to quickly remind the user what `-s` and `-t` do?
   
   ```suggestion
         System.err.printf(
           "Set name (-%s) and table list (-%s) are mutually exclusive; you cannot specify both "
             + "of them.%n", OPTION_SET, OPTION_TABLE);
   ```
   
   
   If you decide this isn't necessary, then please remove the unnecessary `+`:
   
   ```suggestion
         System.err.println(
           "Options -s and -t are mutually exclusive; you cannot specify both of them.");
   ```



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/AbstractRestoreDriver.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE_DESC;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET_RESTORE_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_LIST_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_RESTORE_DESC;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.logging.Log4jUtils;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
+
[email protected]
+public abstract class AbstractRestoreDriver extends AbstractHBaseTool {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractRestoreDriver.class);
+  protected CommandLine cmd;
+
+  protected static final String USAGE_FOOTER = "";
+
+  protected AbstractRestoreDriver() {
+    init();
+  }
+
+  protected void init() {
+    Log4jUtils.disableZkAndClientLoggers();
+  }
+
+  protected abstract int executeRestore(boolean check, TableName[] fromTables, 
TableName[] toTables,
+    boolean isOverwrite);
+
+  private int parseAndRun() throws IOException {
+    if (!BackupManager.isBackupEnabled(getConf())) {
+      System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
+      return -1;
+    }
+
+    if (cmd.hasOption(OPTION_DEBUG)) {
+      Log4jUtils.setLogLevel("org.apache.hadoop.hbase.backup", "DEBUG");
+    }
+
+    boolean overwrite = cmd.hasOption(OPTION_OVERWRITE);
+    if (overwrite) {

Review Comment:
   nit - I see in Line 78 you're just using the `hasOption()` method and not 
assigning its output to a variable.  Same with Lines 94, 101, and 107.  Maybe 
this should be like that for consistency?
   
   ```suggestion
       if (cmd.hasOption(OPTION_OVERWRITE)) {
   ```



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java:
##########
@@ -218,6 +230,12 @@ public ReplicationResult replicate(ReplicateContext 
replicateContext) {
         backupWalEntries(entry.getKey(), entry.getValue());
       }
 
+      // Capture the timestamp of the last WAL entry processed. This is used 
as the replication
+      // checkpoint
+      // so that point-in-time restores know the latest consistent time up to 
which replication has
+      // occurred.

Review Comment:
   nit - It looks like there is an accidental newline after `checkpoint`.
   ```suggestion
         // Capture the timestamp of the last WAL entry processed. This is used 
as the replication
         // checkpoint so that point-in-time restores know the latest 
consistent time up to which
         // replication has occurred.
   ```



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/RestoreDriver.java:
##########
@@ -17,117 +17,34 @@
  */
 package org.apache.hadoop.hbase.backup;
 
-import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK;
-import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK_DESC;
-import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
-import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
-import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE;
-import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE_DESC;
-import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET;
-import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET_RESTORE_DESC;
-import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE;
-import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_LIST_DESC;
-import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING;
-import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING_DESC;
-import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
-import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_RESTORE_DESC;
-
-import java.io.IOException;
 import java.net.URI;
-import java.util.List;
-import java.util.Objects;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
-import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.logging.Log4jUtils;
-import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
 
 /**
  * Command-line entry point for restore operation
  */
 @InterfaceAudience.Private
-public class RestoreDriver extends AbstractHBaseTool {
-  private static final Logger LOG = 
LoggerFactory.getLogger(RestoreDriver.class);
-  private CommandLine cmd;
-
-  private static final String USAGE_STRING =
-    "Usage: hbase restore <backup_path> <backup_id> [options]\n"
-      + "  backup_path     Path to a backup destination root\n"
-      + "  backup_id       Backup image ID to restore\n"
-      + "  table(s)        Comma-separated list of tables to restore\n";
-
-  private static final String USAGE_FOOTER = "";
-
-  protected RestoreDriver() throws IOException {
-    init();
-  }
-
-  protected void init() {
-    // disable irrelevant loggers to avoid it mess up command output
-    Log4jUtils.disableZkAndClientLoggers();
-  }
-
-  private int parseAndRun() throws IOException {
-    // Check if backup is enabled
-    if (!BackupManager.isBackupEnabled(getConf())) {
-      System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
-      return -1;
-    }
-
-    // enable debug logging
-    if (cmd.hasOption(OPTION_DEBUG)) {
-      Log4jUtils.setLogLevel("org.apache.hadoop.hbase.backup", "DEBUG");
-    }
-
-    // whether to overwrite to existing table if any, false by default
-    boolean overwrite = cmd.hasOption(OPTION_OVERWRITE);
-    if (overwrite) {
-      LOG.debug("Found -overwrite option in restore command, "
-        + "will overwrite to existing table if any in the restore target");
-    }
-
-    // whether to only check the dependencies, false by default
-    boolean check = cmd.hasOption(OPTION_CHECK);
-    if (check) {
-      LOG.debug(
-        "Found -check option in restore command, " + "will check and verify 
the dependencies");
-    }
-
-    if (cmd.hasOption(OPTION_SET) && cmd.hasOption(OPTION_TABLE)) {
-      System.err.println(
-        "Options -s and -t are mutaully exclusive," + " you can not specify 
both of them.");
-      printToolUsage();
-      return -1;
-    }
-
-    if (!cmd.hasOption(OPTION_SET) && !cmd.hasOption(OPTION_TABLE)) {
-      System.err.println("You have to specify either set name or table list to 
restore");
-      printToolUsage();
-      return -1;
-    }
-
-    if (cmd.hasOption(OPTION_YARN_QUEUE_NAME)) {
-      String queueName = cmd.getOptionValue(OPTION_YARN_QUEUE_NAME);
-      // Set MR job queuename to configuration
-      getConf().set("mapreduce.job.queuename", queueName);
-    }
+public class RestoreDriver extends AbstractRestoreDriver {
+  private static final String USAGE_STRING = """
+      Usage: hbase restore <backup_path> <backup_id> [options]

Review Comment:
   Should `<table(s)>` be in this usage line as well?



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/AbstractRestoreDriver.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE_DESC;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET_RESTORE_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_LIST_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_RESTORE_DESC;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.logging.Log4jUtils;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
+
[email protected]
+public abstract class AbstractRestoreDriver extends AbstractHBaseTool {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractRestoreDriver.class);
+  protected CommandLine cmd;
+
+  protected static final String USAGE_FOOTER = "";
+
+  protected AbstractRestoreDriver() {
+    init();
+  }
+
+  protected void init() {
+    Log4jUtils.disableZkAndClientLoggers();
+  }
+
+  protected abstract int executeRestore(boolean check, TableName[] fromTables, 
TableName[] toTables,
+    boolean isOverwrite);
+
+  private int parseAndRun() throws IOException {
+    if (!BackupManager.isBackupEnabled(getConf())) {
+      System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
+      return -1;
+    }
+
+    if (cmd.hasOption(OPTION_DEBUG)) {
+      Log4jUtils.setLogLevel("org.apache.hadoop.hbase.backup", "DEBUG");
+    }
+
+    boolean overwrite = cmd.hasOption(OPTION_OVERWRITE);
+    if (overwrite) {
+      LOG.debug("Found -overwrite option in restore command, "
+        + "will overwrite to existing table if any in the restore target");
+    }
+
+    boolean check = cmd.hasOption(OPTION_CHECK);
+    if (check) {
+      LOG.debug(
+        "Found -check option in restore command, " + "will check and verify 
the dependencies");

Review Comment:
   nit - Same as the previous comment
   
   ```suggestion
         LOG.debug(
           "Found check option (-{}) in restore command, will check and verify 
the dependencies",
           OPTION_CHECK);
   ```



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/AbstractRestoreDriver.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE_DESC;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET_RESTORE_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_LIST_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_RESTORE_DESC;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.logging.Log4jUtils;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
+
[email protected]
+public abstract class AbstractRestoreDriver extends AbstractHBaseTool {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractRestoreDriver.class);
+  protected CommandLine cmd;
+
+  protected static final String USAGE_FOOTER = "";
+
+  protected AbstractRestoreDriver() {
+    init();
+  }
+
+  protected void init() {
+    Log4jUtils.disableZkAndClientLoggers();
+  }
+
+  protected abstract int executeRestore(boolean check, TableName[] fromTables, 
TableName[] toTables,
+    boolean isOverwrite);
+
+  private int parseAndRun() throws IOException {
+    if (!BackupManager.isBackupEnabled(getConf())) {
+      System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
+      return -1;
+    }
+
+    if (cmd.hasOption(OPTION_DEBUG)) {
+      Log4jUtils.setLogLevel("org.apache.hadoop.hbase.backup", "DEBUG");
+    }
+
+    boolean overwrite = cmd.hasOption(OPTION_OVERWRITE);
+    if (overwrite) {
+      LOG.debug("Found -overwrite option in restore command, "
+        + "will overwrite to existing table if any in the restore target");
+    }
+
+    boolean check = cmd.hasOption(OPTION_CHECK);
+    if (check) {

Review Comment:
   Same as the previous comment
   
   ```suggestion
       if (cmd.hasOption(OPTION_CHECK)) {
   ```



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.LONG_OPTION_PITR_BACKUP_PATH;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.LONG_OPTION_TO_DATETIME;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_PITR_BACKUP_PATH;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_PITR_BACKUP_PATH_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME_DESC;
+
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Command-line entry point for restore operation
+ */
[email protected]
+public class PointInTimeRestoreDriver extends AbstractRestoreDriver {
+  private static final String USAGE_STRING = """
+      Usage: hbase pitr [options]
+        <backup_path>   Backup Path to use for Point in Time Restore
+        table(s)        Comma-separated list of tables to restore
+      """;
+
+  @Override
+  protected int executeRestore(boolean check, TableName[] fromTables, 
TableName[] toTables,
+    boolean isOverwrite) {
+    String walBackupDir = getConf().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    if (walBackupDir == null || walBackupDir.isEmpty()) {

Review Comment:
   nit - You can use `Strings.isNullOrEmpty()` from 
`org.apache.hbase.thirdparty.com.google.common.base`:
   
   ```suggestion
       if (Strings.isNullOrEmpty(walBackupDir)) {
   ```



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/AbstractRestoreDriver.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE_DESC;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET_RESTORE_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_LIST_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_RESTORE_DESC;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.logging.Log4jUtils;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
+
[email protected]
+public abstract class AbstractRestoreDriver extends AbstractHBaseTool {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractRestoreDriver.class);
+  protected CommandLine cmd;
+
+  protected static final String USAGE_FOOTER = "";
+
+  protected AbstractRestoreDriver() {
+    init();
+  }
+
+  protected void init() {
+    Log4jUtils.disableZkAndClientLoggers();
+  }
+
+  protected abstract int executeRestore(boolean check, TableName[] fromTables, 
TableName[] toTables,
+    boolean isOverwrite);
+
+  private int parseAndRun() throws IOException {
+    if (!BackupManager.isBackupEnabled(getConf())) {
+      System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
+      return -1;
+    }
+
+    if (cmd.hasOption(OPTION_DEBUG)) {
+      Log4jUtils.setLogLevel("org.apache.hadoop.hbase.backup", "DEBUG");
+    }
+
+    boolean overwrite = cmd.hasOption(OPTION_OVERWRITE);
+    if (overwrite) {
+      LOG.debug("Found -overwrite option in restore command, "
+        + "will overwrite to existing table if any in the restore target");

Review Comment:
   nit - I see you have `-o`, but from what I can tell `-overwrite` is not an 
available alternative option.
   
   ```suggestion
         LOG.debug("Found overwrite option (-{}) in restore command, "
           + "will overwrite to existing table if any in the restore target", 
OPTION_OVERWRITE);
   ```



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/AbstractRestoreDriver.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE_DESC;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET_RESTORE_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_LIST_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_RESTORE_DESC;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.logging.Log4jUtils;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
+
[email protected]
+public abstract class AbstractRestoreDriver extends AbstractHBaseTool {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractRestoreDriver.class);
+  protected CommandLine cmd;
+
+  protected static final String USAGE_FOOTER = "";
+
+  protected AbstractRestoreDriver() {
+    init();
+  }
+
+  protected void init() {
+    Log4jUtils.disableZkAndClientLoggers();
+  }
+
+  protected abstract int executeRestore(boolean check, TableName[] fromTables, 
TableName[] toTables,
+    boolean isOverwrite);
+
+  private int parseAndRun() throws IOException {
+    if (!BackupManager.isBackupEnabled(getConf())) {
+      System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
+      return -1;
+    }
+
+    if (cmd.hasOption(OPTION_DEBUG)) {
+      Log4jUtils.setLogLevel("org.apache.hadoop.hbase.backup", "DEBUG");
+    }
+
+    boolean overwrite = cmd.hasOption(OPTION_OVERWRITE);
+    if (overwrite) {
+      LOG.debug("Found -overwrite option in restore command, "
+        + "will overwrite to existing table if any in the restore target");
+    }
+
+    boolean check = cmd.hasOption(OPTION_CHECK);
+    if (check) {
+      LOG.debug(
+        "Found -check option in restore command, " + "will check and verify 
the dependencies");
+    }
+
+    if (cmd.hasOption(OPTION_SET) && cmd.hasOption(OPTION_TABLE)) {
+      System.err.println(
+        "Options -s and -t are mutually exclusive," + " you can not specify 
both of them.");
+      printToolUsage();
+      return -1;
+    }
+
+    if (!cmd.hasOption(OPTION_SET) && !cmd.hasOption(OPTION_TABLE)) {
+      System.err.println("You have to specify either set name or table list to 
restore");
+      printToolUsage();
+      return -1;
+    }
+
+    if (cmd.hasOption(OPTION_YARN_QUEUE_NAME)) {
+      String queueName = cmd.getOptionValue(OPTION_YARN_QUEUE_NAME);
+      // Set MR job queuename to configuration
+      getConf().set("mapreduce.job.queuename", queueName);
+    }
+
+    String tables;
+    TableName[] sTableArray;
+    TableName[] tTableArray;
+
+    String tableMapping =
+      cmd.hasOption(OPTION_TABLE_MAPPING) ? 
cmd.getOptionValue(OPTION_TABLE_MAPPING) : null;

Review Comment:
   `cmd.getOptionValue(OPTION_TABLE_MAPPING)` already returns `null` when the 
option is absent, so the `hasOption` ternary is redundant.
   
   
https://commons.apache.org/proper/commons-cli/apidocs/org/apache/commons/cli/CommandLine.html#getOptionValue(java.lang.String)



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME;
+import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(LargeTests.class)
+public class TestPointInTimeRestore extends TestBackupBase {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestPointInTimeRestore.class);
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestPointInTimeRestore.class);
+
+  private static final String backupWalDirName = 
"TestPointInTimeRestoreWalDir";
+  private static final int WAIT_FOR_REPLICATION_MS = 30_000;
+  static Path backupWalDir;
+  static FileSystem fs;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    Path root = TEST_UTIL.getDataTestDirOnTestFS();
+    backupWalDir = new Path(root, backupWalDirName);
+    fs = FileSystem.get(conf1);
+    fs.mkdirs(backupWalDir);
+    conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
+
+    setUpBackupUps();
+  }
+
+  /**
+   * Sets up multiple backups at different timestamps by: 1. Adjusting the 
system time to simulate
+   * past backup points. 2. Loading data into tables to create meaningful 
snapshots. 3. Running full
+   * backups with or without continuous backup enabled. 4. Ensuring 
replication is complete before
+   * proceeding.
+   */
+  private static void setUpBackupUps() throws Exception {

Review Comment:
   Is this method name a typo?  Should it say `setUpBackups()`?



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.LONG_OPTION_PITR_BACKUP_PATH;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.LONG_OPTION_TO_DATETIME;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_PITR_BACKUP_PATH;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_PITR_BACKUP_PATH_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME_DESC;
+
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Command-line entry point for restore operation
+ */
[email protected]
+public class PointInTimeRestoreDriver extends AbstractRestoreDriver {
+  private static final String USAGE_STRING = """
+      Usage: hbase pitr [options]
+        <backup_path>   Backup Path to use for Point in Time Restore
+        table(s)        Comma-separated list of tables to restore
+      """;
+
+  @Override
+  protected int executeRestore(boolean check, TableName[] fromTables, 
TableName[] toTables,
+    boolean isOverwrite) {
+    String walBackupDir = getConf().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    if (walBackupDir == null || walBackupDir.isEmpty()) {
+      System.err.printf(
+        "Point-in-Time Restore requires the WAL backup directory (%s) to 
replay logs after full and incremental backups. "
+          + "Set this property if you need Point-in-Time Restore. Otherwise, 
use the normal restore process with the appropriate backup ID.%n",
+        CONF_CONTINUOUS_BACKUP_WAL_DIR);
+      return -1;
+    }
+
+    String[] remainArgs = cmd.getArgs();
+    if (remainArgs.length != 0) {
+      printToolUsage();
+      return -1;
+    }
+
+    String backupRootDir =
+      cmd.hasOption(OPTION_PITR_BACKUP_PATH) ? 
cmd.getOptionValue(OPTION_PITR_BACKUP_PATH) : null;

Review Comment:
   `cmd.getOptionValue()` will automatically return `null` if the option is not 
set.



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/AbstractRestoreDriver.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_CHECK_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_OVERWRITE_DESC;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_SET_RESTORE_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_LIST_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TABLE_MAPPING_DESC;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_RESTORE_DESC;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.logging.Log4jUtils;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
+
[email protected]
+public abstract class AbstractRestoreDriver extends AbstractHBaseTool {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractRestoreDriver.class);
+  protected CommandLine cmd;
+
+  protected static final String USAGE_FOOTER = "";
+
/**
 * Creates the driver and performs one-time logging setup.
 * <p>
 * NOTE(review): the constructor invokes the overridable {@link #init()}. A subclass overriding
 * init() would have it run before the subclass constructor body completes — confirm this is
 * intended, or consider making init() final/private.
 */
protected AbstractRestoreDriver() {
  init();
}

/** Quiets ZooKeeper and client loggers so the tool's console output stays readable. */
protected void init() {
  Log4jUtils.disableZkAndClientLoggers();
}

/**
 * Executes the restore operation.
 * @param check if true, only check and verify the backup dependencies instead of restoring
 * @param fromTables tables to restore from the backup image
 * @param toTables target table names to restore into
 * @param isOverwrite whether existing target tables may be overwritten
 * @return tool exit code
 */
protected abstract int executeRestore(boolean check, TableName[] fromTables, TableName[] toTables,
  boolean isOverwrite);
+
+  private int parseAndRun() throws IOException {
+    if (!BackupManager.isBackupEnabled(getConf())) {
+      System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
+      return -1;
+    }
+
+    if (cmd.hasOption(OPTION_DEBUG)) {
+      Log4jUtils.setLogLevel("org.apache.hadoop.hbase.backup", "DEBUG");
+    }
+
+    boolean overwrite = cmd.hasOption(OPTION_OVERWRITE);
+    if (overwrite) {
+      LOG.debug("Found -overwrite option in restore command, "
+        + "will overwrite to existing table if any in the restore target");
+    }
+
+    boolean check = cmd.hasOption(OPTION_CHECK);
+    if (check) {
+      LOG.debug(
+        "Found -check option in restore command, " + "will check and verify 
the dependencies");
+    }
+
+    if (cmd.hasOption(OPTION_SET) && cmd.hasOption(OPTION_TABLE)) {
+      System.err.println(
+        "Options -s and -t are mutually exclusive," + " you can not specify 
both of them.");
+      printToolUsage();
+      return -1;
+    }
+
+    if (!cmd.hasOption(OPTION_SET) && !cmd.hasOption(OPTION_TABLE)) {
+      System.err.println("You have to specify either set name or table list to 
restore");

Review Comment:
   Similar to the previous comment — interpolating the option constants keeps the message accurate if the flags ever change:
   
   ```suggestion
         System.err.printf("You have to specify either set name (-%s) or table 
list (-%s) to "
           + "restore\n", OPTION_SET, OPTION_TABLE);
   ```



##########
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java:
##########
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
+import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DATETIME;
+import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(LargeTests.class)
+public class TestPointInTimeRestore extends TestBackupBase {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestPointInTimeRestore.class);
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestPointInTimeRestore.class);
+
+  private static final String backupWalDirName = 
"TestPointInTimeRestoreWalDir";
+  private static final int WAIT_FOR_REPLICATION_MS = 30_000;
+  static Path backupWalDir;
+  static FileSystem fs;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    Path root = TEST_UTIL.getDataTestDirOnTestFS();
+    backupWalDir = new Path(root, backupWalDirName);
+    fs = FileSystem.get(conf1);
+    fs.mkdirs(backupWalDir);
+    conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
+
+    setUpBackupUps();
+  }
+
+  /**
+   * Sets up multiple backups at different timestamps by: 1. Adjusting the 
system time to simulate
+   * past backup points. 2. Loading data into tables to create meaningful 
snapshots. 3. Running full
+   * backups with or without continuous backup enabled. 4. Ensuring 
replication is complete before
+   * proceeding.
+   */
+  private static void setUpBackupUps() throws Exception {
+    // Simulate a backup taken 20 days ago
+    EnvironmentEdgeManager
+      .injectEdge(() -> System.currentTimeMillis() - 20 * 
ONE_DAY_IN_MILLISECONDS);
+    loadRandomData(table1, 1000); // Insert initial data into table1
+
+    // Perform a full backup for table1 with continuous backup enabled
+    String[] args = buildBackupArgs("full", new TableName[] { table1 }, true);
+    int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+    assertEquals("Backup should succeed", 0, ret);
+
+    // Move time forward to simulate 15 days ago
+    EnvironmentEdgeManager
+      .injectEdge(() -> System.currentTimeMillis() - 15 * 
ONE_DAY_IN_MILLISECONDS);
+    loadRandomData(table1, 1000); // Add more data to table1
+    loadRandomData(table2, 500); // Insert data into table2
+
+    waitForReplication(); // Ensure replication is complete
+
+    // Perform a full backup for table2 with continuous backup enabled
+    args = buildBackupArgs("full", new TableName[] { table2 }, true);
+    ret = ToolRunner.run(conf1, new BackupDriver(), args);
+    assertEquals("Backup should succeed", 0, ret);
+
+    // Move time forward to simulate 10 days ago
+    EnvironmentEdgeManager
+      .injectEdge(() -> System.currentTimeMillis() - 10 * 
ONE_DAY_IN_MILLISECONDS);
+    loadRandomData(table2, 500); // Add more data to table2
+    loadRandomData(table3, 500); // Insert data into table3
+
+    // Perform a full backup for table3 and table4 (without continuous backup)
+    args = buildBackupArgs("full", new TableName[] { table3, table4 }, false);
+    ret = ToolRunner.run(conf1, new BackupDriver(), args);
+    assertEquals("Backup should succeed", 0, ret);
+
+    waitForReplication(); // Ensure replication is complete before concluding 
setup
+
+    // Reset time mocking to avoid affecting other tests
+    EnvironmentEdgeManager.reset();
+  }
+
+  @AfterClass
+  public static void setupAfterClass() throws IOException {
+    Path root = TEST_UTIL.getDataTestDirOnTestFS();
+    Path backupWalDir = new Path(root, backupWalDirName);
+    FileSystem fs = FileSystem.get(conf1);
+
+    if (fs.exists(backupWalDir)) {
+      fs.delete(backupWalDir, true);
+    }
+
+    conf1.unset(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+  }
+
+  /**
+   * Verifies that PITR (Point-in-Time Restore) fails when the requested 
restore time is either in
+   * the future or outside the allowed retention window.
+   */
+  @Test
+  public void testPITR_FailsOutsideWindow() throws Exception {
+    // Case 1: Requested restore time is in the future (should fail)
+    String[] args = buildPITRArgs(new TableName[] { table1 },
+      new TableName[] { TableName.valueOf("restoredTable1") },
+      EnvironmentEdgeManager.currentTime() + ONE_DAY_IN_MILLISECONDS);
+
+    int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+    assertNotEquals("Restore should fail since the requested restore time is 
in the future", 0,
+      ret);
+
+    // Case 2: Requested restore time is too old (beyond the retention window, 
should fail)
+    args = buildPITRArgs(new TableName[] { table1 },
+      new TableName[] { TableName.valueOf("restoredTable1") },
+      EnvironmentEdgeManager.currentTime() - 40 * ONE_DAY_IN_MILLISECONDS);
+
+    ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+    assertNotEquals(
+      "Restore should fail since the requested restore time is outside the 
retention window", 0,
+      ret);
+  }
+
+  /**
+   * Ensures that PITR fails when attempting to restore tables where 
continuous backup was not
+   * enabled.
+   */
+  @Test
+  public void testPointInTimeRestore_ContinuousBackupNotEnabledTables() throws 
Exception {
+    String[] args = buildPITRArgs(new TableName[] { table3 },
+      new TableName[] { TableName.valueOf("restoredTable1") },
+      EnvironmentEdgeManager.currentTime() - 10 * ONE_DAY_IN_MILLISECONDS);
+
+    int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+    assertNotEquals("Restore should fail since continuous backup is not 
enabled for the table", 0,
+      ret);
+  }
+
+  /**
+   * Ensures that PITR fails when trying to restore from a point before 
continuous backup started.
+   */
+  @Test
+  public void testPointInTimeRestore_TablesWithNoProperBackupOrWals() throws 
Exception {
+    String[] args = buildPITRArgs(new TableName[] { table2 },
+      new TableName[] { TableName.valueOf("restoredTable1") },
+      EnvironmentEdgeManager.currentTime() - 16 * ONE_DAY_IN_MILLISECONDS);
+
+    int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+    assertNotEquals(
+      "Restore should fail since the requested restore point is before the 
start of continuous backup",
+      0, ret);
+  }
+
+  /**
+   * Verifies that PITR successfully restores data for a single table.
+   */
+  @Test
+  public void testPointInTimeRestore_SuccessfulRestoreForOneTable() throws 
Exception {
+    TableName restoredTable = TableName.valueOf("restoredTable");
+
+    // Perform restore operation
+    String[] args = buildPITRArgs(new TableName[] { table1 }, new TableName[] 
{ restoredTable },
+      EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS);
+
+    int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+    assertEquals("Restore should succeed", 0, ret);
+
+    // Validate that the restored table contains the same number of rows as 
the original table
+    assertEquals("Restored table should have the same row count as the 
original",
+      getRowCount(table1), getRowCount(restoredTable));
+  }
+
+  /**
+   * Verifies that PITR successfully restores multiple tables at once.
+   */
+  @Test
+  public void testPointInTimeRestore_SuccessfulRestoreForMultipleTables() 
throws Exception {
+    TableName restoredTable1 = TableName.valueOf("restoredTable1");
+    TableName restoredTable2 = TableName.valueOf("restoredTable2");
+
+    // Perform restore operation for multiple tables
+    String[] args = buildPITRArgs(new TableName[] { table1, table2 },
+      new TableName[] { restoredTable1, restoredTable2 },
+      EnvironmentEdgeManager.currentTime() - 5 * ONE_DAY_IN_MILLISECONDS);
+
+    int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+    assertEquals("Restore should succeed", 0, ret);
+
+    // Validate that the restored tables contain the same number of rows as 
the originals
+    assertEquals("Restored table1 should have the same row count as the 
original",
+      getRowCount(table1), getRowCount(restoredTable1));
+    assertEquals("Restored table2 should have the same row count as the 
original",
+      getRowCount(table2), getRowCount(restoredTable2));
+  }
+
+  private String[] buildPITRArgs(TableName[] sourceTables, TableName[] 
targetTables, long endTime) {
+    String sourceTableNames =
+      
Arrays.stream(sourceTables).map(TableName::getNameAsString).collect(Collectors.joining(","));
+
+    String targetTableNames =
+      
Arrays.stream(targetTables).map(TableName::getNameAsString).collect(Collectors.joining(","));
+
+    return new String[] { "-t", sourceTableNames, "-m", targetTableNames, "-" 
+ OPTION_TO_DATETIME,

Review Comment:
   To be safe, you can use `"-" + OPTION_TABLE` instead of `-t` in case the 
option is ever changed.  Similar for `-m` and the args in `buildBackupArgs()`.



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java:
##########
@@ -509,6 +520,338 @@ public void restore(RestoreRequest request) throws 
IOException {
     new RestoreTablesClient(conn, request).execute();
   }
 
+  private boolean validateRequest(RestoreRequest request) throws IOException {
+    // check and load backup image manifest for the tables
+    Path rootPath = new Path(request.getBackupRootDir());
+    String backupId = request.getBackupId();
+    TableName[] sTableArray = request.getFromTables();
+    BackupManifest manifest =
+      HBackupFileSystem.getManifest(conn.getConfiguration(), rootPath, 
backupId);
+
+    // Validate the backup image and its dependencies
+    return BackupUtils.validate(Arrays.asList(sTableArray), manifest, 
conn.getConfiguration());
+  }
+
+  @Override
+  public void pointInTimeRestore(PointInTimeRestoreRequest request) throws 
IOException {
+    if (request.getBackupRootDir() == null) {
+      defaultPointInTimeRestore(request);
+    } else {
+      // TODO: special case, not supported at the moment
+      throw new IOException("Custom backup location for Point-In-Time Recovery 
Not supported!");
+    }
+    LOG.info("Successfully completed Point In Time Restore for all tables.");
+  }
+
+  /**
+   * Performs a default Point-In-Time Restore (PITR) by restoring the latest 
valid backup and
+   * replaying the WALs to bring the table to the desired state. PITR 
requires: 1. A valid backup
+   * available before the end time. 2. Write-Ahead Logs (WALs) covering the 
remaining duration up to
+   * the end time.
+   * @param request PointInTimeRestoreRequest containing restore parameters.
+   * @throws IOException If no valid backup or WALs are found, or if an error 
occurs during
+   *                     restoration.
+   */
+  private void defaultPointInTimeRestore(PointInTimeRestoreRequest request) 
throws IOException {
+    long endTime = request.getToDateTime();
+    validateRequestToTime(endTime);
+
+    TableName[] sTableArray = request.getFromTables();
+    TableName[] tTableArray = resolveTargetTables(sTableArray, 
request.getToTables());
+
+    // Validate PITR requirements
+    validatePitr(endTime, sTableArray, tTableArray);
+
+    // If only validation is required, log and return
+    if (request.isCheck()) {
+      LOG.info("PITR can be successfully executed");
+      return;
+    }
+
+    // Execute PITR process
+    try (BackupSystemTable table = new BackupSystemTable(conn)) {
+      Map<TableName, Long> continuousBackupTables = 
table.getContinuousBackupTableSet();
+      List<BackupInfo> backupInfos = 
table.getBackupInfos(BackupState.COMPLETE);
+
+      for (int i = 0; i < sTableArray.length; i++) {
+        restoreTableWithWalReplay(sTableArray[i], tTableArray[i], endTime, 
continuousBackupTables,
+          backupInfos, request);
+      }
+    }
+  }
+
+  /**
+   * Validates whether the requested end time falls within the allowed PITR 
recovery window.
+   * @param endTime The target recovery time.
+   * @throws IOException If the requested recovery time is outside the allowed 
window.
+   */
+  private void validateRequestToTime(long endTime) throws IOException {
+    long pitrWindowDays = 
conn.getConfiguration().getLong(CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS,
+      DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS);
+    long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
+    long pitrMaxStartTime = currentTime - 
TimeUnit.DAYS.toMillis(pitrWindowDays);
+
+    if (endTime < pitrMaxStartTime) {
+      String errorMsg = String.format(
+        "Requested recovery time (%d) is out of the allowed PITR window (last 
%d days).", endTime,
+        pitrWindowDays);
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+
+    if (endTime > currentTime) {
+      String errorMsg = String.format(
+        "Requested recovery time (%d) is in the future. Current time: %d.", 
endTime, currentTime);
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+  }
+
+  /**
+   * Resolves the target table array. If null or empty, defaults to the source 
table array.
+   */
+  private TableName[] resolveTargetTables(TableName[] sourceTables, 
TableName[] targetTables) {
+    return (targetTables == null || targetTables.length == 0) ? sourceTables : 
targetTables;
+  }
+
+  /**
+   * Validates whether Point-In-Time Recovery (PITR) is possible for the given 
tables at the
+   * specified time.
+   * <p>
+   * PITR requires:
+   * <ul>
+   * <li>Continuous backup to be enabled for the source tables.</li>
+   * <li>A valid backup image and corresponding WALs to be available.</li>
+   * </ul>
+   * @param endTime     The target recovery time.
+   * @param sTableArray The source tables to restore.
+   * @param tTableArray The target tables where the restore will be performed.
+   * @throws IOException If PITR is not possible due to missing continuous 
backup or backup images.
+   */
+  private void validatePitr(long endTime, TableName[] sTableArray, TableName[] 
tTableArray)
+    throws IOException {
+    try (BackupSystemTable table = new BackupSystemTable(conn)) {
+      // Retrieve the set of tables with continuous backup enabled
+      Map<TableName, Long> continuousBackupTables = 
table.getContinuousBackupTableSet();
+
+      // Ensure all source tables have continuous backup enabled
+      validateContinuousBackup(sTableArray, continuousBackupTables);
+
+      // Fetch completed backup information
+      List<BackupInfo> backupInfos = 
table.getBackupInfos(BackupState.COMPLETE);
+
+      // Ensure a valid backup and WALs exist for PITR
+      validateBackupAvailability(sTableArray, tTableArray, endTime, 
continuousBackupTables,
+        backupInfos);
+    }
+  }
+
+  /**
+   * Ensures that all source tables have continuous backup enabled.
+   */
+  private void validateContinuousBackup(TableName[] tables,
+    Map<TableName, Long> continuousBackupTables) throws IOException {
+    List<TableName> missingTables =
+      Arrays.stream(tables).filter(table -> 
!continuousBackupTables.containsKey(table)).toList();
+
+    if (!missingTables.isEmpty()) {
+      String errorMsg = "Continuous Backup is not enabled for the following 
tables: "
+        + 
missingTables.stream().map(TableName::getNameAsString).collect(Collectors.joining(",
 "));
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+  }
+
+  /**
+   * Ensures that a valid backup and corresponding WALs exist for PITR for 
each source table. PITR
+   * requires: 1. A valid backup available before the end time. 2. Write-Ahead 
Logs (WALs) covering
+   * the remaining duration up to the end time.
+   */
+  private void validateBackupAvailability(TableName[] sTableArray, TableName[] 
tTableArray,
+    long endTime, Map<TableName, Long> continuousBackupTables, 
List<BackupInfo> backupInfos)
+    throws IOException {
+    for (int i = 0; i < sTableArray.length; i++) {
+      if (
+        !canPerformPitr(sTableArray[i], tTableArray[i], endTime, 
continuousBackupTables,
+          backupInfos)
+      ) {
+        String errorMsg = "Could not find a valid backup and WALs for PITR for 
table: "
+          + sTableArray[i].getNameAsString();
+        LOG.error(errorMsg);
+        throw new IOException(errorMsg);
+      }
+    }
+  }
+
+  /**
+   * Checks whether PITR can be performed for a given source-target table pair.
+   */
+  private boolean canPerformPitr(TableName stableName, TableName tTableName, 
long endTime,
+    Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos) 
{
+    return getValidBackupInfo(stableName, tTableName, endTime, 
continuousBackupTables, backupInfos)
+        != null;
+  }
+
+  /**
+   * Finds a valid backup for PITR that meets the required conditions.
+   */
+  private BackupInfo getValidBackupInfo(TableName sTableName, TableName 
tTablename, long endTime,
+    Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos) 
{
+    for (BackupInfo info : backupInfos) {
+      if (isValidBackupForPitr(info, sTableName, endTime, 
continuousBackupTables)) {
+
+        RestoreRequest restoreRequest =
+          BackupUtils.createRestoreRequest(info.getBackupRootDir(), 
info.getBackupId(), true,
+            new TableName[] { sTableName }, new TableName[] { tTablename }, 
false);
+
+        try {
+          if (validateRequest(restoreRequest)) {
+            return info;
+          }
+        } catch (IOException e) {
+          LOG.warn("Exception occurred while testing the backup : {} for 
restore ", info, e);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Determines if the given backup is valid for PITR.
+   * <p>
+   * A backup is valid if:
+   * <ul>
+   * <li>It contains the source table.</li>
+   * <li>It was completed before the end time.</li>
+   * <li>The start timestamp of the backup is after the continuous backup 
start time for the
+   * table.</li>
+   * </ul>
+   * @param info                   Backup information object.
+   * @param tableName              Table to check.
+   * @param endTime                The target recovery time.
+   * @param continuousBackupTables Map of tables with continuous backup 
enabled.
+   * @return true if the backup is valid for PITR, false otherwise.
+   */
+  private boolean isValidBackupForPitr(BackupInfo info, TableName tableName, 
long endTime,
+    Map<TableName, Long> continuousBackupTables) {
+    return info.getTableNames().contains(tableName) && info.getCompleteTs() <= 
endTime
+      && continuousBackupTables.getOrDefault(tableName, 0L) <= 
info.getStartTs();
+  }
+
+  /**
+   * Restores a table from a valid backup and replays WALs to reach the 
desired PITR state.
+   */
+  private void restoreTableWithWalReplay(TableName sourceTable, TableName 
targetTable, long endTime,
+    Map<TableName, Long> continuousBackupTables, List<BackupInfo> backupInfos,
+    PointInTimeRestoreRequest request) throws IOException {
+    BackupInfo backupInfo =
+      getValidBackupInfo(sourceTable, targetTable, endTime, 
continuousBackupTables, backupInfos);
+    if (backupInfo == null) {
+      String errorMsg = "Could not find a valid backup and WALs for PITR for 
table: "
+        + sourceTable.getNameAsString();
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+
+    RestoreRequest restoreRequest = 
BackupUtils.createRestoreRequest(backupInfo.getBackupRootDir(),
+      backupInfo.getBackupId(), false, new TableName[] { sourceTable },
+      new TableName[] { targetTable }, request.isOverwrite());
+
+    restore(restoreRequest);
+    replayWal(sourceTable, targetTable, backupInfo.getStartTs(), endTime);
+  }
+
+  /**
+   * Replays WALs to bring the table to the desired state.
+   */
+  private void replayWal(TableName sourceTable, TableName targetTable, long 
startTime, long endTime)
+    throws IOException {
+    String walBackupDir = 
conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    Path walDirPath = new Path(walBackupDir);
+    LOG.info(
+      "Starting WAL replay for source: {}, target: {}, time range: {} - {}, 
WAL backup dir: {}",
+      sourceTable, targetTable, startTime, endTime, walDirPath);
+
+    List<String> validDirs =
+      getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
+    if (validDirs.isEmpty()) {
+      LOG.warn("No valid WAL directories found for range {} - {}. Skipping WAL 
replay.", startTime,
+        endTime);
+      return;
+    }
+
+    executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime);
+  }
+
+  /**
+   * Fetches valid WAL directories based on the given time range.
+   */
+  private List<String> getValidWalDirs(Configuration conf, Path walBackupDir, 
long startTime,
+    long endTime) throws IOException {
+    FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf);
+    FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, 
WALS_DIR));
+
+    List<String> validDirs = new ArrayList<>();
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+    for (FileStatus dayDir : dayDirs) {
+      if (!dayDir.isDirectory()) {
+        continue; // Skip files, only process directories
+      }
+
+      String dirName = dayDir.getPath().getName();
+      try {
+        Date dirDate = dateFormat.parse(dirName);
+        long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
+        long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End 
time of the day
+        // (23:59:59)

Review Comment:
   It feels a little odd to have an inline comment on two lines.  It's probably 
because of `mvn spotless:apply`.  You can probably get it all on one line if 
you trim the comment down:
   
   ```suggestion
           long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End 
time of day (23:59:59)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to