This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 30608d5399 avoids unneeded waiting on tablet w/o location in bulk load (#5471) 30608d5399 is described below commit 30608d5399e8a30cc61e73768b29967902af3747 Author: Keith Turner <ktur...@apache.org> AuthorDate: Mon Apr 14 15:58:35 2025 -0400 avoids unneeded waiting on tablet w/o location in bulk load (#5471) fixes #5469 --- .../manager/tableOps/bulkVer2/LoadFiles.java | 30 +++++----- .../manager/tableOps/bulkVer2/LoadFilesTest.java | 65 +++++++++++++++++++++- 2 files changed, 80 insertions(+), 15 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index 603de073cc..4f1d8ecc66 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -160,7 +160,7 @@ class LoadFiles extends ManagerRepo { abstract long finish() throws Exception; } - private static class OnlineLoader extends Loader { + static class OnlineLoader extends Loader { private final int maxConnections; long timeInMillis; @@ -302,7 +302,7 @@ class LoadFiles extends ManagerRepo { } } - private void addToQueue(HostAndPort server, KeyExtent extent, + protected void addToQueue(HostAndPort server, KeyExtent extent, Map<String,MapFileInfo> thriftImports) { if (!thriftImports.isEmpty()) { tabletsAdded++; @@ -325,29 +325,33 @@ class LoadFiles extends ManagerRepo { // send files to tablet sever // ideally there should only be one tablet location to send all the files - Location location = tablet.getLocation(); - HostAndPort server = null; - if (location == null) { - locationLess++; - continue; - } else { - server = location.getHostAndPort(); - } - Set<TabletFile> loadedFiles = tablet.getLoaded().keySet(); + Location location = tablet.getLocation(); +
Map<String,MapFileInfo> thriftImports = new HashMap<>(); + boolean needToLoad = false; for (final Bulk.FileInfo fileInfo : files) { Path fullPath = new Path(bulkDir, fileInfo.getFileName()); TabletFile bulkFile = new TabletFile(fullPath); if (!loadedFiles.contains(bulkFile)) { - thriftImports.put(fileInfo.getFileName(), new MapFileInfo(fileInfo.getEstFileSize())); + if (location == null) { + needToLoad = true; + break; + } else { + thriftImports.put(fileInfo.getFileName(), new MapFileInfo(fileInfo.getEstFileSize())); + } } } - addToQueue(server, tablet.getExtent(), thriftImports); + if (location != null) { + addToQueue(location.getHostAndPort(), tablet.getExtent(), thriftImports); + } else if (needToLoad) { + // tablet has no location and files need to be loaded so need to wait tablet + locationLess++; + } // else tablet has no location but all files are already loaded for it so nothing to do } sendQueued(4 * 1024 * 1024); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java index 31d8cfd828..f012801909 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFilesTest.java @@ -19,9 +19,15 @@ package org.apache.accumulo.manager.tableOps.bulkVer2; import static org.apache.accumulo.manager.tableOps.bulkVer2.PrepBulkImportTest.nke; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.strictMock; +import static org.easymock.EasyMock.verify; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static 
org.junit.jupiter.api.Assertions.fail; import java.util.ArrayList; import java.util.HashMap; @@ -32,11 +38,15 @@ import java.util.Map; import org.apache.accumulo.core.clientImpl.bulk.Bulk.FileInfo; import org.apache.accumulo.core.clientImpl.bulk.Bulk.Files; import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator; +import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo; import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.bulkVer2.LoadFiles.ImportTimingStats; import org.apache.accumulo.manager.tableOps.bulkVer2.LoadFiles.TabletsMetadataFactory; @@ -160,10 +170,10 @@ public class LoadFilesTest { Manager manager = EasyMock.createMock(Manager.class); Path bulkDir = EasyMock.createMock(Path.class); - EasyMock.replay(manager, bulkDir); + replay(manager, bulkDir); LoadFiles.loadFiles(cl, info, bulkDir, lmi, tmf, manager, txid, 0); - EasyMock.verify(manager, bulkDir); + verify(manager, bulkDir); List<CaptureLoader.LoadResult> results = cl.getLoadResults(); assertEquals(loadRanges.size(), results.size()); @@ -301,4 +311,55 @@ public class LoadFilesTest { assertTrue(extents.contains(nke("z", null))); } + + /** + * Test tablets without locations that have loaded files and do not have loaded files. 
+ * + */ + @Test + public void testLoadLocation() throws Exception { + + var loader = new LoadFiles.OnlineLoader(DefaultConfiguration.getInstance()) { + @Override + protected void addToQueue(HostAndPort server, KeyExtent extent, + Map<String,MapFileInfo> thriftImports) { + fail(); + } + }; + + Path bulkDir = new Path("file:/accumulo/tables/1/b-00001"); + + Path fullPath = new Path(bulkDir, "f1.rf"); + TabletFile loaded1 = new TabletFile(fullPath); + + // Tablet with no location and no loaded files + TabletMetadata tablet1 = createMock(TabletMetadata.class); + expect(tablet1.getLocation()).andReturn(null).once(); + expect(tablet1.getLoaded()).andReturn(Map.of()).once(); + + // Tablet with no location and loaded files + TabletMetadata tablet2 = createMock(TabletMetadata.class); + expect(tablet2.getLocation()).andReturn(null).once(); + expect(tablet2.getLoaded()).andReturn(Map.of(loaded1, 123456789L)).once(); + + Manager manager = strictMock(Manager.class); + expect(manager.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).once(); + + replay(tablet1, tablet2, manager); + + loader.start(bulkDir, manager, 123456789L, false); + + Files files = new Files(List.of(new FileInfo("f1.rf", 50, 50))); + + // Since this tablet already has loaded the files the locationLess counter should not be + // incremented + loader.load(List.of(tablet2), files); + assertEquals(0, loader.locationLess); + + // Since this tablet has not loaded files the locationLess counter should be incremented + loader.load(List.of(tablet1), files); + assertEquals(1, loader.locationLess); + + verify(tablet1, tablet2, manager); + } }