This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 30608d5399 avoids unneeded waiting on tablet w/o location in bulk load
(#5471)
30608d5399 is described below
commit 30608d5399e8a30cc61e73768b29967902af3747
Author: Keith Turner <[email protected]>
AuthorDate: Mon Apr 14 15:58:35 2025 -0400
avoids unneeded waiting on tablet w/o location in bulk load (#5471)
fixes #5469
---
.../manager/tableOps/bulkVer2/LoadFiles.java | 30 +++++-----
.../manager/tableOps/bulkVer2/LoadFilesTest.java | 65 +++++++++++++++++++++-
2 files changed, 80 insertions(+), 15 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index 603de073cc..4f1d8ecc66 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -160,7 +160,7 @@ class LoadFiles extends ManagerRepo {
abstract long finish() throws Exception;
}
- private static class OnlineLoader extends Loader {
+ static class OnlineLoader extends Loader {
private final int maxConnections;
long timeInMillis;
@@ -302,7 +302,7 @@ class LoadFiles extends ManagerRepo {
}
}
- private void addToQueue(HostAndPort server, KeyExtent extent,
+ protected void addToQueue(HostAndPort server, KeyExtent extent,
Map<String,MapFileInfo> thriftImports) {
if (!thriftImports.isEmpty()) {
tabletsAdded++;
@@ -325,29 +325,33 @@ class LoadFiles extends ManagerRepo {
// send files to tablet sever
// ideally there should only be one tablet location to send all the
files
- Location location = tablet.getLocation();
- HostAndPort server = null;
- if (location == null) {
- locationLess++;
- continue;
- } else {
- server = location.getHostAndPort();
- }
-
Set<TabletFile> loadedFiles = tablet.getLoaded().keySet();
+ Location location = tablet.getLocation();
+
Map<String,MapFileInfo> thriftImports = new HashMap<>();
+ boolean needToLoad = false;
for (final Bulk.FileInfo fileInfo : files) {
Path fullPath = new Path(bulkDir, fileInfo.getFileName());
TabletFile bulkFile = new TabletFile(fullPath);
if (!loadedFiles.contains(bulkFile)) {
- thriftImports.put(fileInfo.getFileName(), new
MapFileInfo(fileInfo.getEstFileSize()));
+ if (location == null) {
+ needToLoad = true;
+ break;
+ } else {
+ thriftImports.put(fileInfo.getFileName(), new
MapFileInfo(fileInfo.getEstFileSize()));
+ }
}
}
- addToQueue(server, tablet.getExtent(), thriftImports);
+ if (location != null) {
+ addToQueue(location.getHostAndPort(), tablet.getExtent(),
thriftImports);
+ } else if (needToLoad) {
+ // tablet has no location and files need to be loaded, so need to
wait for the tablet
+ locationLess++;
+ } // else tablet has no location but all files are already loaded for
it so nothing to do
}
sendQueued(4 * 1024 * 1024);
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
index 31d8cfd828..f012801909 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java
@@ -19,9 +19,15 @@
package org.apache.accumulo.manager.tableOps.bulkVer2;
import static
org.apache.accumulo.manager.tableOps.bulkVer2.PrepBulkImportTest.nke;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.strictMock;
+import static org.easymock.EasyMock.verify;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import java.util.ArrayList;
import java.util.HashMap;
@@ -32,11 +38,15 @@ import java.util.Map;
import org.apache.accumulo.core.clientImpl.bulk.Bulk.FileInfo;
import org.apache.accumulo.core.clientImpl.bulk.Bulk.Files;
import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.manager.Manager;
import
org.apache.accumulo.manager.tableOps.bulkVer2.LoadFiles.ImportTimingStats;
import
org.apache.accumulo.manager.tableOps.bulkVer2.LoadFiles.TabletsMetadataFactory;
@@ -160,10 +170,10 @@ public class LoadFilesTest {
Manager manager = EasyMock.createMock(Manager.class);
Path bulkDir = EasyMock.createMock(Path.class);
- EasyMock.replay(manager, bulkDir);
+ replay(manager, bulkDir);
LoadFiles.loadFiles(cl, info, bulkDir, lmi, tmf, manager, txid, 0);
- EasyMock.verify(manager, bulkDir);
+ verify(manager, bulkDir);
List<CaptureLoader.LoadResult> results = cl.getLoadResults();
assertEquals(loadRanges.size(), results.size());
@@ -301,4 +311,55 @@ public class LoadFilesTest {
assertTrue(extents.contains(nke("z", null)));
}
+
+ /**
+ * Tests tablets without locations, both those that have already loaded the
files and those that have not.
+ *
+ */
+ @Test
+ public void testLoadLocation() throws Exception {
+
+ var loader = new
LoadFiles.OnlineLoader(DefaultConfiguration.getInstance()) {
+ @Override
+ protected void addToQueue(HostAndPort server, KeyExtent extent,
+ Map<String,MapFileInfo> thriftImports) {
+ fail();
+ }
+ };
+
+ Path bulkDir = new Path("file:/accumulo/tables/1/b-00001");
+
+ Path fullPath = new Path(bulkDir, "f1.rf");
+ TabletFile loaded1 = new TabletFile(fullPath);
+
+ // Tablet with no location and no loaded files
+ TabletMetadata tablet1 = createMock(TabletMetadata.class);
+ expect(tablet1.getLocation()).andReturn(null).once();
+ expect(tablet1.getLoaded()).andReturn(Map.of()).once();
+
+ // Tablet with no location and loaded files
+ TabletMetadata tablet2 = createMock(TabletMetadata.class);
+ expect(tablet2.getLocation()).andReturn(null).once();
+ expect(tablet2.getLoaded()).andReturn(Map.of(loaded1, 123456789L)).once();
+
+ Manager manager = strictMock(Manager.class);
+
expect(manager.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).once();
+
+ replay(tablet1, tablet2, manager);
+
+ loader.start(bulkDir, manager, 123456789L, false);
+
+ Files files = new Files(List.of(new FileInfo("f1.rf", 50, 50)));
+
+ // Since this tablet already has loaded the files the locationLess counter
should not be
+ // incremented
+ loader.load(List.of(tablet2), files);
+ assertEquals(0, loader.locationLess);
+
+ // Since this tablet has not loaded files the locationLess counter should
be incremented
+ loader.load(List.of(tablet1), files);
+ assertEquals(1, loader.locationLess);
+
+ verify(tablet1, tablet2, manager);
+ }
}