This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 30608d5399 avoids unneeded waiting on tablet w/o location in bulk load 
(#5471)
30608d5399 is described below

commit 30608d5399e8a30cc61e73768b29967902af3747
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Mon Apr 14 15:58:35 2025 -0400

    avoids unneeded waiting on tablet w/o location in bulk load (#5471)
    
    fixes #5469
---
 .../manager/tableOps/bulkVer2/LoadFiles.java       | 30 +++++-----
 .../manager/tableOps/bulkVer2/LoadFilesTest.java   | 65 +++++++++++++++++++++-
 2 files changed, 80 insertions(+), 15 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index 603de073cc..4f1d8ecc66 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -160,7 +160,7 @@ class LoadFiles extends ManagerRepo {
     abstract long finish() throws Exception;
   }
 
-  private static class OnlineLoader extends Loader {
+  static class OnlineLoader extends Loader {
 
     private final int maxConnections;
     long timeInMillis;
@@ -302,7 +302,7 @@ class LoadFiles extends ManagerRepo {
       }
     }
 
-    private void addToQueue(HostAndPort server, KeyExtent extent,
+    protected void addToQueue(HostAndPort server, KeyExtent extent,
         Map<String,MapFileInfo> thriftImports) {
       if (!thriftImports.isEmpty()) {
         tabletsAdded++;
@@ -325,29 +325,33 @@ class LoadFiles extends ManagerRepo {
         // send files to tablet sever
         // ideally there should only be one tablet location to send all the 
files
 
-        Location location = tablet.getLocation();
-        HostAndPort server = null;
-        if (location == null) {
-          locationLess++;
-          continue;
-        } else {
-          server = location.getHostAndPort();
-        }
-
         Set<TabletFile> loadedFiles = tablet.getLoaded().keySet();
 
+        Location location = tablet.getLocation();
+
         Map<String,MapFileInfo> thriftImports = new HashMap<>();
 
+        boolean needToLoad = false;
         for (final Bulk.FileInfo fileInfo : files) {
           Path fullPath = new Path(bulkDir, fileInfo.getFileName());
           TabletFile bulkFile = new TabletFile(fullPath);
 
           if (!loadedFiles.contains(bulkFile)) {
-            thriftImports.put(fileInfo.getFileName(), new 
MapFileInfo(fileInfo.getEstFileSize()));
+            if (location == null) {
+              needToLoad = true;
+              break;
+            } else {
+              thriftImports.put(fileInfo.getFileName(), new 
MapFileInfo(fileInfo.getEstFileSize()));
+            }
           }
         }
 
-        addToQueue(server, tablet.getExtent(), thriftImports);
+        if (location != null) {
+          addToQueue(location.getHostAndPort(), tablet.getExtent(), 
thriftImports);
+        } else if (needToLoad) {
+          // tablet has no location and files still need to be loaded, so must 
wait for the tablet to get a location
+          locationLess++;
+        } // else tablet has no location but all files are already loaded for 
it, so there is nothing to do
       }
 
       sendQueued(4 * 1024 * 1024);
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
index 31d8cfd828..f012801909 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
@@ -19,9 +19,15 @@
 package org.apache.accumulo.manager.tableOps.bulkVer2;
 
 import static 
org.apache.accumulo.manager.tableOps.bulkVer2.PrepBulkImportTest.nke;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.strictMock;
+import static org.easymock.EasyMock.verify;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -32,11 +38,15 @@ import java.util.Map;
 import org.apache.accumulo.core.clientImpl.bulk.Bulk.FileInfo;
 import org.apache.accumulo.core.clientImpl.bulk.Bulk.Files;
 import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
 import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.manager.Manager;
 import 
org.apache.accumulo.manager.tableOps.bulkVer2.LoadFiles.ImportTimingStats;
 import 
org.apache.accumulo.manager.tableOps.bulkVer2.LoadFiles.TabletsMetadataFactory;
@@ -160,10 +170,10 @@ public class LoadFilesTest {
 
     Manager manager = EasyMock.createMock(Manager.class);
     Path bulkDir = EasyMock.createMock(Path.class);
-    EasyMock.replay(manager, bulkDir);
+    replay(manager, bulkDir);
 
     LoadFiles.loadFiles(cl, info, bulkDir, lmi, tmf, manager, txid, 0);
-    EasyMock.verify(manager, bulkDir);
+    verify(manager, bulkDir);
     List<CaptureLoader.LoadResult> results = cl.getLoadResults();
     assertEquals(loadRanges.size(), results.size());
 
@@ -301,4 +311,55 @@ public class LoadFilesTest {
     assertTrue(extents.contains(nke("z", null)));
 
   }
+
+  /**
+   * Tests tablets without a location, both with and without the bulk files 
already loaded.
+   *
+   */
+  @Test
+  public void testLoadLocation() throws Exception {
+
+    var loader = new 
LoadFiles.OnlineLoader(DefaultConfiguration.getInstance()) {
+      @Override
+      protected void addToQueue(HostAndPort server, KeyExtent extent,
+          Map<String,MapFileInfo> thriftImports) {
+        fail();
+      }
+    };
+
+    Path bulkDir = new Path("file:/accumulo/tables/1/b-00001");
+
+    Path fullPath = new Path(bulkDir, "f1.rf");
+    TabletFile loaded1 = new TabletFile(fullPath);
+
+    // Tablet with no location and no loaded files
+    TabletMetadata tablet1 = createMock(TabletMetadata.class);
+    expect(tablet1.getLocation()).andReturn(null).once();
+    expect(tablet1.getLoaded()).andReturn(Map.of()).once();
+
+    // Tablet with no location and loaded files
+    TabletMetadata tablet2 = createMock(TabletMetadata.class);
+    expect(tablet2.getLocation()).andReturn(null).once();
+    expect(tablet2.getLoaded()).andReturn(Map.of(loaded1, 123456789L)).once();
+
+    Manager manager = strictMock(Manager.class);
+    
expect(manager.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).once();
+
+    replay(tablet1, tablet2, manager);
+
+    loader.start(bulkDir, manager, 123456789L, false);
+
+    Files files = new Files(List.of(new FileInfo("f1.rf", 50, 50)));
+
+    // Since this tablet has already loaded the files, the locationLess counter 
should not be
+    // incremented
+    loader.load(List.of(tablet2), files);
+    assertEquals(0, loader.locationLess);
+
+    // Since this tablet has not loaded the files, the locationLess counter should 
be incremented
+    loader.load(List.of(tablet1), files);
+    assertEquals(1, loader.locationLess);
+
+    verify(tablet1, tablet2, manager);
+  }
 }

Reply via email to