taklwu commented on code in PR #7617:
URL: https://github.com/apache/hbase/pull/7617#discussion_r2943109855


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -106,9 +117,25 @@ public final void run() {
         continue;
       }
       try {
-        WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
+        // check time-based offset persistence
+        if (shouldPersistLogPosition()) {
+          // Trigger offset persistence via existing retry/backoff mechanism 
in shipEdits()
+          WALEntryBatch emptyBatch = createEmptyBatchForTimeBasedFlush();
+          if (emptyBatch != null) shipEdits(emptyBatch);

Review Comment:
   nit better to keep bracket even if this is a one-liner
   
   ```suggestion
             if (emptyBatch != null) {
               shipEdits(emptyBatch);
             }
   ```



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -154,15 +196,16 @@ protected void postFinish() {
   private void shipEdits(WALEntryBatch entryBatch) {
     List<Entry> entries = entryBatch.getWalEntries();
     int sleepMultiplier = 0;
-    if (entries.isEmpty()) {
-      updateLogPosition(entryBatch);
-      return;
-    }
     int currentSize = (int) entryBatch.getHeapSize();
     source.getSourceMetrics()
       .setTimeStampNextToReplicate(entries.get(entries.size() - 
1).getKey().getWriteTime());
     while (isActive()) {
       try {
+        if (entries.isEmpty()) {

Review Comment:
   @ankitsol is this align/update in the latest patch ?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -215,6 +255,13 @@ private void shipEdits(WALEntryBatch entryBatch) {
             entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
         }
         break;
+      } catch (IOException ioe) {
+        // Offset-Persist failure is treated as fatal to this worker since it 
might come from
+        // beforePersistingReplicationOffset.
+        // ReplicationSource will restart the Shipper, and WAL reading
+        // will resume from the last successfully persisted offset
+        throw new ReplicationRuntimeException(

Review Comment:
   k, it looks like the previous updateLogPosition issue 
https://github.com/apache/hbase/pull/7617/changes#r2802692158  has been handled 
here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to