virajjasani commented on code in PR #5523:
URL: https://github.com/apache/hadoop/pull/5523#discussion_r1156450948


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java:
##########
@@ -227,7 +270,23 @@ public <T extends BaseRecord> QueryResult<T> get(Class<T> 
clazz)
     if (metrics != null) {
       metrics.addRead(monotonicNow() - start);
     }
-    return new QueryResult<T>(ret, getTime());
+    return new QueryResult<>(result, getTime());
+  }
+
+  private <T extends BaseRecord> Void 
getRecordsFromFileAndRemoveOldTmpRecords(Class<T> clazz,

Review Comment:
   Done, thanks



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java:
##########
@@ -345,36 +397,71 @@ public <T extends BaseRecord> boolean putAll(
     }
 
     // Write the records
-    boolean success = true;
-    for (Entry<String, T> entry : toWrite.entrySet()) {
-      String recordPath = entry.getKey();
-      String recordPathTemp = recordPath + "." + now() + TMP_MARK;
-      boolean recordWrittenSuccessfully = true;
-      try (BufferedWriter writer = getWriter(recordPathTemp)) {
-        T record = entry.getValue();
-        String line = serializeString(record);
-        writer.write(line);
-      } catch (IOException e) {
-        LOG.error("Cannot write {}", recordPathTemp, e);
-        recordWrittenSuccessfully = false;
-        success = false;
+    final AtomicBoolean success = new AtomicBoolean(true);
+    final List<Callable<Void>> callables = new ArrayList<>();
+    toWrite.entrySet().forEach(entry -> callables.add(() -> 
writeRecordToFile(success, entry)));
+    if (this.concurrentStoreAccessPool != null) {
+      // Write records concurrently
+      List<Future<Void>> futures = null;
+      try {
+        futures = this.concurrentStoreAccessPool.invokeAll(callables);
+      } catch (InterruptedException e) {
+        success.set(false);
+        LOG.error("Failed to put record concurrently.", e);
       }
-      // Commit
-      if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
-        LOG.error("Failed committing record into {}", recordPath);
-        success = false;
+      if (futures != null) {
+        for (Future<Void> future : futures) {
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            success.set(false);
+            LOG.error("Failed to retrieve results from concurrent record put 
runs.", e);
+          }
+        }
       }
+    } else {
+      // Write records serially
+      callables.forEach(callable -> {
+        try {
+          callable.call();
+        } catch (Exception e) {
+          success.set(false);
+          LOG.error("Failed to put record.", e);
+        }
+      });
     }
 
     long end = monotonicNow();
     if (metrics != null) {
-      if (success) {
+      if (success.get()) {
         metrics.addWrite(end - start);
       } else {
         metrics.addFailure(end - start);
       }
     }
-    return success;
+    return success.get();
+  }
+
+  private <T extends BaseRecord> Void writeRecordToFile(AtomicBoolean success,

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to