This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 1dd322581c Removed Arbitrator, ZooArbitrator, and TransactionWatcher (#3890) 1dd322581c is described below commit 1dd322581ca7b888756b7e2a23c543e6de6f3543 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu Nov 2 11:37:33 2023 -0400 Removed Arbitrator, ZooArbitrator, and TransactionWatcher (#3890) Fixes #3351 --- .../java/org/apache/accumulo/core/Constants.java | 3 - .../core/clientImpl/thrift/ClientService.java | 1025 -------------------- core/src/main/thrift/client.thrift | 6 - .../server/client/ClientServiceHandler.java | 10 +- .../server/iterators/MetadataBulkLoadFilter.java | 2 - .../accumulo/server/manager/LiveTServerSet.java | 11 - .../server/zookeeper/TransactionWatcher.java | 195 ---- .../accumulo/server/rpc/TServerUtilsTest.java | 2 +- .../server/zookeeper/TransactionWatcherTest.java | 135 --- .../manager/tableOps/bulkVer2/PrepBulkImport.java | 3 - .../accumulo/tserver/TabletClientHandler.java | 6 +- .../org/apache/accumulo/tserver/TabletServer.java | 15 +- .../test/functional/AmpleConditionalWriterIT.java | 5 - .../accumulo/test/functional/BulkFailureIT.java | 5 - .../accumulo/test/functional/SplitRecoveryIT.java | 2 - .../accumulo/test/functional/ZombieTServer.java | 4 +- .../accumulo/test/performance/NullTserver.java | 3 +- 17 files changed, 11 insertions(+), 1421 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index 172ff8b355..84dece9030 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -86,9 +86,6 @@ public class Constants { public static final String ZPROBLEMS = "/problems"; - // ELASTICITY_TODO remove - public static final String BULK_ARBITRATOR_TYPE = "bulkTx"; - public static final String ZFATE = "/fate"; public static final String ZNEXT_FILE = "/next_file"; diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/clientImpl/thrift/ClientService.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/clientImpl/thrift/ClientService.java index 64713aff8b..799bb4a6f7 100644 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/clientImpl/thrift/ClientService.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/clientImpl/thrift/ClientService.java @@ -35,8 +35,6 @@ public class ClientService { public java.lang.String getZooKeepers() throws org.apache.thrift.TException; - public boolean isActive(TInfo tinfo, long tid) throws org.apache.thrift.TException; - public void ping(org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials) throws ThriftSecurityException, org.apache.thrift.TException; public java.util.List<TDiskUsage> getDiskUsage(java.util.Set<java.lang.String> tables, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials) throws ThriftSecurityException, ThriftTableOperationException, org.apache.thrift.TException; @@ -109,8 +107,6 @@ public class ClientService { public void getZooKeepers(org.apache.thrift.async.AsyncMethodCallback<java.lang.String> resultHandler) throws org.apache.thrift.TException; - public void isActive(TInfo tinfo, long tid, org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler) throws org.apache.thrift.TException; - public void ping(org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException; public void getDiskUsage(java.util.Set<java.lang.String> tables, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<java.util.List<TDiskUsage>> resultHandler) throws org.apache.thrift.TException; @@ -266,31 +262,6 @@ public class ClientService { throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getZooKeepers failed: unknown result"); } - @Override - public boolean isActive(TInfo tinfo, long tid) throws org.apache.thrift.TException - { - send_isActive(tinfo, tid); - return recv_isActive(); - } - - public void send_isActive(TInfo tinfo, long tid) throws org.apache.thrift.TException - { - isActive_args args = new isActive_args(); - args.setTinfo(tinfo); - args.setTid(tid); - sendBase("isActive", args); - } - - public boolean recv_isActive() throws org.apache.thrift.TException - { - isActive_result result = new isActive_result(); - receiveBase(result, "isActive"); - if (result.isSetSuccess()) { - return result.success; - } - throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "isActive failed: unknown result"); - } - @Override public void ping(org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials) throws ThriftSecurityException, org.apache.thrift.TException { @@ -1330,44 +1301,6 @@ public class ClientService { } } - @Override - public void isActive(TInfo tinfo, long tid, org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler) throws org.apache.thrift.TException { - checkReady(); - isActive_call method_call = new isActive_call(tinfo, tid, resultHandler, this, ___protocolFactory, ___transport); - this.___currentMethod = method_call; - ___manager.call(method_call); - } - - public static class isActive_call extends org.apache.thrift.async.TAsyncMethodCall<java.lang.Boolean> { - private TInfo tinfo; - private long tid; - public isActive_call(TInfo tinfo, long tid, org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { - super(client, protocolFactory, transport, resultHandler, false); - this.tinfo = tinfo; - this.tid = tid; - } - - @Override - public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { - prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("isActive", org.apache.thrift.protocol.TMessageType.CALL, 0)); - isActive_args args = new isActive_args(); - args.setTinfo(tinfo); - args.setTid(tid); - args.write(prot); - prot.writeMessageEnd(); - } - - @Override - public java.lang.Boolean getResult() throws org.apache.thrift.TException { - if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { - throw new java.lang.IllegalStateException("Method call not finished!"); - } - org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); - org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); - return (new Client(prot)).recv_isActive(); - } - } - @Override public void ping(org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws org.apache.thrift.TException { checkReady(); @@ -2714,7 +2647,6 @@ public class ClientService { processMap.put("getRootTabletLocation", new getRootTabletLocation()); processMap.put("getInstanceId", new getInstanceId()); processMap.put("getZooKeepers", new getZooKeepers()); - processMap.put("isActive", new isActive()); processMap.put("ping", new ping()); processMap.put("getDiskUsage", new getDiskUsage()); processMap.put("listLocalUsers", new listLocalUsers()); @@ -2833,35 +2765,6 @@ public class ClientService { } } - public static class isActive<I extends Iface> extends org.apache.thrift.ProcessFunction<I, isActive_args> { - public isActive() { - super("isActive"); - } - - @Override - public isActive_args getEmptyArgsInstance() { - return new isActive_args(); - } - - @Override - protected boolean isOneway() { - return false; - } - - @Override - protected boolean rethrowUnhandledExceptions() { - return false; - } - - @Override - public isActive_result getResult(I iface, isActive_args args) throws org.apache.thrift.TException { - isActive_result result = new isActive_result(); - result.success = iface.isActive(args.tinfo, args.tid); - result.setSuccessIsSet(true); - return result; - } - } - public static class ping<I extends Iface> extends org.apache.thrift.ProcessFunction<I, ping_args> { public ping() { super("ping"); @@ -3904,7 +3807,6 @@ public class ClientService { processMap.put("getRootTabletLocation", new getRootTabletLocation()); processMap.put("getInstanceId", new getInstanceId()); processMap.put("getZooKeepers", new getZooKeepers()); - processMap.put("isActive", new isActive()); processMap.put("ping", new ping()); processMap.put("getDiskUsage", new getDiskUsage()); processMap.put("listLocalUsers", new listLocalUsers()); @@ -4140,74 +4042,6 @@ public class ClientService { } } - public static class isActive<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, isActive_args, java.lang.Boolean> { - public isActive() { - super("isActive"); - } - - @Override - public isActive_args getEmptyArgsInstance() { - return new isActive_args(); - } - - @Override - public org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) { - final org.apache.thrift.AsyncProcessFunction fcall = this; - return new org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean>() { - @Override - public void onComplete(java.lang.Boolean o) { - isActive_result result = new isActive_result(); - result.success = o; - result.setSuccessIsSet(true); - try { - fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY,seqid); - } catch (org.apache.thrift.transport.TTransportException e) { - _LOGGER.error("TTransportException writing to internal frame buffer", e); - fb.close(); - } catch (java.lang.Exception e) { - _LOGGER.error("Exception writing to internal frame buffer", e); - onError(e); - } - } - @Override - public void onError(java.lang.Exception e) { - byte msgType = org.apache.thrift.protocol.TMessageType.REPLY; - org.apache.thrift.TSerializable msg; - isActive_result result = new isActive_result(); - if (e instanceof org.apache.thrift.transport.TTransportException) { - _LOGGER.error("TTransportException inside handler", e); - fb.close(); - return; - } else if (e instanceof org.apache.thrift.TApplicationException) { - _LOGGER.error("TApplicationException inside handler", e); - msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; - msg = (org.apache.thrift.TApplicationException)e; - } else { - _LOGGER.error("Exception inside handler", e); - msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; - msg = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage()); - } - try { - fcall.sendResponse(fb,msg,msgType,seqid); - } catch (java.lang.Exception ex) { - _LOGGER.error("Exception writing to internal frame buffer", ex); - fb.close(); - } - } - }; - } - - @Override - protected boolean isOneway() { - return false; - } - - @Override - public void start(I iface, isActive_args args, org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler) throws org.apache.thrift.TException { - iface.isActive(args.tinfo, args.tid,resultHandler); - } - } - public static class ping<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, ping_args, Void> { public ping() { super("ping"); @@ -8396,865 +8230,6 @@ public class ClientService { } } - @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) - public static class isActive_args implements org.apache.thrift.TBase<isActive_args, isActive_args._Fields>, java.io.Serializable, Cloneable, Comparable<isActive_args> { - private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("isActive_args"); - - private static final org.apache.thrift.protocol.TField TINFO_FIELD_DESC = new org.apache.thrift.protocol.TField("tinfo", org.apache.thrift.protocol.TType.STRUCT, (short)1); - private static final org.apache.thrift.protocol.TField TID_FIELD_DESC = new org.apache.thrift.protocol.TField("tid", org.apache.thrift.protocol.TType.I64, (short)2); - - private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new isActive_argsStandardSchemeFactory(); - private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new isActive_argsTupleSchemeFactory(); - - public @org.apache.thrift.annotation.Nullable TInfo tinfo; // required - public long tid; // required - - /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ - public enum _Fields implements org.apache.thrift.TFieldIdEnum { - TINFO((short)1, "tinfo"), - TID((short)2, "tid"); - - private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); - - static { - for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { - byName.put(field.getFieldName(), field); - } - } - - /** - * Find the _Fields constant that matches fieldId, or null if its not found. - */ - @org.apache.thrift.annotation.Nullable - public static _Fields findByThriftId(int fieldId) { - switch(fieldId) { - case 1: // TINFO - return TINFO; - case 2: // TID - return TID; - default: - return null; - } - } - - /** - * Find the _Fields constant that matches fieldId, throwing an exception - * if it is not found. - */ - public static _Fields findByThriftIdOrThrow(int fieldId) { - _Fields fields = findByThriftId(fieldId); - if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); - return fields; - } - - /** - * Find the _Fields constant that matches name, or null if its not found. - */ - @org.apache.thrift.annotation.Nullable - public static _Fields findByName(java.lang.String name) { - return byName.get(name); - } - - private final short _thriftId; - private final java.lang.String _fieldName; - - _Fields(short thriftId, java.lang.String fieldName) { - _thriftId = thriftId; - _fieldName = fieldName; - } - - @Override - public short getThriftFieldId() { - return _thriftId; - } - - @Override - public java.lang.String getFieldName() { - return _fieldName; - } - } - - // isset id assignments - private static final int __TID_ISSET_ID = 0; - private byte __isset_bitfield = 0; - public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; - static { - java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.TINFO, new org.apache.thrift.meta_data.FieldMetaData("tinfo", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TInfo.class))); - tmpMap.put(_Fields.TID, new org.apache.thrift.meta_data.FieldMetaData("tid", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); - metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); - org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(isActive_args.class, metaDataMap); - } - - public isActive_args() { - } - - public isActive_args( - TInfo tinfo, - long tid) - { - this(); - this.tinfo = tinfo; - this.tid = tid; - setTidIsSet(true); - } - - /** - * Performs a deep copy on <i>other</i>. - */ - public isActive_args(isActive_args other) { - __isset_bitfield = other.__isset_bitfield; - if (other.isSetTinfo()) { - this.tinfo = new TInfo(other.tinfo); - } - this.tid = other.tid; - } - - @Override - public isActive_args deepCopy() { - return new isActive_args(this); - } - - @Override - public void clear() { - this.tinfo = null; - setTidIsSet(false); - this.tid = 0; - } - - @org.apache.thrift.annotation.Nullable - public TInfo getTinfo() { - return this.tinfo; - } - - public isActive_args setTinfo(@org.apache.thrift.annotation.Nullable TInfo tinfo) { - this.tinfo = tinfo; - return this; - } - - public void unsetTinfo() { - this.tinfo = null; - } - - /** Returns true if field tinfo is set (has been assigned a value) and false otherwise */ - public boolean isSetTinfo() { - return this.tinfo != null; - } - - public void setTinfoIsSet(boolean value) { - if (!value) { - this.tinfo = null; - } - } - - public long getTid() { - return this.tid; - } - - public isActive_args setTid(long tid) { - this.tid = tid; - setTidIsSet(true); - return this; - } - - public void unsetTid() { - __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __TID_ISSET_ID); - } - - /** Returns true if field tid is set (has been assigned a value) and false otherwise */ - public boolean isSetTid() { - return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __TID_ISSET_ID); - } - - public void setTidIsSet(boolean value) { - __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __TID_ISSET_ID, value); - } - - @Override - public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { - switch (field) { - case TINFO: - if (value == null) { - unsetTinfo(); - } else { - setTinfo((TInfo)value); - } - break; - - case TID: - if (value == null) { - unsetTid(); - } else { - setTid((java.lang.Long)value); - } - break; - - } - } - - @org.apache.thrift.annotation.Nullable - @Override - public java.lang.Object getFieldValue(_Fields field) { - switch (field) { - case TINFO: - return getTinfo(); - - case TID: - return getTid(); - - } - throw new java.lang.IllegalStateException(); - } - - /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ - @Override - public boolean isSet(_Fields field) { - if (field == null) { - throw new java.lang.IllegalArgumentException(); - } - - switch (field) { - case TINFO: - return isSetTinfo(); - case TID: - return isSetTid(); - } - throw new java.lang.IllegalStateException(); - } - - @Override - public boolean equals(java.lang.Object that) { - if (that instanceof isActive_args) - return this.equals((isActive_args)that); - return false; - } - - public boolean equals(isActive_args that) { - if (that == null) - return false; - if (this == that) - return true; - - boolean this_present_tinfo = true && this.isSetTinfo(); - boolean that_present_tinfo = true && that.isSetTinfo(); - if (this_present_tinfo || that_present_tinfo) { - if (!(this_present_tinfo && that_present_tinfo)) - return false; - if (!this.tinfo.equals(that.tinfo)) - return false; - } - - boolean this_present_tid = true; - boolean that_present_tid = true; - if (this_present_tid || that_present_tid) { - if (!(this_present_tid && that_present_tid)) - return false; - if (this.tid != that.tid) - return false; - } - - return true; - } - - @Override - public int hashCode() { - int hashCode = 1; - - hashCode = hashCode * 8191 + ((isSetTinfo()) ? 131071 : 524287); - if (isSetTinfo()) - hashCode = hashCode * 8191 + tinfo.hashCode(); - - hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(tid); - - return hashCode; - } - - @Override - public int compareTo(isActive_args other) { - if (!getClass().equals(other.getClass())) { - return getClass().getName().compareTo(other.getClass().getName()); - } - - int lastComparison = 0; - - lastComparison = java.lang.Boolean.compare(isSetTinfo(), other.isSetTinfo()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetTinfo()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tinfo, other.tinfo); - if (lastComparison != 0) { - return lastComparison; - } - } - lastComparison = java.lang.Boolean.compare(isSetTid(), other.isSetTid()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetTid()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tid, other.tid); - if (lastComparison != 0) { - return lastComparison; - } - } - return 0; - } - - @org.apache.thrift.annotation.Nullable - @Override - public _Fields fieldForId(int fieldId) { - return _Fields.findByThriftId(fieldId); - } - - @Override - public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { - scheme(iprot).read(iprot, this); - } - - @Override - public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { - scheme(oprot).write(oprot, this); - } - - @Override - public java.lang.String toString() { - java.lang.StringBuilder sb = new java.lang.StringBuilder("isActive_args("); - boolean first = true; - - sb.append("tinfo:"); - if (this.tinfo == null) { - sb.append("null"); - } else { - sb.append(this.tinfo); - } - first = false; - if (!first) sb.append(", "); - sb.append("tid:"); - sb.append(this.tid); - first = false; - sb.append(")"); - return sb.toString(); - } - - public void validate() throws org.apache.thrift.TException { - // check for required fields - // check for sub-struct validity - if (tinfo != null) { - tinfo.validate(); - } - } - - private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { - try { - write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); - } catch (org.apache.thrift.TException te) { - throw new java.io.IOException(te); - } - } - - private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { - try { - // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. - __isset_bitfield = 0; - read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); - } catch (org.apache.thrift.TException te) { - throw new java.io.IOException(te); - } - } - - private static class isActive_argsStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { - @Override - public isActive_argsStandardScheme getScheme() { - return new isActive_argsStandardScheme(); - } - } - - private static class isActive_argsStandardScheme extends org.apache.thrift.scheme.StandardScheme<isActive_args> { - - @Override - public void read(org.apache.thrift.protocol.TProtocol iprot, isActive_args struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TField schemeField; - iprot.readStructBegin(); - while (true) - { - schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { - break; - } - switch (schemeField.id) { - case 1: // TINFO - if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { - struct.tinfo = new TInfo(); - struct.tinfo.read(iprot); - struct.setTinfoIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - case 2: // TID - if (schemeField.type == org.apache.thrift.protocol.TType.I64) { - struct.tid = iprot.readI64(); - struct.setTidIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - default: - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - iprot.readFieldEnd(); - } - iprot.readStructEnd(); - - // check for required fields of primitive type, which can't be checked in the validate method - struct.validate(); - } - - @Override - public void write(org.apache.thrift.protocol.TProtocol oprot, isActive_args struct) throws org.apache.thrift.TException { - struct.validate(); - - oprot.writeStructBegin(STRUCT_DESC); - if (struct.tinfo != null) { - oprot.writeFieldBegin(TINFO_FIELD_DESC); - struct.tinfo.write(oprot); - oprot.writeFieldEnd(); - } - oprot.writeFieldBegin(TID_FIELD_DESC); - oprot.writeI64(struct.tid); - oprot.writeFieldEnd(); - oprot.writeFieldStop(); - oprot.writeStructEnd(); - } - - } - - private static class isActive_argsTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { - @Override - public isActive_argsTupleScheme getScheme() { - return new isActive_argsTupleScheme(); - } - } - - private static class isActive_argsTupleScheme extends org.apache.thrift.scheme.TupleScheme<isActive_args> { - - @Override - public void write(org.apache.thrift.protocol.TProtocol prot, isActive_args struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet optionals = new java.util.BitSet(); - if (struct.isSetTinfo()) { - optionals.set(0); - } - if (struct.isSetTid()) { - optionals.set(1); - } - oprot.writeBitSet(optionals, 2); - if (struct.isSetTinfo()) { - struct.tinfo.write(oprot); - } - if (struct.isSetTid()) { - oprot.writeI64(struct.tid); - } - } - - @Override - public void read(org.apache.thrift.protocol.TProtocol prot, isActive_args struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet incoming = iprot.readBitSet(2); - if (incoming.get(0)) { - struct.tinfo = new TInfo(); - struct.tinfo.read(iprot); - struct.setTinfoIsSet(true); - } - if (incoming.get(1)) { - struct.tid = iprot.readI64(); - struct.setTidIsSet(true); - } - } - } - - private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) { - return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); - } - } - - @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) - public static class isActive_result implements org.apache.thrift.TBase<isActive_result, isActive_result._Fields>, java.io.Serializable, Cloneable, Comparable<isActive_result> { - private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("isActive_result"); - - private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.BOOL, (short)0); - - private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new isActive_resultStandardSchemeFactory(); - private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new isActive_resultTupleSchemeFactory(); - - public boolean success; // required - - /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ - public enum _Fields implements org.apache.thrift.TFieldIdEnum { - SUCCESS((short)0, "success"); - - private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); - - static { - for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { - byName.put(field.getFieldName(), field); - } - } - - /** - * Find the _Fields constant that matches fieldId, or null if its not found. - */ - @org.apache.thrift.annotation.Nullable - public static _Fields findByThriftId(int fieldId) { - switch(fieldId) { - case 0: // SUCCESS - return SUCCESS; - default: - return null; - } - } - - /** - * Find the _Fields constant that matches fieldId, throwing an exception - * if it is not found. - */ - public static _Fields findByThriftIdOrThrow(int fieldId) { - _Fields fields = findByThriftId(fieldId); - if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); - return fields; - } - - /** - * Find the _Fields constant that matches name, or null if its not found. - */ - @org.apache.thrift.annotation.Nullable - public static _Fields findByName(java.lang.String name) { - return byName.get(name); - } - - private final short _thriftId; - private final java.lang.String _fieldName; - - _Fields(short thriftId, java.lang.String fieldName) { - _thriftId = thriftId; - _fieldName = fieldName; - } - - @Override - public short getThriftFieldId() { - return _thriftId; - } - - @Override - public java.lang.String getFieldName() { - return _fieldName; - } - } - - // isset id assignments - private static final int __SUCCESS_ISSET_ID = 0; - private byte __isset_bitfield = 0; - public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; - static { - java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); - metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); - org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(isActive_result.class, metaDataMap); - } - - public isActive_result() { - } - - public isActive_result( - boolean success) - { - this(); - this.success = success; - setSuccessIsSet(true); - } - - /** - * Performs a deep copy on <i>other</i>. - */ - public isActive_result(isActive_result other) { - __isset_bitfield = other.__isset_bitfield; - this.success = other.success; - } - - @Override - public isActive_result deepCopy() { - return new isActive_result(this); - } - - @Override - public void clear() { - setSuccessIsSet(false); - this.success = false; - } - - public boolean isSuccess() { - return this.success; - } - - public isActive_result setSuccess(boolean success) { - this.success = success; - setSuccessIsSet(true); - return this; - } - - public void unsetSuccess() { - __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __SUCCESS_ISSET_ID); - } - - /** Returns true if field success is set (has been assigned a value) and false otherwise */ - public boolean isSetSuccess() { - return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __SUCCESS_ISSET_ID); - } - - public void setSuccessIsSet(boolean value) { - __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __SUCCESS_ISSET_ID, value); - } - - @Override - public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { - switch (field) { - case SUCCESS: - if (value == null) { - unsetSuccess(); - } else { - setSuccess((java.lang.Boolean)value); - } - break; - - } - } - - @org.apache.thrift.annotation.Nullable - @Override - public java.lang.Object getFieldValue(_Fields field) { - switch (field) { - case SUCCESS: - return isSuccess(); - - } - throw new java.lang.IllegalStateException(); - } - - /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ - @Override - public boolean isSet(_Fields field) { - if (field == null) { - throw new java.lang.IllegalArgumentException(); - } - - switch (field) { - case SUCCESS: - return isSetSuccess(); - } - throw new java.lang.IllegalStateException(); - } - - @Override - public boolean equals(java.lang.Object that) { - if (that instanceof isActive_result) - return this.equals((isActive_result)that); - return false; - } - - public boolean equals(isActive_result that) { - if (that == null) - return false; - if (this == that) - return true; - - boolean this_present_success = true; - boolean that_present_success = true; - if (this_present_success || that_present_success) { - if (!(this_present_success && that_present_success)) - return false; - if (this.success != that.success) - return false; - } - - return true; - } - - @Override - public int hashCode() { - int hashCode = 1; - - hashCode = hashCode * 8191 + ((success) ? 131071 : 524287); - - return hashCode; - } - - @Override - public int compareTo(isActive_result other) { - if (!getClass().equals(other.getClass())) { - return getClass().getName().compareTo(other.getClass().getName()); - } - - int lastComparison = 0; - - lastComparison = java.lang.Boolean.compare(isSetSuccess(), other.isSetSuccess()); - if (lastComparison != 0) { - return lastComparison; - } - if (isSetSuccess()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, other.success); - if (lastComparison != 0) { - return lastComparison; - } - } - return 0; - } - - @org.apache.thrift.annotation.Nullable - @Override - public _Fields fieldForId(int fieldId) { - return _Fields.findByThriftId(fieldId); - } - - @Override - public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { - scheme(iprot).read(iprot, this); - } - - public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { - scheme(oprot).write(oprot, this); - } - - @Override - public java.lang.String toString() { - java.lang.StringBuilder sb = new java.lang.StringBuilder("isActive_result("); - boolean first = true; - - sb.append("success:"); - sb.append(this.success); - first = false; - sb.append(")"); - return sb.toString(); - } - - public void validate() throws org.apache.thrift.TException { - // check for required fields - // check for sub-struct validity - } - - private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { - try { - write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); - } catch (org.apache.thrift.TException te) { - throw new java.io.IOException(te); - } - } - - private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { - try { - // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. - __isset_bitfield = 0; - read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); - } catch (org.apache.thrift.TException te) { - throw new java.io.IOException(te); - } - } - - private static class isActive_resultStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { - @Override - public isActive_resultStandardScheme getScheme() { - return new isActive_resultStandardScheme(); - } - } - - private static class isActive_resultStandardScheme extends org.apache.thrift.scheme.StandardScheme<isActive_result> { - - @Override - public void read(org.apache.thrift.protocol.TProtocol iprot, isActive_result struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TField schemeField; - iprot.readStructBegin(); - while (true) - { - schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { - break; - } - switch (schemeField.id) { - case 0: // SUCCESS - if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { - struct.success = iprot.readBool(); - struct.setSuccessIsSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - default: - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - iprot.readFieldEnd(); - } - iprot.readStructEnd(); - - // check for required fields of primitive type, which can't be checked in the validate method - struct.validate(); - } - - @Override - public void write(org.apache.thrift.protocol.TProtocol oprot, isActive_result struct) throws org.apache.thrift.TException { - struct.validate(); - - oprot.writeStructBegin(STRUCT_DESC); - if (struct.isSetSuccess()) { - oprot.writeFieldBegin(SUCCESS_FIELD_DESC); - oprot.writeBool(struct.success); - oprot.writeFieldEnd(); - } - oprot.writeFieldStop(); - oprot.writeStructEnd(); - } - - } - - private static class isActive_resultTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { - @Override - public isActive_resultTupleScheme getScheme() { - return new isActive_resultTupleScheme(); - } - } - - private static class isActive_resultTupleScheme extends org.apache.thrift.scheme.TupleScheme<isActive_result> { - - @Override - public void write(org.apache.thrift.protocol.TProtocol prot, isActive_result struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet optionals = new java.util.BitSet(); - if (struct.isSetSuccess()) { - optionals.set(0); - } - oprot.writeBitSet(optionals, 1); - if (struct.isSetSuccess()) { - oprot.writeBool(struct.success); - } - } - - @Override - public void read(org.apache.thrift.protocol.TProtocol prot, isActive_result struct) throws org.apache.thrift.TException { - org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet incoming = iprot.readBitSet(1); - if (incoming.get(0)) { - struct.success = iprot.readBool(); - struct.setSuccessIsSet(true); - } - } - } - - private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) { - return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); - } - } - @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) public static class ping_args implements org.apache.thrift.TBase<ping_args, ping_args._Fields>, java.io.Serializable, Cloneable, Comparable<ping_args> { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ping_args"); diff --git a/core/src/main/thrift/client.thrift b/core/src/main/thrift/client.thrift index 700ca7a6db..b7c270facd 100644 --- a/core/src/main/thrift/client.thrift +++ b/core/src/main/thrift/client.thrift @@ -135,12 +135,6 @@ service ClientService { string getInstanceId() string getZooKeepers() - // ensures that nobody is working on the transaction id above - bool isActive( - 1:TInfo tinfo - 2:i64 tid - ) - void ping( 2:security.TCredentials credentials ) throws ( diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java index 0a416bc8db..271f86fe93 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java +++ b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java @@ -64,20 +64,17 @@ import org.apache.accumulo.server.conf.store.SystemPropKey; import org.apache.accumulo.server.conf.store.TablePropKey; import org.apache.accumulo.server.security.SecurityOperation; import org.apache.accumulo.server.util.TableDiskUsage; -import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ClientServiceHandler implements ClientService.Iface { private static final Logger log = LoggerFactory.getLogger(ClientServiceHandler.class); - protected final TransactionWatcher transactionWatcher; protected final ServerContext context; protected final SecurityOperation security; - public ClientServiceHandler(ServerContext context, TransactionWatcher transactionWatcher) { + public ClientServiceHandler(ServerContext context) { this.context = context; - this.transactionWatcher = transactionWatcher; this.security = context.getSecurityOperation(); } @@ -399,11 +396,6 @@ public class ClientServiceHandler implements ClientService.Iface { .orElseThrow(); } - @Override - public boolean isActive(TInfo tinfo, long tid) { - return transactionWatcher.isActive(tid); - } - @Override public boolean checkClass(TInfo tinfo, TCredentials credentials, String className, String interfaceMatch) throws TException { diff --git a/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java b/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java index 9a4d3e695c..7254fc56ac 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java +++ b/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java @@ -27,7 +27,6 @@ import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.server.zookeeper.TransactionWatcher.Arbitrator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +43,6 @@ public class MetadataBulkLoadFilter extends Filter { } Map<Long,Status> bulkTxStatusCache; - Arbitrator arbitrator; @Override public boolean accept(Key k, Value v) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index f5914c4a3f..eada6aa11a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@ -32,7 +32,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.clientImpl.thrift.ClientService; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -182,16 +181,6 @@ public class LiveTServerSet implements Watcher { } } - public boolean isActive(long tid) throws TException { - ClientService.Client client = - ThriftUtil.getClient(ThriftClientTypes.CLIENT, address, context); - try { - return client.isActive(TraceUtil.traceInfo(), tid); - } finally { - ThriftUtil.returnClient(client, context); - } - } - } static class TServerInfo { diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java deleted file mode 100644 index 17a18eb6c8..0000000000 --- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.server.zookeeper; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.accumulo.core.fate.zookeeper.ZooReader; -import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; -import org.apache.accumulo.server.ServerContext; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -//ELASTICITY_TODO remove this class -public class TransactionWatcher { - - private static final Logger log = LoggerFactory.getLogger(TransactionWatcher.class); - final private Map<Long,AtomicInteger> counts = new HashMap<>(); - final private Arbitrator arbitrator; - - public TransactionWatcher(Arbitrator arbitrator) { - this.arbitrator = arbitrator; - } - - public TransactionWatcher(ServerContext context) { - this(new ZooArbitrator(context)); - } - - public interface Arbitrator { - boolean transactionAlive(String type, long tid) throws Exception; - - boolean transactionComplete(String type, long tid) throws Exception; - } - - public static class ZooArbitrator implements Arbitrator { - - private ServerContext context; - private ZooReader rdr; - - public ZooArbitrator(ServerContext context) { - this.context = context; - rdr = context.getZooReader(); - } - - @Override - public boolean transactionAlive(String type, long tid) throws Exception { - String path = context.getZooKeeperRoot() + "/" + type + "/" + tid; - rdr.sync(path); - return rdr.exists(path); - } - - public static void start(ServerContext context, String type, long tid) - throws KeeperException, InterruptedException { - ZooReaderWriter writer = context.getZooReaderWriter(); - writer.putPersistentData(context.getZooKeeperRoot() + "/" + type, new byte[] {}, - NodeExistsPolicy.OVERWRITE); - writer.putPersistentData(context.getZooKeeperRoot() + "/" + type + "/" + tid, new byte[] {}, - NodeExistsPolicy.OVERWRITE); - writer.putPersistentData(context.getZooKeeperRoot() + "/" + type + "/" + tid + "-running", - new byte[] {}, NodeExistsPolicy.OVERWRITE); - } - - public static void stop(ServerContext context, String type, long tid) - throws KeeperException, InterruptedException { - ZooReaderWriter writer = context.getZooReaderWriter(); - writer.recursiveDelete(context.getZooKeeperRoot() + "/" + type + "/" + tid, - NodeMissingPolicy.SKIP); - } - - public static void cleanup(ServerContext context, String type, long tid) - throws KeeperException, InterruptedException { - ZooReaderWriter writer = context.getZooReaderWriter(); - writer.recursiveDelete(context.getZooKeeperRoot() + "/" + type + "/" + tid, - NodeMissingPolicy.SKIP); - writer.recursiveDelete(context.getZooKeeperRoot() + "/" + type + "/" + tid + "-running", - NodeMissingPolicy.SKIP); - } - - public static Set<Long> allTransactionsAlive(ServerContext context, String type) - throws KeeperException, InterruptedException { - final ZooReader reader = context.getZooReaderWriter(); - final Set<Long> result = new HashSet<>(); - final String parent = context.getZooKeeperRoot() + "/" + type; - reader.sync(parent); - if (reader.exists(parent)) { - List<String> children = reader.getChildren(parent); - for (String child : children) { - if (child.endsWith("-running")) { - continue; - } - result.add(Long.parseLong(child)); - } - } - return result; - } - - @Override - public boolean transactionComplete(String type, long tid) throws Exception { - String path = context.getZooKeeperRoot() + "/" + type + "/" + tid + "-running"; - rdr.sync(path); - return !rdr.exists(path); - } - } - - /** - * Run task only if transaction is still active in zookeeper. If the tx is no longer active then - * that task is not run and a debug message is logged indicating the task was ignored. - */ - public void runQuietly(String ztxBulk, long tid, Runnable task) { - synchronized (counts) { - try { - if (!arbitrator.transactionAlive(ztxBulk, tid)) { - log.debug("Transaction " + tid + " of type " + ztxBulk + " is no longer active."); - return; - } - } catch (Exception e) { - log.warn("Unable to check if transaction " + tid + " of type " + ztxBulk + " is alive ", e); - return; - } - increment(tid); - } - try { - task.run(); - } finally { - decrement(tid); - } - } - - public <T> T run(String ztxBulk, long tid, Callable<T> callable) throws Exception { - synchronized (counts) { - if (!arbitrator.transactionAlive(ztxBulk, tid)) { - throw new Exception("Transaction " + tid + " of type " + ztxBulk + " is no longer active"); - } - increment(tid); - } - try { - return callable.call(); - } finally { - decrement(tid); - } - } - - public boolean isActive(long tid) { - synchronized (counts) { - log.debug("Transactions in progress {}", counts); - AtomicInteger count = counts.get(tid); - return count != null && count.get() > 0; - } - } - - private void increment(long tid) { - AtomicInteger count = counts.get(tid); - if (count == null) { - counts.put(tid, count = new AtomicInteger()); - } - count.incrementAndGet(); - } - - private void decrement(long tid) { - synchronized (counts) { - AtomicInteger count = counts.get(tid); - if (count == null) { - log.error("unexpected missing count for transaction {}", tid); - } else { - if (count.decrementAndGet() == 0) { - counts.remove(tid); - } - } - } - } -} diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java index 04bd9c6d64..5ffdc963f7 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java @@ -281,7 +281,7 @@ public class TServerUtilsTest { } private ServerAddress startServer() throws Exception { - ClientServiceHandler clientHandler = new ClientServiceHandler(context, null); + ClientServiceHandler clientHandler = new ClientServiceHandler(context); Iface rpcProxy = TraceUtil.wrapService(clientHandler); Processor<Iface> processor = new Processor<>(rpcProxy); // "localhost" explicitly to make sure we can always bind to that interface (avoids DNS diff --git a/server/base/src/test/java/org/apache/accumulo/server/zookeeper/TransactionWatcherTest.java b/server/base/src/test/java/org/apache/accumulo/server/zookeeper/TransactionWatcherTest.java deleted file mode 100644 index e61bad1f4e..0000000000 --- a/server/base/src/test/java/org/apache/accumulo/server/zookeeper/TransactionWatcherTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.server.zookeeper; - -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.junit.jupiter.api.Test; - -public class TransactionWatcherTest { - - static class SimpleArbitrator implements TransactionWatcher.Arbitrator { - Map<String,List<Long>> started = new HashMap<>(); - Map<String,List<Long>> cleanedUp = new HashMap<>(); - - public synchronized void start(String txType, Long txid) throws Exception { - List<Long> txids = started.get(txType); - if (txids == null) { - txids = new ArrayList<>(); - } - if (txids.contains(txid)) { - throw new Exception("transaction already started"); - } - txids.add(txid); - started.put(txType, txids); - - txids = cleanedUp.get(txType); - if (txids == null) { - txids = new ArrayList<>(); - } - if (txids.contains(txid)) { - throw new IllegalStateException("transaction was started but not cleaned up"); - } - txids.add(txid); - cleanedUp.put(txType, txids); - } - - public synchronized void stop(String txType, Long txid) throws Exception { - List<Long> txids = started.get(txType); - if (txids != null && txids.contains(txid)) { - txids.remove(txids.indexOf(txid)); - return; - } - throw new Exception("transaction does not exist"); - } - - public synchronized void cleanup(String txType, Long txid) throws Exception { - List<Long> txids = cleanedUp.get(txType); - if (txids != null && txids.contains(txid)) { - txids.remove(txids.indexOf(txid)); - return; - } - throw new Exception("transaction does not exist"); - } - - @Override - public synchronized boolean transactionAlive(String txType, long tid) { - List<Long> txids = started.get(txType); - if (txids == null) { - return false; - } - return txids.contains(tid); - } - - @Override - public boolean transactionComplete(String txType, long tid) { - List<Long> txids = cleanedUp.get(txType); - if (txids == null) { - return true; - } - return !txids.contains(tid); - } - - } - - @Test - public void testTransactionWatcher() throws Exception { - final String txType = "someName"; - final long txid = 7; - final SimpleArbitrator sa = new SimpleArbitrator(); - final TransactionWatcher txw = new TransactionWatcher(sa); - sa.start(txType, txid); - assertThrows(Exception.class, () -> sa.start(txType, txid), - "simple arbitrator did not throw an exception"); - txw.isActive(txid); - assertFalse(txw.isActive(txid)); - txw.run(txType, txid, () -> { - assertTrue(txw.isActive(txid)); - return null; - }); - assertFalse(txw.isActive(txid)); - assertFalse(sa.transactionComplete(txType, txid)); - sa.stop(txType, txid); - assertFalse(sa.transactionAlive(txType, txid)); - assertFalse(sa.transactionComplete(txType, txid)); - sa.cleanup(txType, txid); - assertTrue(sa.transactionComplete(txType, txid)); - assertThrows(Exception.class, () -> txw.run(txType, txid, () -> null), - "Should not be able to start a new work on a discontinued transaction"); - final long txid2 = 9; - sa.start(txType, txid2); - txw.run(txType, txid2, () -> { - assertTrue(txw.isActive(txid2)); - sa.stop(txType, txid2); - assertThrows(Exception.class, () -> txw.run(txType, txid2, () -> null), - "Should not be able to start a new work on a discontinued transaction"); - assertTrue(txw.isActive(txid2)); - return null; - }); - - } - -} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java index 38265107f8..4befb3499f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java @@ -53,7 +53,6 @@ import org.apache.accumulo.manager.tableOps.Utils; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.tablets.UniqueNameAllocator; -import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -283,7 +282,5 @@ public class PrepBulkImport extends ManagerRepo { // unreserve sourceDir/error directories Utils.unreserveHdfsDirectory(environment, bulkInfo.sourceDir, tid); Utils.getReadLock(environment, bulkInfo.tableId, tid).unlock(); - TransactionWatcher.ZooArbitrator.cleanup(environment.getContext(), - Constants.BULK_ARBITRATOR_TYPE, tid); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 4b53ac58b9..7f34d65b4f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -104,7 +104,6 @@ import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.security.SecurityOperation; import org.apache.accumulo.server.tablets.ConditionCheckerContext; import org.apache.accumulo.server.tablets.ConditionCheckerContext.ConditionChecker; -import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.accumulo.tserver.RowLocks.RowLock; import org.apache.accumulo.tserver.session.ConditionalSession; import org.apache.accumulo.tserver.session.SummarySession; @@ -132,16 +131,13 @@ public class TabletClientHandler implements TabletServerClientService.Iface, private static final Logger log = LoggerFactory.getLogger(TabletClientHandler.class); private final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS; private final TabletServer server; - protected final TransactionWatcher watcher; protected final ServerContext context; protected final SecurityOperation security; private final WriteTracker writeTracker; private final RowLocks rowLocks = new RowLocks(); - public TabletClientHandler(TabletServer server, TransactionWatcher watcher, - WriteTracker writeTracker) { + public TabletClientHandler(TabletServer server, WriteTracker writeTracker) { this.context = server.getContext(); - this.watcher = watcher; this.writeTracker = writeTracker; this.security = context.getSecurityOperation(); this.server = server; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 8a68b2a7cc..cc06d8f01b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -128,7 +128,6 @@ import org.apache.accumulo.server.security.SecurityUtil; import org.apache.accumulo.server.security.delegation.ZooAuthenticationKeyWatcher; import org.apache.accumulo.server.util.time.RelativeTime; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; -import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.accumulo.tserver.log.DfsLogger; import org.apache.accumulo.tserver.log.LogSorter; import org.apache.accumulo.tserver.log.MutationReceiver; @@ -480,14 +479,13 @@ public class TabletServer extends AbstractServer implements TabletHostingServer return null; } - protected ClientServiceHandler newClientHandler(TransactionWatcher watcher) { - return new ClientServiceHandler(context, watcher); + protected ClientServiceHandler newClientHandler() { + return new ClientServiceHandler(context); } // exists to be overridden in tests - protected TabletClientHandler newTabletClientHandler(TransactionWatcher watcher, - WriteTracker writeTracker) { - return new TabletClientHandler(this, watcher, writeTracker); + protected TabletClientHandler newTabletClientHandler(WriteTracker writeTracker) { + return new TabletClientHandler(this, writeTracker); } protected ThriftScanClientHandler newThriftScanClientHandler(WriteTracker writeTracker) { @@ -500,10 +498,9 @@ public class TabletServer extends AbstractServer implements TabletHostingServer private HostAndPort startTabletClientService() throws UnknownHostException { // start listening for client connection last - TransactionWatcher watcher = new TransactionWatcher(context); WriteTracker writeTracker = new WriteTracker(); - clientHandler = newClientHandler(watcher); - thriftClientHandler = newTabletClientHandler(watcher, writeTracker); + clientHandler = newClientHandler(); + thriftClientHandler = newTabletClientHandler(writeTracker); scanClientHandler = newThriftScanClientHandler(writeTracker); TProcessor processor = diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index 3c59eac337..2ee6c5d1e7 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -46,7 +46,6 @@ import java.util.TreeSet; import java.util.function.Supplier; import java.util.stream.Collectors; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; @@ -75,7 +74,6 @@ import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl; -import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.BeforeEach; @@ -278,9 +276,6 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { assertEquals(Set.of(stf4), context.getAmple().readTablet(e1).getFiles()); - // without this the metadata constraint will not allow the bulk file to be added to metadata - TransactionWatcher.ZooArbitrator.start(context, Constants.BULK_ARBITRATOR_TYPE, 9L); - // simulate a bulk import var stf5 = StoredTabletFile .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/b-0000009/I0000074.rf")); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java index a70dfddfef..b273a4813c 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFailureIT.java @@ -75,7 +75,6 @@ import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.tablets.UniqueNameAllocator; -import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -226,7 +225,6 @@ public class BulkFailureIT extends AccumuloClusterHarness { KeyExtent extent = new KeyExtent(TableId.of(tableId), null, null); ServerContext asCtx = getServerContext(); - ZooArbitrator.start(asCtx, Constants.BULK_ARBITRATOR_TYPE, fateTxid); VolumeManager vm = asCtx.getVolumeManager(); @@ -273,9 +271,6 @@ public class BulkFailureIT extends AccumuloClusterHarness { assertEquals(Set.of(bulkLoadPath), getLoaded(c, extent)); assertEquals(testData, readTable(table, c)); - // After this, all load request should fail. - ZooArbitrator.stop(asCtx, Constants.BULK_ARBITRATOR_TYPE, fateTxid); - c.securityOperations().grantTablePermission(c.whoami(), MetadataTable.NAME, TablePermission.WRITE); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java index ec1b65a86f..7816b156d5 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java @@ -75,7 +75,6 @@ import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.manager.state.Assignment; import org.apache.accumulo.server.util.ManagerMetadataUtil; import org.apache.accumulo.server.util.MetadataTableUtil; -import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Disabled; @@ -175,7 +174,6 @@ public class SplitRecoveryIT extends ConfigurableMacBase { new DataFileValue(1000017 + i, 10000 + i)); int tid = 0; - TransactionWatcher.ZooArbitrator.start(context, Constants.BULK_ARBITRATOR_TYPE, tid); SortedMap<StoredTabletFile,DataFileValue> storedFiles = new TreeMap<>(MetadataTableUtil.updateTabletDataFile(tid, extent, dataFiles, new MetadataTime(0, TimeType.LOGICAL), context, zl)); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java index c9471fe2a5..a1ed4d4c52 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java @@ -49,7 +49,6 @@ import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; import org.apache.accumulo.server.rpc.ThriftServerType; -import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.thrift.TMultiplexedProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,8 +108,7 @@ public class ZombieTServer { public static void main(String[] args) throws Exception { int port = RANDOM.get().nextInt(30000) + 2000; var context = new ServerContext(SiteConfiguration.auto()); - final ClientServiceHandler csh = - new ClientServiceHandler(context, new TransactionWatcher(context)); + final ClientServiceHandler csh = new ClientServiceHandler(context); final ZombieTServerThriftClientHandler tch = new ZombieTServerThriftClientHandler(); TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor(); diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java index af8f9ddd02..792d186054 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java @@ -76,7 +76,6 @@ import org.apache.accumulo.server.manager.state.TabletStateStore; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; import org.apache.accumulo.server.rpc.ThriftServerType; -import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.thrift.TException; import org.apache.thrift.TMultiplexedProcessor; @@ -276,7 +275,7 @@ public class NullTserver { (int) DefaultConfiguration.getInstance().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); var siteConfig = SiteConfiguration.auto(); ServerContext context = ServerContext.override(siteConfig, opts.iname, opts.keepers, zkTimeOut); - ClientServiceHandler csh = new ClientServiceHandler(context, new TransactionWatcher(context)); + ClientServiceHandler csh = new ClientServiceHandler(context); NullTServerTabletClientHandler tch = new NullTServerTabletClientHandler(); TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor();