This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 837ae7e Addressed TODOs and CBUGs 837ae7e is described below commit 837ae7edc885cce0b35d3e2ba8a0c3e246914545 Author: Keith Turner <ktur...@apache.org> AuthorDate: Mon May 10 17:29:35 2021 -0400 Addressed TODOs and CBUGs --- .../core/spi/compaction/CompactionJob.java | 1 - .../thrift/TExternalCompactionJob.java | 608 ++------------------- core/src/main/thrift/tabletserver.thrift | 18 +- .../accumulo/compactor/CompactionEnvironment.java | 23 +- .../org/apache/accumulo/compactor/Compactor.java | 33 +- .../tserver/compactions/CompactionService.java | 3 - .../compactions/ExternalCompactionExecutor.java | 9 +- .../tserver/compactions/ExternalCompactionJob.java | 50 +- .../accumulo/tserver/tablet/CompactableImpl.java | 12 +- 9 files changed, 90 insertions(+), 667 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionJob.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionJob.java index 8809018..d651a70 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionJob.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionJob.java @@ -30,7 +30,6 @@ import org.apache.accumulo.core.client.admin.compaction.CompactableFile; */ public interface CompactionJob { - // CBUG use a lower cardinality type for priority long getPriority(); /** diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java index 7c13248..5992a55 100644 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java @@ -31,17 +31,12 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal private static final org.apache.thrift.protocol.TField EXTERNAL_COMPACTION_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("externalCompactionId", org.apache.thrift.protocol.TType.STRING, (short)1); private static final org.apache.thrift.protocol.TField EXTENT_FIELD_DESC = new org.apache.thrift.protocol.TField("extent", org.apache.thrift.protocol.TType.STRUCT, (short)2); private static final org.apache.thrift.protocol.TField FILES_FIELD_DESC = new org.apache.thrift.protocol.TField("files", org.apache.thrift.protocol.TType.LIST, (short)3); - private static final org.apache.thrift.protocol.TField PRIORITY_FIELD_DESC = new org.apache.thrift.protocol.TField("priority", org.apache.thrift.protocol.TType.I32, (short)4); - private static final org.apache.thrift.protocol.TField READ_RATE_FIELD_DESC = new org.apache.thrift.protocol.TField("readRate", org.apache.thrift.protocol.TType.I32, (short)5); - private static final org.apache.thrift.protocol.TField WRITE_RATE_FIELD_DESC = new org.apache.thrift.protocol.TField("writeRate", org.apache.thrift.protocol.TType.I32, (short)6); - private static final org.apache.thrift.protocol.TField ITERATOR_SETTINGS_FIELD_DESC = new org.apache.thrift.protocol.TField("iteratorSettings", org.apache.thrift.protocol.TType.STRUCT, (short)7); - private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)8); - private static final org.apache.thrift.protocol.TField REASON_FIELD_DESC = new org.apache.thrift.protocol.TField("reason", org.apache.thrift.protocol.TType.I32, (short)9); - private static final org.apache.thrift.protocol.TField OUTPUT_FILE_FIELD_DESC = new org.apache.thrift.protocol.TField("outputFile", org.apache.thrift.protocol.TType.STRING, (short)10); - private static final org.apache.thrift.protocol.TField PROPAGATE_DELETES_FIELD_DESC = new org.apache.thrift.protocol.TField("propagateDeletes", org.apache.thrift.protocol.TType.BOOL, (short)11); - private static final org.apache.thrift.protocol.TField KIND_FIELD_DESC = new org.apache.thrift.protocol.TField("kind", org.apache.thrift.protocol.TType.I32, (short)12); - private static final org.apache.thrift.protocol.TField USER_COMPACTION_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("userCompactionId", org.apache.thrift.protocol.TType.I64, (short)13); - private static final org.apache.thrift.protocol.TField TABLE_COMPACTION_PROPERTIES_FIELD_DESC = new org.apache.thrift.protocol.TField("tableCompactionProperties", org.apache.thrift.protocol.TType.MAP, (short)14); + private static final org.apache.thrift.protocol.TField ITERATOR_SETTINGS_FIELD_DESC = new org.apache.thrift.protocol.TField("iteratorSettings", org.apache.thrift.protocol.TType.STRUCT, (short)4); + private static final org.apache.thrift.protocol.TField OUTPUT_FILE_FIELD_DESC = new org.apache.thrift.protocol.TField("outputFile", org.apache.thrift.protocol.TType.STRING, (short)5); + private static final org.apache.thrift.protocol.TField PROPAGATE_DELETES_FIELD_DESC = new org.apache.thrift.protocol.TField("propagateDeletes", org.apache.thrift.protocol.TType.BOOL, (short)6); + private static final org.apache.thrift.protocol.TField KIND_FIELD_DESC = new org.apache.thrift.protocol.TField("kind", org.apache.thrift.protocol.TType.I32, (short)7); + private static final org.apache.thrift.protocol.TField USER_COMPACTION_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("userCompactionId", org.apache.thrift.protocol.TType.I64, (short)8); + private static final org.apache.thrift.protocol.TField TABLE_COMPACTION_PROPERTIES_FIELD_DESC = new org.apache.thrift.protocol.TField("tableCompactionProperties", org.apache.thrift.protocol.TType.MAP, (short)9); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new TExternalCompactionJobStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new TExternalCompactionJobTupleSchemeFactory(); @@ -49,20 +44,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal public @org.apache.thrift.annotation.Nullable java.lang.String externalCompactionId; // required public @org.apache.thrift.annotation.Nullable org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent; // required public @org.apache.thrift.annotation.Nullable java.util.List<InputFile> files; // required - public int priority; // required - public int readRate; // required - public int writeRate; // required public @org.apache.thrift.annotation.Nullable IteratorConfig iteratorSettings; // required - /** - * - * @see TCompactionType - */ - public @org.apache.thrift.annotation.Nullable TCompactionType type; // required - /** - * - * @see TCompactionReason - */ - public @org.apache.thrift.annotation.Nullable TCompactionReason reason; // required public @org.apache.thrift.annotation.Nullable java.lang.String outputFile; // required public boolean propagateDeletes; // required public @org.apache.thrift.annotation.Nullable TCompactionKind kind; // required @@ -74,25 +56,12 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal EXTERNAL_COMPACTION_ID((short)1, "externalCompactionId"), EXTENT((short)2, "extent"), FILES((short)3, "files"), - PRIORITY((short)4, "priority"), - READ_RATE((short)5, "readRate"), - WRITE_RATE((short)6, "writeRate"), - ITERATOR_SETTINGS((short)7, "iteratorSettings"), - /** - * - * @see TCompactionType - */ - TYPE((short)8, "type"), - /** - * - * @see TCompactionReason - */ - REASON((short)9, "reason"), - OUTPUT_FILE((short)10, "outputFile"), - PROPAGATE_DELETES((short)11, "propagateDeletes"), - KIND((short)12, "kind"), - USER_COMPACTION_ID((short)13, "userCompactionId"), - TABLE_COMPACTION_PROPERTIES((short)14, "tableCompactionProperties"); + ITERATOR_SETTINGS((short)4, "iteratorSettings"), + OUTPUT_FILE((short)5, "outputFile"), + PROPAGATE_DELETES((short)6, "propagateDeletes"), + KIND((short)7, "kind"), + USER_COMPACTION_ID((short)8, "userCompactionId"), + TABLE_COMPACTION_PROPERTIES((short)9, "tableCompactionProperties"); private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); @@ -114,27 +83,17 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal return EXTENT; case 3: // FILES return FILES; - case 4: // PRIORITY - return PRIORITY; - case 5: // READ_RATE - return READ_RATE; - case 6: // WRITE_RATE - return WRITE_RATE; - case 7: // ITERATOR_SETTINGS + case 4: // ITERATOR_SETTINGS return ITERATOR_SETTINGS; - case 8: // TYPE - return TYPE; - case 9: // REASON - return REASON; - case 10: // OUTPUT_FILE + case 5: // OUTPUT_FILE return OUTPUT_FILE; - case 11: // PROPAGATE_DELETES + case 6: // PROPAGATE_DELETES return PROPAGATE_DELETES; - case 12: // KIND + case 7: // KIND return KIND; - case 13: // USER_COMPACTION_ID + case 8: // USER_COMPACTION_ID return USER_COMPACTION_ID; - case 14: // TABLE_COMPACTION_PROPERTIES + case 9: // TABLE_COMPACTION_PROPERTIES return TABLE_COMPACTION_PROPERTIES; default: return null; @@ -177,11 +136,8 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal } // isset id assignments - private static final int __PRIORITY_ISSET_ID = 0; - private static final int __READRATE_ISSET_ID = 1; - private static final int __WRITERATE_ISSET_ID = 2; - private static final int __PROPAGATEDELETES_ISSET_ID = 3; - private static final int __USERCOMPACTIONID_ISSET_ID = 4; + private static final int __PROPAGATEDELETES_ISSET_ID = 0; + private static final int __USERCOMPACTIONID_ISSET_ID = 1; private byte __isset_bitfield = 0; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -193,18 +149,8 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal tmpMap.put(_Fields.FILES, new org.apache.thrift.meta_data.FieldMetaData("files", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, InputFile.class)))); - tmpMap.put(_Fields.PRIORITY, new org.apache.thrift.meta_data.FieldMetaData("priority", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); - tmpMap.put(_Fields.READ_RATE, new org.apache.thrift.meta_data.FieldMetaData("readRate", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); - tmpMap.put(_Fields.WRITE_RATE, new org.apache.thrift.meta_data.FieldMetaData("writeRate", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); tmpMap.put(_Fields.ITERATOR_SETTINGS, new org.apache.thrift.meta_data.FieldMetaData("iteratorSettings", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, IteratorConfig.class))); - tmpMap.put(_Fields.TYPE, new org.apache.thrift.meta_data.FieldMetaData("type", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, TCompactionType.class))); - tmpMap.put(_Fields.REASON, new org.apache.thrift.meta_data.FieldMetaData("reason", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, TCompactionReason.class))); tmpMap.put(_Fields.OUTPUT_FILE, new org.apache.thrift.meta_data.FieldMetaData("outputFile", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.PROPAGATE_DELETES, new org.apache.thrift.meta_data.FieldMetaData("propagateDeletes", org.apache.thrift.TFieldRequirementType.DEFAULT, @@ -228,12 +174,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal java.lang.String externalCompactionId, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, java.util.List<InputFile> files, - int priority, - int readRate, - int writeRate, IteratorConfig iteratorSettings, - TCompactionType type, - TCompactionReason reason, java.lang.String outputFile, boolean propagateDeletes, TCompactionKind kind, @@ -244,15 +185,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal this.externalCompactionId = externalCompactionId; this.extent = extent; this.files = files; - this.priority = priority; - setPriorityIsSet(true); - this.readRate = readRate; - setReadRateIsSet(true); - this.writeRate = writeRate; - setWriteRateIsSet(true); this.iteratorSettings = iteratorSettings; - this.type = type; - this.reason = reason; this.outputFile = outputFile; this.propagateDeletes = propagateDeletes; setPropagateDeletesIsSet(true); @@ -280,18 +213,9 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal } this.files = __this__files; } - this.priority = other.priority; - this.readRate = other.readRate; - this.writeRate = other.writeRate; if (other.isSetIteratorSettings()) { this.iteratorSettings = new IteratorConfig(other.iteratorSettings); } - if (other.isSetType()) { - this.type = other.type; - } - if (other.isSetReason()) { - this.reason = other.reason; - } if (other.isSetOutputFile()) { this.outputFile = other.outputFile; } @@ -315,15 +239,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal this.externalCompactionId = null; this.extent = null; this.files = null; - setPriorityIsSet(false); - this.priority = 0; - setReadRateIsSet(false); - this.readRate = 0; - setWriteRateIsSet(false); - this.writeRate = 0; this.iteratorSettings = null; - this.type = null; - this.reason = null; this.outputFile = null; setPropagateDeletesIsSet(false); this.propagateDeletes = false; @@ -424,75 +340,6 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal } } - public int getPriority() { - return this.priority; - } - - public TExternalCompactionJob setPriority(int priority) { - this.priority = priority; - setPriorityIsSet(true); - return this; - } - - public void unsetPriority() { - __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __PRIORITY_ISSET_ID); - } - - /** Returns true if field priority is set (has been assigned a value) and false otherwise */ - public boolean isSetPriority() { - return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __PRIORITY_ISSET_ID); - } - - public void setPriorityIsSet(boolean value) { - __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __PRIORITY_ISSET_ID, value); - } - - public int getReadRate() { - return this.readRate; - } - - public TExternalCompactionJob setReadRate(int readRate) { - this.readRate = readRate; - setReadRateIsSet(true); - return this; - } - - public void unsetReadRate() { - __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __READRATE_ISSET_ID); - } - - /** Returns true if field readRate is set (has been assigned a value) and false otherwise */ - public boolean isSetReadRate() { - return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __READRATE_ISSET_ID); - } - - public void setReadRateIsSet(boolean value) { - __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __READRATE_ISSET_ID, value); - } - - public int getWriteRate() { - return this.writeRate; - } - - public TExternalCompactionJob setWriteRate(int writeRate) { - this.writeRate = writeRate; - setWriteRateIsSet(true); - return this; - } - - public void unsetWriteRate() { - __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __WRITERATE_ISSET_ID); - } - - /** Returns true if field writeRate is set (has been assigned a value) and false otherwise */ - public boolean isSetWriteRate() { - return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __WRITERATE_ISSET_ID); - } - - public void setWriteRateIsSet(boolean value) { - __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __WRITERATE_ISSET_ID, value); - } - @org.apache.thrift.annotation.Nullable public IteratorConfig getIteratorSettings() { return this.iteratorSettings; @@ -518,72 +365,6 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal } } - /** - * - * @see TCompactionType - */ - @org.apache.thrift.annotation.Nullable - public TCompactionType getType() { - return this.type; - } - - /** - * - * @see TCompactionType - */ - public TExternalCompactionJob setType(@org.apache.thrift.annotation.Nullable TCompactionType type) { - this.type = type; - return this; - } - - public void unsetType() { - this.type = null; - } - - /** Returns true if field type is set (has been assigned a value) and false otherwise */ - public boolean isSetType() { - return this.type != null; - } - - public void setTypeIsSet(boolean value) { - if (!value) { - this.type = null; - } - } - - /** - * - * @see TCompactionReason - */ - @org.apache.thrift.annotation.Nullable - public TCompactionReason getReason() { - return this.reason; - } - - /** - * - * @see TCompactionReason - */ - public TExternalCompactionJob setReason(@org.apache.thrift.annotation.Nullable TCompactionReason reason) { - this.reason = reason; - return this; - } - - public void unsetReason() { - this.reason = null; - } - - /** Returns true if field reason is set (has been assigned a value) and false otherwise */ - public boolean isSetReason() { - return this.reason != null; - } - - public void setReasonIsSet(boolean value) { - if (!value) { - this.reason = null; - } - } - @org.apache.thrift.annotation.Nullable public java.lang.String getOutputFile() { return this.outputFile; @@ -742,30 +523,6 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal } break; - case PRIORITY: - if (value == null) { - unsetPriority(); - } else { - setPriority((java.lang.Integer)value); - } - break; - - case READ_RATE: - if (value == null) { - unsetReadRate(); - } else { - setReadRate((java.lang.Integer)value); - } - break; - - case WRITE_RATE: - if (value == null) { - unsetWriteRate(); - } else { - setWriteRate((java.lang.Integer)value); - } - break; - case ITERATOR_SETTINGS: if (value == null) { unsetIteratorSettings(); @@ -774,22 +531,6 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal } break; - case TYPE: - if (value == null) { - unsetType(); - } else { - setType((TCompactionType)value); - } - break; - - case REASON: - if (value == null) { - unsetReason(); - } else { - setReason((TCompactionReason)value); - } - break; - case OUTPUT_FILE: if (value == null) { unsetOutputFile(); @@ -845,24 +586,9 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal case FILES: return getFiles(); - case PRIORITY: - return getPriority(); - - case READ_RATE: - return getReadRate(); - - case WRITE_RATE: - return getWriteRate(); - case ITERATOR_SETTINGS: return getIteratorSettings(); - case TYPE: - return getType(); - - case REASON: - return getReason(); - case OUTPUT_FILE: return getOutputFile(); @@ -895,18 +621,8 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal return isSetExtent(); case FILES: return isSetFiles(); - case PRIORITY: - return isSetPriority(); - case READ_RATE: - return isSetReadRate(); - case WRITE_RATE: - return isSetWriteRate(); case ITERATOR_SETTINGS: return isSetIteratorSettings(); - case TYPE: - return isSetType(); - case REASON: - return isSetReason(); case OUTPUT_FILE: return isSetOutputFile(); case PROPAGATE_DELETES: @@ -963,33 +679,6 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal return false; } - boolean this_present_priority = true; - boolean that_present_priority = true; - if (this_present_priority || that_present_priority) { - if (!(this_present_priority && that_present_priority)) - return false; - if (this.priority != that.priority) - return false; - } - - boolean this_present_readRate = true; - boolean that_present_readRate = true; - if (this_present_readRate || that_present_readRate) { - if (!(this_present_readRate && that_present_readRate)) - return false; - if (this.readRate != that.readRate) - return false; - } - - boolean this_present_writeRate = true; - boolean that_present_writeRate = true; - if (this_present_writeRate || that_present_writeRate) { - if (!(this_present_writeRate && that_present_writeRate)) - return false; - if (this.writeRate != that.writeRate) - return false; - } - boolean this_present_iteratorSettings = true && this.isSetIteratorSettings(); boolean that_present_iteratorSettings = true && that.isSetIteratorSettings(); if (this_present_iteratorSettings || that_present_iteratorSettings) { @@ -999,24 +688,6 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal return false; } - boolean this_present_type = true && this.isSetType(); - boolean that_present_type = true && that.isSetType(); - if (this_present_type || that_present_type) { - if (!(this_present_type && that_present_type)) - return false; - if (!this.type.equals(that.type)) - return false; - } - - boolean this_present_reason = true && this.isSetReason(); - boolean that_present_reason = true && that.isSetReason(); - if (this_present_reason || that_present_reason) { - if (!(this_present_reason && that_present_reason)) - return false; - if (!this.reason.equals(that.reason)) - return false; - } - boolean this_present_outputFile = true && this.isSetOutputFile(); boolean that_present_outputFile = true && that.isSetOutputFile(); if (this_present_outputFile || that_present_outputFile) { @@ -1081,24 +752,10 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal if (isSetFiles()) hashCode = hashCode * 8191 + files.hashCode(); - hashCode = hashCode * 8191 + priority; - - hashCode = hashCode * 8191 + readRate; - - hashCode = hashCode * 8191 + writeRate; - hashCode = hashCode * 8191 + ((isSetIteratorSettings()) ? 131071 : 524287); if (isSetIteratorSettings()) hashCode = hashCode * 8191 + iteratorSettings.hashCode(); - hashCode = hashCode * 8191 + ((isSetType()) ? 131071 : 524287); - if (isSetType()) - hashCode = hashCode * 8191 + type.getValue(); - - hashCode = hashCode * 8191 + ((isSetReason()) ? 131071 : 524287); - if (isSetReason()) - hashCode = hashCode * 8191 + reason.getValue(); - hashCode = hashCode * 8191 + ((isSetOutputFile()) ? 131071 : 524287); if (isSetOutputFile()) hashCode = hashCode * 8191 + outputFile.hashCode(); @@ -1156,36 +813,6 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal return lastComparison; } } - lastComparison = java.lang.Boolean.valueOf(isSetPriority()).compareTo(other.isSetPriority()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetPriority()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.priority, other.priority); - if (lastComparison != 0) { - return lastComparison; - } - } - lastComparison = java.lang.Boolean.valueOf(isSetReadRate()).compareTo(other.isSetReadRate()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetReadRate()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.readRate, other.readRate); - if (lastComparison != 0) { - return lastComparison; - } - } - lastComparison = java.lang.Boolean.valueOf(isSetWriteRate()).compareTo(other.isSetWriteRate()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetWriteRate()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.writeRate, other.writeRate); - if (lastComparison != 0) { - return lastComparison; - } - } lastComparison = java.lang.Boolean.valueOf(isSetIteratorSettings()).compareTo(other.isSetIteratorSettings()); if (lastComparison != 0) { return lastComparison; @@ -1196,26 +823,6 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal return lastComparison; } } - lastComparison = java.lang.Boolean.valueOf(isSetType()).compareTo(other.isSetType()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetType()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.type, other.type); - if (lastComparison != 0) { - return lastComparison; - } - } - lastComparison = java.lang.Boolean.valueOf(isSetReason()).compareTo(other.isSetReason()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetReason()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.reason, other.reason); - if (lastComparison != 0) { - return lastComparison; - } - } lastComparison = java.lang.Boolean.valueOf(isSetOutputFile()).compareTo(other.isSetOutputFile()); if (lastComparison != 0) { return lastComparison; @@ -1311,18 +918,6 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal } first = false; if (!first) sb.append(", "); - sb.append("priority:"); - sb.append(this.priority); - first = false; - if (!first) sb.append(", "); - sb.append("readRate:"); - sb.append(this.readRate); - first = false; - if (!first) sb.append(", "); - sb.append("writeRate:"); - sb.append(this.writeRate); - first = false; - if (!first) sb.append(", "); sb.append("iteratorSettings:"); if (this.iteratorSettings == null) { sb.append("null"); @@ -1331,22 +926,6 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal } first = false; if (!first) sb.append(", "); - sb.append("type:"); - if (this.type == null) { - sb.append("null"); - } else { - sb.append(this.type); - } - first = false; - if (!first) sb.append(", "); - sb.append("reason:"); - if (this.reason == null) { - sb.append("null"); - } else { - sb.append(this.reason); - } - first = false; - if (!first) sb.append(", "); sb.append("outputFile:"); if (this.outputFile == null) { sb.append("null"); @@ -1465,31 +1044,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 4: // PRIORITY - if (schemeField.type == org.apache.thrift.protocol.TType.I32) { - struct.priority = iprot.readI32(); - struct.setPriorityIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - case 5: // READ_RATE - if (schemeField.type == org.apache.thrift.protocol.TType.I32) { - struct.readRate = iprot.readI32(); - struct.setReadRateIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - case 6: // WRITE_RATE - if (schemeField.type == org.apache.thrift.protocol.TType.I32) { - struct.writeRate = iprot.readI32(); - struct.setWriteRateIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - case 7: // ITERATOR_SETTINGS + case 4: // ITERATOR_SETTINGS if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { struct.iteratorSettings = new IteratorConfig(); struct.iteratorSettings.read(iprot); @@ -1498,23 +1053,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 8: // TYPE - if (schemeField.type == org.apache.thrift.protocol.TType.I32) { - struct.type = org.apache.accumulo.core.tabletserver.thrift.TCompactionType.findByValue(iprot.readI32()); - struct.setTypeIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - case 9: // REASON - if (schemeField.type == org.apache.thrift.protocol.TType.I32) { - struct.reason = org.apache.accumulo.core.tabletserver.thrift.TCompactionReason.findByValue(iprot.readI32()); - struct.setReasonIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - case 10: // OUTPUT_FILE + case 5: // OUTPUT_FILE if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { struct.outputFile = iprot.readString(); struct.setOutputFileIsSet(true); @@ -1522,7 +1061,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 11: // PROPAGATE_DELETES + case 6: // PROPAGATE_DELETES if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { struct.propagateDeletes = iprot.readBool(); struct.setPropagateDeletesIsSet(true); @@ -1530,7 +1069,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 12: // KIND + case 7: // KIND if (schemeField.type == org.apache.thrift.protocol.TType.I32) { struct.kind = org.apache.accumulo.core.tabletserver.thrift.TCompactionKind.findByValue(iprot.readI32()); struct.setKindIsSet(true); @@ -1538,7 +1077,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 13: // USER_COMPACTION_ID + case 8: // USER_COMPACTION_ID if (schemeField.type == org.apache.thrift.protocol.TType.I64) { struct.userCompactionId = iprot.readI64(); struct.setUserCompactionIdIsSet(true); @@ -1546,7 +1085,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 14: // TABLE_COMPACTION_PROPERTIES + case 9: // TABLE_COMPACTION_PROPERTIES if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { org.apache.thrift.protocol.TMap _map119 = iprot.readMapBegin(); @@ -1603,30 +1142,11 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal } oprot.writeFieldEnd(); } - oprot.writeFieldBegin(PRIORITY_FIELD_DESC); - oprot.writeI32(struct.priority); - oprot.writeFieldEnd(); - oprot.writeFieldBegin(READ_RATE_FIELD_DESC); - oprot.writeI32(struct.readRate); - oprot.writeFieldEnd(); - oprot.writeFieldBegin(WRITE_RATE_FIELD_DESC); - oprot.writeI32(struct.writeRate); - oprot.writeFieldEnd(); if (struct.iteratorSettings != null) { oprot.writeFieldBegin(ITERATOR_SETTINGS_FIELD_DESC); struct.iteratorSettings.write(oprot); oprot.writeFieldEnd(); } - if (struct.type != null) { - oprot.writeFieldBegin(TYPE_FIELD_DESC); - oprot.writeI32(struct.type.getValue()); - oprot.writeFieldEnd(); - } - if (struct.reason != null) { - oprot.writeFieldBegin(REASON_FIELD_DESC); - oprot.writeI32(struct.reason.getValue()); - oprot.writeFieldEnd(); - } if (struct.outputFile != null) { oprot.writeFieldBegin(OUTPUT_FILE_FIELD_DESC); oprot.writeString(struct.outputFile); @@ -1683,40 +1203,25 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal if (struct.isSetFiles()) { optionals.set(2); } - if (struct.isSetPriority()) { - optionals.set(3); - } - if (struct.isSetReadRate()) { - optionals.set(4); - } - if (struct.isSetWriteRate()) { - optionals.set(5); - } if (struct.isSetIteratorSettings()) { - optionals.set(6); - } - if (struct.isSetType()) { - optionals.set(7); - } - if (struct.isSetReason()) { - optionals.set(8); + optionals.set(3); } if (struct.isSetOutputFile()) { - optionals.set(9); + optionals.set(4); } if (struct.isSetPropagateDeletes()) { - optionals.set(10); + optionals.set(5); } if (struct.isSetKind()) { - optionals.set(11); + optionals.set(6); } if (struct.isSetUserCompactionId()) { - optionals.set(12); + optionals.set(7); } if (struct.isSetTableCompactionProperties()) { - optionals.set(13); + optionals.set(8); } - oprot.writeBitSet(optionals, 14); + oprot.writeBitSet(optionals, 9); if (struct.isSetExternalCompactionId()) { oprot.writeString(struct.externalCompactionId); } @@ -1732,24 +1237,9 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal } } } - if (struct.isSetPriority()) { - oprot.writeI32(struct.priority); - } - if (struct.isSetReadRate()) { - oprot.writeI32(struct.readRate); - } - if (struct.isSetWriteRate()) { - oprot.writeI32(struct.writeRate); - } if (struct.isSetIteratorSettings()) { struct.iteratorSettings.write(oprot); } - if (struct.isSetType()) { - oprot.writeI32(struct.type.getValue()); - } - if (struct.isSetReason()) { - oprot.writeI32(struct.reason.getValue()); - } if (struct.isSetOutputFile()) { oprot.writeString(struct.outputFile); } @@ -1777,7 +1267,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal @Override public void read(org.apache.thrift.protocol.TProtocol prot, TExternalCompactionJob struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet incoming = iprot.readBitSet(14); + java.util.BitSet incoming = iprot.readBitSet(9); if (incoming.get(0)) { struct.externalCompactionId = iprot.readString(); struct.setExternalCompactionIdIsSet(true); @@ -1802,47 +1292,27 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal struct.setFilesIsSet(true); } if (incoming.get(3)) { - struct.priority = iprot.readI32(); - struct.setPriorityIsSet(true); - } - if (incoming.get(4)) { - struct.readRate = iprot.readI32(); - struct.setReadRateIsSet(true); - } - if (incoming.get(5)) { - struct.writeRate = iprot.readI32(); - struct.setWriteRateIsSet(true); - } - if (incoming.get(6)) { struct.iteratorSettings = new IteratorConfig(); struct.iteratorSettings.read(iprot); struct.setIteratorSettingsIsSet(true); } - if (incoming.get(7)) { - struct.type = org.apache.accumulo.core.tabletserver.thrift.TCompactionType.findByValue(iprot.readI32()); - struct.setTypeIsSet(true); - } - if (incoming.get(8)) { - struct.reason = org.apache.accumulo.core.tabletserver.thrift.TCompactionReason.findByValue(iprot.readI32()); - struct.setReasonIsSet(true); - } - if (incoming.get(9)) { + if (incoming.get(4)) { struct.outputFile = iprot.readString(); struct.setOutputFileIsSet(true); } - if (incoming.get(10)) { + if (incoming.get(5)) { struct.propagateDeletes = iprot.readBool(); struct.setPropagateDeletesIsSet(true); } - if (incoming.get(11)) { + if (incoming.get(6)) { struct.kind = org.apache.accumulo.core.tabletserver.thrift.TCompactionKind.findByValue(iprot.readI32()); struct.setKindIsSet(true); } - if (incoming.get(12)) { + if (incoming.get(7)) { struct.userCompactionId = iprot.readI64(); struct.setUserCompactionIdIsSet(true); } - if (incoming.get(13)) { + if (incoming.get(8)) { { org.apache.thrift.protocol.TMap _map130 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32()); struct.tableCompactionProperties = new java.util.HashMap<java.lang.String,java.lang.String>(2*_map130.size); diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift index afc47f8..4aaabef 100644 --- a/core/src/main/thrift/tabletserver.thrift +++ b/core/src/main/thrift/tabletserver.thrift @@ -170,18 +170,12 @@ struct TExternalCompactionJob { 1:string externalCompactionId 2:data.TKeyExtent extent 3:list<InputFile> files - 4:i32 priority - 5:i32 readRate - 6:i32 writeRate - 7:IteratorConfig iteratorSettings - 8:TCompactionType type - # CBUG Need to add SELECTOR To TCompactionReason, delete TCompactionKind? - 9:TCompactionReason reason - 10:string outputFile - 11:bool propagateDeletes - 12:TCompactionKind kind - 13:i64 userCompactionId - 14:map<string, string> tableCompactionProperties + 4:IteratorConfig iteratorSettings + 5:string outputFile + 6:bool propagateDeletes + 7:TCompactionKind kind + 8:i64 userCompactionId + 9:map<string, string> tableCompactionProperties } enum TCompactionKind { diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java index 7e4e0de..9a8e0c9 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java @@ -18,9 +18,6 @@ */ package org.apache.accumulo.compactor; -import java.io.Closeable; -import java.io.IOException; - import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.TableId; @@ -30,8 +27,8 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; +import org.apache.accumulo.core.util.ratelimit.NullRateLimiter; import org.apache.accumulo.core.util.ratelimit.RateLimiter; -import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.Compactor.CompactionEnv; import org.apache.accumulo.server.iterators.SystemIteratorEnvironment; @@ -39,11 +36,9 @@ import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; import com.google.common.annotations.VisibleForTesting; -public class CompactionEnvironment implements Closeable, CompactionEnv { +public class CompactionEnvironment implements CompactionEnv { - private final ServerContext context; private final CompactionJobHolder jobHolder; - private final SharedRateLimiterFactory limiter; private TExternalCompactionJob job; private String queueName; @@ -63,21 +58,13 @@ public class CompactionEnvironment implements Closeable, CompactionEnv { } } - CompactionEnvironment(ServerContext context, CompactionJobHolder jobHolder, String queueName) { - this.context = context; + CompactionEnvironment(CompactionJobHolder jobHolder, String queueName) { this.jobHolder = jobHolder; this.job = jobHolder.getJob(); - this.limiter = SharedRateLimiterFactory.getInstance(this.context.getConfiguration()); this.queueName = queueName; } @Override - public void close() throws IOException { - limiter.remove("read_rate_limiter"); - limiter.remove("write_rate_limiter"); - } - - @Override public boolean isCompactionEnabled() { return !jobHolder.isCancelled(); } @@ -89,12 +76,12 @@ public class CompactionEnvironment implements Closeable, CompactionEnv { @Override public RateLimiter getReadLimiter() { - return limiter.create("read_rate_limiter", () -> job.getReadRate()); + return NullRateLimiter.INSTANCE; } @Override public RateLimiter getWriteLimiter() { - return limiter.create("write_rate_limiter", () -> job.getWriteRate()); + return NullRateLimiter.INSTANCE; } @Override diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index b51655f..9461c19 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -558,23 +558,22 @@ public class Compactor extends AbstractServer job.getIteratorSettings().getIterators() .forEach(tis -> iters.add(SystemIteratorUtil.toIteratorSetting(tis))); - try (CompactionEnvironment cenv = - new CompactionEnvironment(getContext(), JOB_HOLDER, queueName)) { - org.apache.accumulo.server.compaction.Compactor compactor = - new org.apache.accumulo.server.compaction.Compactor(getContext(), - KeyExtent.fromThrift(job.getExtent()), files, outputFile, - job.isPropagateDeletes(), cenv, iters, tConfig); - - LOG.info("Starting compactor"); - started.countDown(); - - org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call(); - TCompactionStats cs = new TCompactionStats(); - cs.setEntriesRead(stat.getEntriesRead()); - cs.setEntriesWritten(stat.getEntriesWritten()); - cs.setFileSize(stat.getFileSize()); - JOB_HOLDER.setStats(cs); - } + CompactionEnvironment cenv = new CompactionEnvironment(JOB_HOLDER, queueName); + org.apache.accumulo.server.compaction.Compactor compactor = + new org.apache.accumulo.server.compaction.Compactor(getContext(), + KeyExtent.fromThrift(job.getExtent()), files, outputFile, + job.isPropagateDeletes(), cenv, iters, tConfig); + + LOG.info("Starting compactor"); + started.countDown(); + + org.apache.accumulo.server.compaction.CompactionStats stat = compactor.call(); + TCompactionStats cs = new TCompactionStats(); + cs.setEntriesRead(stat.getEntriesRead()); + cs.setEntriesWritten(stat.getEntriesWritten()); + cs.setFileSize(stat.getFileSize()); + JOB_HOLDER.setStats(cs); + LOG.info("Compaction completed successfully {} ", job.getExternalCompactionId()); // Update state when completed updateCompactionState(job, TCompactionState.SUCCEEDED, diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java index e83b238..326bdd0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java @@ -159,9 +159,6 @@ public class CompactionService { this.rateLimit.set(maxRate); - // CBUG it may make sense to move the rate limit config to the planner and executors... it makes - // no sense at the service level for a mix of internal and external compactions... makes a lot - // more sense at the executor level this.readLimiter = SharedRateLimiterFactory.getInstance(this.serverCtx.getConfiguration()) .create("CS_" + serviceName + "_read", () -> rateLimit.get()); this.writeLimiter = SharedRateLimiterFactory.getInstance(this.serverCtx.getConfiguration()) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java index 63d91b3..edb01eb 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java @@ -112,11 +112,11 @@ public class ExternalCompactionExecutor implements CompactionExecutor { this.ceid = ceid; Comparator<ExternalJob> priorityComparator = Comparator.comparingLong(ej -> ej.getJob().getPriority()); - priorityComparator = priorityComparator.reversed(); - Comparator<ExternalJob> timeComparator = Comparator.comparingLong(ExternalJob::getTimeCreated); + priorityComparator = + priorityComparator.reversed().thenComparingLong(ExternalJob::getTimeCreated); this.queue = new PriorityBlockingQueue<ExternalJob>(100, - priorityComparator.thenComparing(timeComparator)); + priorityComparator.thenComparing(priorityComparator)); } @Override @@ -181,12 +181,13 @@ public class ExternalCompactionExecutor implements CompactionExecutor { } } else { queue.add(extJob); + found = null; + break; } } return found; } - // TODO maybe create non-thrift type to avoid thrift types all over the code public TCompactionQueueSummary summarize() { long priority = 0; ExternalJob topJob = queue.peek(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java index 33e8e8c..9ef9191 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java @@ -21,7 +21,6 @@ package org.apache.accumulo.tserver.compactions; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.stream.Collectors; import org.apache.accumulo.core.client.IteratorSetting; @@ -29,24 +28,22 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TabletFile; +import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.tabletserver.thrift.InputFile; import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig; -import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason; -import org.apache.accumulo.core.tabletserver.thrift.TCompactionType; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import com.google.common.base.Preconditions; public class ExternalCompactionJob { - private Set<StoredTabletFile> jobFiles; + private Map<StoredTabletFile,DataFileValue> jobFiles; private boolean propogateDeletes; private TabletFile compactTmpName; private KeyExtent extent; private ExternalCompactionId externalCompactionId; - private long priority; private CompactionKind kind; private List<IteratorSetting> iters; private long userCompactionId; @@ -54,16 +51,15 @@ public class ExternalCompactionJob { public ExternalCompactionJob() {} - public ExternalCompactionJob(Set<StoredTabletFile> jobFiles, boolean propogateDeletes, - TabletFile compactTmpName, KeyExtent extent, ExternalCompactionId externalCompactionId, - long priority, CompactionKind kind, List<IteratorSetting> iters, Long userCompactionId, - Map<String,String> tableCompactionConfiguration) { + public ExternalCompactionJob(Map<StoredTabletFile,DataFileValue> jobFiles, + boolean propogateDeletes, TabletFile compactTmpName, KeyExtent extent, + ExternalCompactionId externalCompactionId, CompactionKind kind, List<IteratorSetting> iters, + Long userCompactionId, Map<String,String> tableCompactionConfiguration) { this.jobFiles = Objects.requireNonNull(jobFiles); this.propogateDeletes = propogateDeletes; this.compactTmpName = Objects.requireNonNull(compactTmpName); this.extent = Objects.requireNonNull(extent); this.externalCompactionId = Objects.requireNonNull(externalCompactionId); - this.priority = priority; this.kind = Objects.requireNonNull(kind); this.iters = Objects.requireNonNull(iters); if (kind == CompactionKind.USER) { @@ -76,38 +72,16 @@ public class ExternalCompactionJob { } public TExternalCompactionJob toThrift() { - - // TODO read and write rate - int readRate = 0; - int writeRate = 0; - - // TODO how are these two used? - TCompactionType type = propogateDeletes ? TCompactionType.MAJOR : TCompactionType.FULL; - TCompactionReason reason; - switch (kind) { - case USER: - reason = TCompactionReason.USER; - break; - case CHOP: - reason = TCompactionReason.CHOP; - break; - case SYSTEM: - case SELECTOR: - reason = TCompactionReason.SYSTEM; - break; - default: - throw new IllegalStateException(); - } IteratorConfig iteratorSettings = SystemIteratorUtil.toIteratorConfig(iters); - // TODO what are things that are zeros below needed for - List<InputFile> files = jobFiles.stream().map(stf -> new InputFile(stf.getPathStr(), 0, 0, 0)) - .collect(Collectors.toList()); + List<InputFile> files = jobFiles.entrySet().stream().map(e -> { + var dfv = e.getValue(); + return new InputFile(e.getKey().getPathStr(), dfv.getSize(), dfv.getNumEntries(), + dfv.getTime()); + }).collect(Collectors.toList()); - // TODO priority cast and compactionId cast... compactionId could be null I think return new TExternalCompactionJob(externalCompactionId.toString(), extent.toThrift(), files, - (int) priority, readRate, writeRate, iteratorSettings, type, reason, - compactTmpName.getPathStr(), propogateDeletes, + iteratorSettings, compactTmpName.getPathStr(), propogateDeletes, org.apache.accumulo.core.tabletserver.thrift.TCompactionKind.valueOf(kind.name()), userCompactionId, tableCompactionConfiguration); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java index 6ff27f6..001776a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java @@ -935,7 +935,6 @@ public class CompactableImpl implements Compactable { throw new RuntimeException(e); } finally { completeCompaction(job, cInfo.jobFiles, metaFile); - // TODO should this be in completeCompaction tablet.updateTimer(MAJOR, queuedTime, startTime, stats.getEntriesRead(), metaFile == null); } } @@ -981,9 +980,13 @@ public class CompactableImpl implements Compactable { externalCompactions.put(externalCompactionId, ecInfo); - return new ExternalCompactionJob(cInfo.jobFiles, cInfo.propogateDeletes, compactTmpName, - getExtent(), externalCompactionId, job.getPriority(), job.getKind(), cInfo.iters, - cInfo.checkCompactionId, tableCompactionProperties); + SortedMap<StoredTabletFile,DataFileValue> allFiles = tablet.getDatafiles(); + HashMap<StoredTabletFile,DataFileValue> compactFiles = new HashMap<>(); + cInfo.jobFiles.forEach(file -> compactFiles.put(file, allFiles.get(file))); + + return new ExternalCompactionJob(compactFiles, cInfo.propogateDeletes, compactTmpName, + getExtent(), externalCompactionId, job.getKind(), cInfo.iters, cInfo.checkCompactionId, + tableCompactionProperties); } catch (Exception e) { externalCompactions.remove(externalCompactionId); @@ -1012,7 +1015,6 @@ public class CompactableImpl implements Compactable { if (ecInfo != null) { log.debug("Attempting to commit external compaction {}", extCompactionId); - // TODO do a sanity check that files exists in dfs? StoredTabletFile metaFile = null; try { // possibly do some sanity checks here