This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 74b48da924 Add new blip count to GC (#2672) 74b48da924 is described below commit 74b48da924ee5c2cdd378786f24275dd77266fc5 Author: Mike Miller <mmil...@apache.org> AuthorDate: Wed May 4 16:12:13 2022 -0400 Add new blip count to GC (#2672) * Update GarbageCollectionTest to count blips * Split up confirmDeletes into 3 methods --- .../accumulo/core/gc/thrift/GcCycleStats.java | 104 ++++++++++++++++++- core/src/main/thrift/gc.thrift | 1 + .../accumulo/gc/GarbageCollectionAlgorithm.java | 114 ++++++++++++--------- .../apache/accumulo/gc/SimpleGarbageCollector.java | 7 +- .../apache/accumulo/gc/GarbageCollectionTest.java | 7 +- 5 files changed, 175 insertions(+), 58 deletions(-) diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/gc/thrift/GcCycleStats.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/gc/thrift/GcCycleStats.java index c28b60c15a..6aa14a0bfc 100644 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/gc/thrift/GcCycleStats.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/gc/thrift/GcCycleStats.java @@ -34,6 +34,7 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc private static final org.apache.thrift.protocol.TField IN_USE_FIELD_DESC = new org.apache.thrift.protocol.TField("inUse", org.apache.thrift.protocol.TType.I64, (short)4); private static final org.apache.thrift.protocol.TField DELETED_FIELD_DESC = new org.apache.thrift.protocol.TField("deleted", org.apache.thrift.protocol.TType.I64, (short)5); private static final org.apache.thrift.protocol.TField ERRORS_FIELD_DESC = new org.apache.thrift.protocol.TField("errors", org.apache.thrift.protocol.TType.I64, (short)6); + private static final org.apache.thrift.protocol.TField BULKS_FIELD_DESC = new org.apache.thrift.protocol.TField("bulks", org.apache.thrift.protocol.TType.I64, (short)7); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new GcCycleStatsStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new GcCycleStatsTupleSchemeFactory(); @@ -44,6 +45,7 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc public long inUse; // required public long deleted; // required public long errors; // required + public long bulks; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -52,7 +54,8 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc CANDIDATES((short)3, "candidates"), IN_USE((short)4, "inUse"), DELETED((short)5, "deleted"), - ERRORS((short)6, "errors"); + ERRORS((short)6, "errors"), + BULKS((short)7, "bulks"); private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); @@ -80,6 +83,8 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc return DELETED; case 6: // ERRORS return ERRORS; + case 7: // BULKS + return BULKS; default: return null; } @@ -127,6 +132,7 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc private static final int __INUSE_ISSET_ID = 3; private static final int __DELETED_ISSET_ID = 4; private static final int __ERRORS_ISSET_ID = 5; + private static final int __BULKS_ISSET_ID = 6; private byte __isset_bitfield = 0; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -143,6 +149,8 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); tmpMap.put(_Fields.ERRORS, new org.apache.thrift.meta_data.FieldMetaData("errors", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.BULKS, new org.apache.thrift.meta_data.FieldMetaData("bulks", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(GcCycleStats.class, metaDataMap); } @@ -156,7 +164,8 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc long candidates, long inUse, long deleted, - long errors) + long errors, + long bulks) { this(); this.started = started; @@ -171,6 +180,8 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc setDeletedIsSet(true); this.errors = errors; setErrorsIsSet(true); + this.bulks = bulks; + setBulksIsSet(true); } /** @@ -184,6 +195,7 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc this.inUse = other.inUse; this.deleted = other.deleted; this.errors = other.errors; + this.bulks = other.bulks; } public GcCycleStats deepCopy() { @@ -204,6 +216,8 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc this.deleted = 0; setErrorsIsSet(false); this.errors = 0; + setBulksIsSet(false); + this.bulks = 0; } public long getStarted() { @@ -344,6 +358,29 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __ERRORS_ISSET_ID, value); } + public long getBulks() { + return this.bulks; + } + + public GcCycleStats setBulks(long bulks) { + this.bulks = bulks; + setBulksIsSet(true); + return this; + } + + public void unsetBulks() { + __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __BULKS_ISSET_ID); + } + + /** Returns true if field bulks is set (has been assigned a value) and false otherwise */ + public boolean isSetBulks() { + return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __BULKS_ISSET_ID); + } + + public void setBulksIsSet(boolean value) { + __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __BULKS_ISSET_ID, value); + } + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { case STARTED: @@ -394,6 +431,14 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc } break; + case BULKS: + if (value == null) { + unsetBulks(); + } else { + setBulks((java.lang.Long)value); + } + break; + } } @@ -418,6 +463,9 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc case ERRORS: return getErrors(); + case BULKS: + return getBulks(); + } throw new java.lang.IllegalStateException(); } @@ -441,6 +489,8 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc return isSetDeleted(); case ERRORS: return isSetErrors(); + case BULKS: + return isSetBulks(); } throw new java.lang.IllegalStateException(); } @@ -512,6 +562,15 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc return false; } + boolean this_present_bulks = true; + boolean that_present_bulks = true; + if (this_present_bulks || that_present_bulks) { + if (!(this_present_bulks && that_present_bulks)) + return false; + if (this.bulks != that.bulks) + return false; + } + return true; } @@ -531,6 +590,8 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(errors); + hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(bulks); + return hashCode; } @@ -602,6 +663,16 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc return lastComparison; } } + lastComparison = java.lang.Boolean.compare(isSetBulks(), other.isSetBulks()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetBulks()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.bulks, other.bulks); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -646,6 +717,10 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc sb.append("errors:"); sb.append(this.errors); first = false; + if (!first) sb.append(", "); + sb.append("bulks:"); + sb.append(this.bulks); + first = false; sb.append(")"); return sb.toString(); } @@ -739,6 +814,14 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 7: // BULKS + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.bulks = iprot.readI64(); + struct.setBulksIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -772,6 +855,9 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc oprot.writeFieldBegin(ERRORS_FIELD_DESC); oprot.writeI64(struct.errors); oprot.writeFieldEnd(); + oprot.writeFieldBegin(BULKS_FIELD_DESC); + oprot.writeI64(struct.bulks); + oprot.writeFieldEnd(); oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -808,7 +894,10 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc if (struct.isSetErrors()) { optionals.set(5); } - oprot.writeBitSet(optionals, 6); + if (struct.isSetBulks()) { + optionals.set(6); + } + oprot.writeBitSet(optionals, 7); if (struct.isSetStarted()) { oprot.writeI64(struct.started); } @@ -827,12 +916,15 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc if (struct.isSetErrors()) { oprot.writeI64(struct.errors); } + if (struct.isSetBulks()) { + oprot.writeI64(struct.bulks); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, GcCycleStats struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet incoming = iprot.readBitSet(6); + java.util.BitSet incoming = iprot.readBitSet(7); if (incoming.get(0)) { struct.started = iprot.readI64(); struct.setStartedIsSet(true); @@ -857,6 +949,10 @@ public class GcCycleStats implements org.apache.thrift.TBase<GcCycleStats, GcCyc struct.errors = iprot.readI64(); struct.setErrorsIsSet(true); } + if (incoming.get(6)) { + struct.bulks = iprot.readI64(); + struct.setBulksIsSet(true); + } } } diff --git a/core/src/main/thrift/gc.thrift b/core/src/main/thrift/gc.thrift index 7e7f7409d5..b1cb3aa7d7 100644 --- a/core/src/main/thrift/gc.thrift +++ b/core/src/main/thrift/gc.thrift @@ -30,6 +30,7 @@ struct GcCycleStats { 4:i64 inUse 5:i64 deleted 6:i64 errors + 7:i64 bulks } struct GCStatus { diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java index 64a2b39855..67e9a9df44 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java @@ -127,45 +127,11 @@ public class GarbageCollectionAlgorithm { return ret; } + /** + * Return the number of BLIP flags seen. + */ private void confirmDeletes(GarbageCollectionEnvironment gce, - SortedMap<String,String> candidateMap) throws TableNotFoundException { - boolean checkForBulkProcessingFiles = false; - Iterator<String> relativePaths = candidateMap.keySet().iterator(); - while (!checkForBulkProcessingFiles && relativePaths.hasNext()) - checkForBulkProcessingFiles |= - relativePaths.next().toLowerCase(Locale.ENGLISH).contains(Constants.BULK_PREFIX); - - if (checkForBulkProcessingFiles) { - try (Stream<String> blipStream = gce.getBlipPaths()) { - Iterator<String> blipiter = blipStream.iterator(); - - // WARNING: This block is IMPORTANT - // You MUST REMOVE candidates that are in the same folder as a bulk - // processing flag! - - while (blipiter.hasNext()) { - String blipPath = blipiter.next(); - blipPath = makeRelative(blipPath, 2); - - Iterator<String> tailIter = candidateMap.tailMap(blipPath).keySet().iterator(); - - int count = 0; - - while (tailIter.hasNext()) { - if (tailIter.next().startsWith(blipPath)) { - count++; - tailIter.remove(); - } else { - break; - } - } - - if (count > 0) - log.debug("Folder has bulk processing flag: {}", blipPath); - } - } - } - + SortedMap<String,String> candidateMap) { Iterator<Reference> iter = gce.getReferences().iterator(); while (iter.hasNext()) { Reference ref = iter.next(); @@ -200,17 +166,61 @@ public class GarbageCollectionAlgorithm { String dir = reference.substring(0, reference.lastIndexOf('/')); if (candidateMap.remove(dir) != null) log.debug("Candidate was still in use: {}", reference); + } + } + } + + private long removeBlipCandidates(GarbageCollectionEnvironment gce, + SortedMap<String,String> candidateMap) throws TableNotFoundException { + boolean checkForBulkProcessingFiles = false; + long blipCount = 0; + Iterator<String> relativePaths = candidateMap.keySet().iterator(); + + while (!checkForBulkProcessingFiles && relativePaths.hasNext()) + checkForBulkProcessingFiles |= + relativePaths.next().toLowerCase(Locale.ENGLISH).contains(Constants.BULK_PREFIX); + + if (checkForBulkProcessingFiles) { + try (Stream<String> blipStream = gce.getBlipPaths()) { + Iterator<String> blipiter = blipStream.iterator(); + + // WARNING: This block is IMPORTANT + // You MUST REMOVE candidates that are in the same folder as a bulk + // processing flag! + + while (blipiter.hasNext()) { + String blipPath = blipiter.next(); + blipPath = makeRelative(blipPath, 2); + + Iterator<String> tailIter = candidateMap.tailMap(blipPath).keySet().iterator(); + + int count = 0; + + while (tailIter.hasNext()) { + if (tailIter.next().startsWith(blipPath)) { + count++; + tailIter.remove(); + } else { + break; + } + } + if (count > 0) { + log.debug("Folder has bulk processing flag: {}", blipPath); + blipCount++; + } + } } } - confirmDeletesFromReplication(gce.getReplicationNeededIterator(), - candidateMap.entrySet().iterator()); + return blipCount; } - protected void confirmDeletesFromReplication( - Iterator<Entry<String,Status>> replicationNeededIterator, - Iterator<Entry<String,String>> candidateMapIterator) { + protected void confirmDeletesFromReplication(GarbageCollectionEnvironment gce, + SortedMap<String,String> candidateMap) { + var replicationNeededIterator = gce.getReplicationNeededIterator(); + var candidateMapIterator = candidateMap.entrySet().iterator(); + PeekingIterator<Entry<String,Status>> pendingReplication = Iterators.peekingIterator(replicationNeededIterator); PeekingIterator<Entry<String,String>> candidates = @@ -269,17 +279,21 @@ public class GarbageCollectionAlgorithm { } } - private void confirmDeletesTrace(GarbageCollectionEnvironment gce, + private long confirmDeletesTrace(GarbageCollectionEnvironment gce, SortedMap<String,String> candidateMap) throws TableNotFoundException { + long blips = 0; Span confirmDeletesSpan = TraceUtil.startSpan(this.getClass(), "confirmDeletes"); try (Scope scope = confirmDeletesSpan.makeCurrent()) { + blips = removeBlipCandidates(gce, candidateMap); confirmDeletes(gce, candidateMap); + confirmDeletesFromReplication(gce, candidateMap); } catch (Exception e) { TraceUtil.setException(confirmDeletesSpan, e, true); throw e; } finally { confirmDeletesSpan.end(); } + return blips; } private void deleteConfirmed(GarbageCollectionEnvironment gce, @@ -297,9 +311,10 @@ public class GarbageCollectionAlgorithm { cleanUpDeletedTableDirs(gce, candidateMap); } - public void collect(GarbageCollectionEnvironment gce) throws TableNotFoundException, IOException { + public long collect(GarbageCollectionEnvironment gce) throws TableNotFoundException, IOException { Iterator<String> candidatesIter = gce.getCandidates(); + long totalBlips = 0; while (candidatesIter.hasNext()) { List<String> batchOfCandidates; @@ -312,14 +327,15 @@ public class GarbageCollectionAlgorithm { } finally { candidatesSpan.end(); } - deleteBatch(gce, batchOfCandidates); + totalBlips += deleteBatch(gce, batchOfCandidates); } + return totalBlips; } /** * Given a sub-list of possible deletion candidates, process and remove valid deletion candidates. */ - private void deleteBatch(GarbageCollectionEnvironment gce, List<String> currentBatch) + private long deleteBatch(GarbageCollectionEnvironment gce, List<String> currentBatch) throws TableNotFoundException, IOException { long origSize = currentBatch.size(); @@ -327,10 +343,12 @@ public class GarbageCollectionAlgorithm { SortedMap<String,String> candidateMap = makeRelative(currentBatch); - confirmDeletesTrace(gce, candidateMap); + long blips = confirmDeletesTrace(gce, candidateMap); gce.incrementInUseStat(origSize - candidateMap.size()); deleteConfirmed(gce, candidateMap); + + return blips; } } diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 0759961d08..caaf467b77 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -207,17 +207,17 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { var userGC = new GCRun(DataLevel.USER, getContext()); log.info("Starting Root table Garbage Collection."); - new GarbageCollectionAlgorithm().collect(rootGC); + status.current.bulks += new GarbageCollectionAlgorithm().collect(rootGC); incrementStatsForRun(rootGC); logStats(); log.info("Starting Metadata table Garbage Collection."); - new GarbageCollectionAlgorithm().collect(mdGC); + status.current.bulks += new GarbageCollectionAlgorithm().collect(mdGC); incrementStatsForRun(mdGC); logStats(); log.info("Starting User table Garbage Collection."); - new GarbageCollectionAlgorithm().collect(userGC); + status.current.bulks += new GarbageCollectionAlgorithm().collect(userGC); incrementStatsForRun(userGC); logStats(); @@ -338,6 +338,7 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { log.info("Number of data file candidates still in use: {}", status.current.inUse); log.info("Number of successfully deleted data files: {}", status.current.deleted); log.info("Number of data files delete failures: {}", status.current.errors); + log.info("Number of bulk imports in progress: {}", status.current.bulks); } /** diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java index 6da3fca431..f67e43c97f 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java @@ -622,15 +622,16 @@ public class GarbageCollectionTest { gce.candidates.clear(); gce.candidates.add("/9/default_tablet"); gce.candidates.add("/9/default_tablet/someFile"); - gca.collect(gce); + long blipCount = gca.collect(gce); assertRemoved(gce); + assertEquals(0, blipCount); gce = new TestGCE(); gce.blips.add("/1636/b-0001"); gce.candidates.add("/1636/b-0001/I0000"); - gca.collect(gce); + blipCount = gca.collect(gce); assertRemoved(gce); - + assertEquals(1, blipCount); } @Test