Repository: accumulo Updated Branches: refs/heads/ACCUMULO-378 2425fd24b -> 9d9b5ed24
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationServicer.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationServicer.java b/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationServicer.java index e5e26ca..e297445 100644 --- a/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationServicer.java +++ b/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationServicer.java @@ -50,17 +50,17 @@ import org.slf4j.LoggerFactory; public interface Iface { - public long replicateLog(int remoteTableId, WalEdits data) throws RemoteReplicationException, org.apache.thrift.TException; + public long replicateLog(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws RemoteReplicationException, org.apache.thrift.TException; - public long replicateKeyValues(int remoteTableId, KeyValues data) throws RemoteReplicationException, org.apache.thrift.TException; + public long replicateKeyValues(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws RemoteReplicationException, org.apache.thrift.TException; } public interface AsyncIface { - public void replicateLog(int remoteTableId, WalEdits data, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.replicateLog_call> resultHandler) throws org.apache.thrift.TException; + public void replicateLog(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.replicateLog_call> resultHandler) throws org.apache.thrift.TException; - public void replicateKeyValues(int remoteTableId, KeyValues data, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.replicateKeyValues_call> resultHandler) throws org.apache.thrift.TException; + public void replicateKeyValues(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.replicateKeyValues_call> resultHandler) throws org.apache.thrift.TException; } @@ -84,17 +84,18 @@ import org.slf4j.LoggerFactory; super(iprot, oprot); } - public long replicateLog(int remoteTableId, WalEdits data) throws RemoteReplicationException, org.apache.thrift.TException + public long replicateLog(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws RemoteReplicationException, org.apache.thrift.TException { - send_replicateLog(remoteTableId, data); + send_replicateLog(remoteTableId, data, credentials); return recv_replicateLog(); } - public void send_replicateLog(int remoteTableId, WalEdits data) throws org.apache.thrift.TException + public void send_replicateLog(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws org.apache.thrift.TException { replicateLog_args args = new replicateLog_args(); args.setRemoteTableId(remoteTableId); args.setData(data); + args.setCredentials(credentials); sendBase("replicateLog", args); } @@ -111,17 +112,18 @@ import org.slf4j.LoggerFactory; throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "replicateLog failed: unknown result"); } - public long replicateKeyValues(int remoteTableId, KeyValues data) throws RemoteReplicationException, org.apache.thrift.TException + public long replicateKeyValues(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws RemoteReplicationException, org.apache.thrift.TException { - send_replicateKeyValues(remoteTableId, data); + send_replicateKeyValues(remoteTableId, data, credentials); return recv_replicateKeyValues(); } - public void send_replicateKeyValues(int remoteTableId, KeyValues data) throws org.apache.thrift.TException + public void send_replicateKeyValues(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws org.apache.thrift.TException { replicateKeyValues_args args = new replicateKeyValues_args(); args.setRemoteTableId(remoteTableId); args.setData(data); + args.setCredentials(credentials); sendBase("replicateKeyValues", args); } @@ -156,9 +158,9 @@ import org.slf4j.LoggerFactory; super(protocolFactory, clientManager, transport); } - public void replicateLog(int remoteTableId, WalEdits data, org.apache.thrift.async.AsyncMethodCallback<replicateLog_call> resultHandler) throws org.apache.thrift.TException { + public void replicateLog(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<replicateLog_call> resultHandler) throws org.apache.thrift.TException { checkReady(); - replicateLog_call method_call = new replicateLog_call(remoteTableId, data, resultHandler, this, ___protocolFactory, ___transport); + replicateLog_call method_call = new replicateLog_call(remoteTableId, data, credentials, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); } @@ -166,10 +168,12 @@ import org.slf4j.LoggerFactory; public static class replicateLog_call extends org.apache.thrift.async.TAsyncMethodCall { private int remoteTableId; private WalEdits data; - public replicateLog_call(int remoteTableId, WalEdits data, org.apache.thrift.async.AsyncMethodCallback<replicateLog_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + private org.apache.accumulo.core.security.thrift.TCredentials credentials; + public replicateLog_call(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<replicateLog_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { super(client, protocolFactory, transport, resultHandler, false); this.remoteTableId = remoteTableId; this.data = data; + this.credentials = credentials; } public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { @@ -177,6 +181,7 @@ import org.slf4j.LoggerFactory; replicateLog_args args = new replicateLog_args(); args.setRemoteTableId(remoteTableId); args.setData(data); + args.setCredentials(credentials); args.write(prot); prot.writeMessageEnd(); } @@ -191,9 +196,9 @@ import org.slf4j.LoggerFactory; } } - public void replicateKeyValues(int remoteTableId, KeyValues data, org.apache.thrift.async.AsyncMethodCallback<replicateKeyValues_call> resultHandler) throws org.apache.thrift.TException { + public void replicateKeyValues(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<replicateKeyValues_call> resultHandler) throws org.apache.thrift.TException { checkReady(); - replicateKeyValues_call method_call = new replicateKeyValues_call(remoteTableId, data, resultHandler, this, ___protocolFactory, ___transport); + replicateKeyValues_call method_call = new replicateKeyValues_call(remoteTableId, data, credentials, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); } @@ -201,10 +206,12 @@ import org.slf4j.LoggerFactory; public static class replicateKeyValues_call extends org.apache.thrift.async.TAsyncMethodCall { private int remoteTableId; private KeyValues data; - public replicateKeyValues_call(int remoteTableId, KeyValues data, org.apache.thrift.async.AsyncMethodCallback<replicateKeyValues_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + private org.apache.accumulo.core.security.thrift.TCredentials credentials; + public replicateKeyValues_call(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<replicateKeyValues_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { super(client, protocolFactory, transport, resultHandler, false); this.remoteTableId = remoteTableId; this.data = data; + this.credentials = credentials; } public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { @@ -212,6 +219,7 @@ import org.slf4j.LoggerFactory; replicateKeyValues_args args = new replicateKeyValues_args(); args.setRemoteTableId(remoteTableId); args.setData(data); + args.setCredentials(credentials); args.write(prot); prot.writeMessageEnd(); } @@ -260,7 +268,7 @@ import org.slf4j.LoggerFactory; public replicateLog_result getResult(I iface, replicateLog_args args) throws org.apache.thrift.TException { replicateLog_result result = new replicateLog_result(); try { - result.success = iface.replicateLog(args.remoteTableId, args.data); + result.success = iface.replicateLog(args.remoteTableId, args.data, args.credentials); result.setSuccessIsSet(true); } catch (RemoteReplicationException e) { result.e = e; @@ -285,7 +293,7 @@ import org.slf4j.LoggerFactory; public replicateKeyValues_result getResult(I iface, replicateKeyValues_args args) throws org.apache.thrift.TException { replicateKeyValues_result result = new replicateKeyValues_result(); try { - result.success = iface.replicateKeyValues(args.remoteTableId, args.data); + result.success = iface.replicateKeyValues(args.remoteTableId, args.data, args.credentials); result.setSuccessIsSet(true); } catch (RemoteReplicationException e) { result.e = e; @@ -301,6 +309,7 @@ import org.slf4j.LoggerFactory; private static final org.apache.thrift.protocol.TField REMOTE_TABLE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("remoteTableId", org.apache.thrift.protocol.TType.I32, (short)1); private static final org.apache.thrift.protocol.TField DATA_FIELD_DESC = new org.apache.thrift.protocol.TField("data", org.apache.thrift.protocol.TType.STRUCT, (short)2); + private static final org.apache.thrift.protocol.TField CREDENTIALS_FIELD_DESC = new org.apache.thrift.protocol.TField("credentials", org.apache.thrift.protocol.TType.STRUCT, (short)3); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -310,11 +319,13 @@ import org.slf4j.LoggerFactory; public int remoteTableId; // required public WalEdits data; // required + public org.apache.accumulo.core.security.thrift.TCredentials credentials; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { REMOTE_TABLE_ID((short)1, "remoteTableId"), - DATA((short)2, "data"); + DATA((short)2, "data"), + CREDENTIALS((short)3, "credentials"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -333,6 +344,8 @@ import org.slf4j.LoggerFactory; return REMOTE_TABLE_ID; case 2: // DATA return DATA; + case 3: // CREDENTIALS + return CREDENTIALS; default: return null; } @@ -382,6 +395,8 @@ import org.slf4j.LoggerFactory; new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); tmpMap.put(_Fields.DATA, new org.apache.thrift.meta_data.FieldMetaData("data", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, WalEdits.class))); + tmpMap.put(_Fields.CREDENTIALS, new org.apache.thrift.meta_data.FieldMetaData("credentials", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.security.thrift.TCredentials.class))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(replicateLog_args.class, metaDataMap); } @@ -391,12 +406,14 @@ import org.slf4j.LoggerFactory; public replicateLog_args( int remoteTableId, - WalEdits data) + WalEdits data, + org.apache.accumulo.core.security.thrift.TCredentials credentials) { this(); this.remoteTableId = remoteTableId; setRemoteTableIdIsSet(true); this.data = data; + this.credentials = credentials; } /** @@ -408,6 +425,9 @@ import org.slf4j.LoggerFactory; if (other.isSetData()) { this.data = new WalEdits(other.data); } + if (other.isSetCredentials()) { + this.credentials = new org.apache.accumulo.core.security.thrift.TCredentials(other.credentials); + } } public replicateLog_args deepCopy() { @@ -419,6 +439,7 @@ import org.slf4j.LoggerFactory; setRemoteTableIdIsSet(false); this.remoteTableId = 0; this.data = null; + this.credentials = null; } public int getRemoteTableId() { @@ -468,6 +489,30 @@ import org.slf4j.LoggerFactory; } } + public org.apache.accumulo.core.security.thrift.TCredentials getCredentials() { + return this.credentials; + } + + public replicateLog_args setCredentials(org.apache.accumulo.core.security.thrift.TCredentials credentials) { + this.credentials = credentials; + return this; + } + + public void unsetCredentials() { + this.credentials = null; + } + + /** Returns true if field credentials is set (has been assigned a value) and false otherwise */ + public boolean isSetCredentials() { + return this.credentials != null; + } + + public void setCredentialsIsSet(boolean value) { + if (!value) { + this.credentials = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case REMOTE_TABLE_ID: @@ -486,6 +531,14 @@ import org.slf4j.LoggerFactory; } break; + case CREDENTIALS: + if (value == null) { + unsetCredentials(); + } else { + setCredentials((org.apache.accumulo.core.security.thrift.TCredentials)value); + } + break; + } } @@ -497,6 +550,9 @@ import org.slf4j.LoggerFactory; case DATA: return getData(); + case CREDENTIALS: + return getCredentials(); + } throw new IllegalStateException(); } @@ -512,6 +568,8 @@ import org.slf4j.LoggerFactory; return isSetRemoteTableId(); case DATA: return isSetData(); + case CREDENTIALS: + return isSetCredentials(); } throw new IllegalStateException(); } @@ -547,6 +605,15 @@ import org.slf4j.LoggerFactory; return false; } + boolean this_present_credentials = true && this.isSetCredentials(); + boolean that_present_credentials = true && that.isSetCredentials(); + if (this_present_credentials || that_present_credentials) { + if (!(this_present_credentials && that_present_credentials)) + return false; + if (!this.credentials.equals(that.credentials)) + return false; + } + return true; } @@ -583,6 +650,16 @@ import org.slf4j.LoggerFactory; return lastComparison; } } + lastComparison = Boolean.valueOf(isSetCredentials()).compareTo(typedOther.isSetCredentials()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetCredentials()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.credentials, typedOther.credentials); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -614,6 +691,14 @@ import org.slf4j.LoggerFactory; sb.append(this.data); } first = false; + if (!first) sb.append(", "); + sb.append("credentials:"); + if (this.credentials == null) { + sb.append("null"); + } else { + sb.append(this.credentials); + } + first = false; sb.append(")"); return sb.toString(); } @@ -624,6 +709,9 @@ import org.slf4j.LoggerFactory; if (data != null) { data.validate(); } + if (credentials != null) { + credentials.validate(); + } } private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { @@ -679,6 +767,15 @@ import org.slf4j.LoggerFactory; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 3: // CREDENTIALS + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials(); + struct.credentials.read(iprot); + struct.setCredentialsIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -702,6 +799,11 @@ import org.slf4j.LoggerFactory; struct.data.write(oprot); oprot.writeFieldEnd(); } + if (struct.credentials != null) { + oprot.writeFieldBegin(CREDENTIALS_FIELD_DESC); + struct.credentials.write(oprot); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -726,19 +828,25 @@ import org.slf4j.LoggerFactory; if (struct.isSetData()) { optionals.set(1); } - oprot.writeBitSet(optionals, 2); + if (struct.isSetCredentials()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); if (struct.isSetRemoteTableId()) { oprot.writeI32(struct.remoteTableId); } if (struct.isSetData()) { struct.data.write(oprot); } + if (struct.isSetCredentials()) { + struct.credentials.write(oprot); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, replicateLog_args struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; - BitSet incoming = iprot.readBitSet(2); + BitSet incoming = iprot.readBitSet(3); if (incoming.get(0)) { struct.remoteTableId = iprot.readI32(); struct.setRemoteTableIdIsSet(true); @@ -748,6 +856,11 @@ import org.slf4j.LoggerFactory; struct.data.read(iprot); struct.setDataIsSet(true); } + if (incoming.get(2)) { + struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials(); + struct.credentials.read(iprot); + struct.setCredentialsIsSet(true); + } } } @@ -1214,6 +1327,7 @@ import org.slf4j.LoggerFactory; private static final org.apache.thrift.protocol.TField REMOTE_TABLE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("remoteTableId", org.apache.thrift.protocol.TType.I32, (short)1); private static final org.apache.thrift.protocol.TField DATA_FIELD_DESC = new org.apache.thrift.protocol.TField("data", org.apache.thrift.protocol.TType.STRUCT, (short)2); + private static final org.apache.thrift.protocol.TField CREDENTIALS_FIELD_DESC = new org.apache.thrift.protocol.TField("credentials", org.apache.thrift.protocol.TType.STRUCT, (short)3); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -1223,11 +1337,13 @@ import org.slf4j.LoggerFactory; public int remoteTableId; // required public KeyValues data; // required + public org.apache.accumulo.core.security.thrift.TCredentials credentials; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { REMOTE_TABLE_ID((short)1, "remoteTableId"), - DATA((short)2, "data"); + DATA((short)2, "data"), + CREDENTIALS((short)3, "credentials"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -1246,6 +1362,8 @@ import org.slf4j.LoggerFactory; return REMOTE_TABLE_ID; case 2: // DATA return DATA; + case 3: // CREDENTIALS + return CREDENTIALS; default: return null; } @@ -1295,6 +1413,8 @@ import org.slf4j.LoggerFactory; new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); tmpMap.put(_Fields.DATA, new org.apache.thrift.meta_data.FieldMetaData("data", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, KeyValues.class))); + tmpMap.put(_Fields.CREDENTIALS, new org.apache.thrift.meta_data.FieldMetaData("credentials", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.security.thrift.TCredentials.class))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(replicateKeyValues_args.class, metaDataMap); } @@ -1304,12 +1424,14 @@ import org.slf4j.LoggerFactory; public replicateKeyValues_args( int remoteTableId, - KeyValues data) + KeyValues data, + org.apache.accumulo.core.security.thrift.TCredentials credentials) { this(); this.remoteTableId = remoteTableId; setRemoteTableIdIsSet(true); this.data = data; + this.credentials = credentials; } /** @@ -1321,6 +1443,9 @@ import org.slf4j.LoggerFactory; if (other.isSetData()) { this.data = new KeyValues(other.data); } + if (other.isSetCredentials()) { + this.credentials = new org.apache.accumulo.core.security.thrift.TCredentials(other.credentials); + } } public replicateKeyValues_args deepCopy() { @@ -1332,6 +1457,7 @@ import org.slf4j.LoggerFactory; setRemoteTableIdIsSet(false); this.remoteTableId = 0; this.data = null; + this.credentials = null; } public int getRemoteTableId() { @@ -1381,6 +1507,30 @@ import org.slf4j.LoggerFactory; } } + public org.apache.accumulo.core.security.thrift.TCredentials getCredentials() { + return this.credentials; + } + + public replicateKeyValues_args setCredentials(org.apache.accumulo.core.security.thrift.TCredentials credentials) { + this.credentials = credentials; + return this; + } + + public void unsetCredentials() { + this.credentials = null; + } + + /** Returns true if field credentials is set (has been assigned a value) and false otherwise */ + public boolean isSetCredentials() { + return this.credentials != null; + } + + public void setCredentialsIsSet(boolean value) { + if (!value) { + this.credentials = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case REMOTE_TABLE_ID: @@ -1399,6 +1549,14 @@ import org.slf4j.LoggerFactory; } break; + case CREDENTIALS: + if (value == null) { + unsetCredentials(); + } else { + setCredentials((org.apache.accumulo.core.security.thrift.TCredentials)value); + } + break; + } } @@ -1410,6 +1568,9 @@ import org.slf4j.LoggerFactory; case DATA: return getData(); + case CREDENTIALS: + return getCredentials(); + } throw new IllegalStateException(); } @@ -1425,6 +1586,8 @@ import org.slf4j.LoggerFactory; return isSetRemoteTableId(); case DATA: return isSetData(); + case CREDENTIALS: + return isSetCredentials(); } throw new IllegalStateException(); } @@ -1460,6 +1623,15 @@ import org.slf4j.LoggerFactory; return false; } + boolean this_present_credentials = true && this.isSetCredentials(); + boolean that_present_credentials = true && that.isSetCredentials(); + if (this_present_credentials || that_present_credentials) { + if (!(this_present_credentials && that_present_credentials)) + return false; + if (!this.credentials.equals(that.credentials)) + return false; + } + return true; } @@ -1496,6 +1668,16 @@ import org.slf4j.LoggerFactory; return lastComparison; } } + lastComparison = Boolean.valueOf(isSetCredentials()).compareTo(typedOther.isSetCredentials()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetCredentials()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.credentials, typedOther.credentials); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -1527,6 +1709,14 @@ import org.slf4j.LoggerFactory; sb.append(this.data); } first = false; + if (!first) sb.append(", "); + sb.append("credentials:"); + if (this.credentials == null) { + sb.append("null"); + } else { + sb.append(this.credentials); + } + first = false; sb.append(")"); return sb.toString(); } @@ -1537,6 +1727,9 @@ import org.slf4j.LoggerFactory; if (data != null) { data.validate(); } + if (credentials != null) { + credentials.validate(); + } } private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { @@ -1592,6 +1785,15 @@ import org.slf4j.LoggerFactory; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 3: // CREDENTIALS + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials(); + struct.credentials.read(iprot); + struct.setCredentialsIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1615,6 +1817,11 @@ import org.slf4j.LoggerFactory; struct.data.write(oprot); oprot.writeFieldEnd(); } + if (struct.credentials != null) { + oprot.writeFieldBegin(CREDENTIALS_FIELD_DESC); + struct.credentials.write(oprot); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -1639,19 +1846,25 @@ import org.slf4j.LoggerFactory; if (struct.isSetData()) { optionals.set(1); } - oprot.writeBitSet(optionals, 2); + if (struct.isSetCredentials()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); if (struct.isSetRemoteTableId()) { oprot.writeI32(struct.remoteTableId); } if (struct.isSetData()) { struct.data.write(oprot); } + if (struct.isSetCredentials()) { + struct.credentials.write(oprot); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, replicateKeyValues_args struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; - BitSet incoming = iprot.readBitSet(2); + BitSet incoming = iprot.readBitSet(3); if (incoming.get(0)) { struct.remoteTableId = iprot.readI32(); struct.setRemoteTableIdIsSet(true); @@ -1661,6 +1874,11 @@ import org.slf4j.LoggerFactory; struct.data.read(iprot); struct.setDataIsSet(true); } + if (incoming.get(2)) { + struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials(); + struct.credentials.read(iprot); + struct.setCredentialsIsSet(true); + } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/core/src/main/thrift/replication.thrift ---------------------------------------------------------------------- diff --git a/core/src/main/thrift/replication.thrift b/core/src/main/thrift/replication.thrift index a5d1836..392e913 100644 --- a/core/src/main/thrift/replication.thrift +++ b/core/src/main/thrift/replication.thrift @@ -19,6 +19,7 @@ namespace java org.apache.accumulo.core.replication.thrift namespace cpp org.apache.accumulo.core.replication.thrift include "data.thrift" +include "security.thrift" struct WalEdits { 1:list<binary> edits @@ -28,21 +29,35 @@ struct KeyValues { 1:list<data.TKeyValue> keyValues } -exception RemoteCoordinationException { - 1:i32 code, +enum RemoteReplicationErrorCode { + COULD_NOT_DESERIALIZE + COULD_NOT_APPLY + TABLE_DOES_NOT_EXIST + CANNOT_AUTHENTICATE + CANNOT_INSTANTIATE_REPLAYER +} + +enum ReplicationCoordinatorErrorCode { + NO_AVAILABLE_SERVERS + SERVICE_CONFIGURATION_UNAVAILABLE + CANNOT_AUTHENTICATE +} + +exception ReplicationCoordinatorException { + 1:ReplicationCoordinatorErrorCode code, 2:string reason } exception RemoteReplicationException { - 1:i32 code, + 1:RemoteReplicationErrorCode code, 2:string reason } service ReplicationCoordinator { - string getServicerAddress(1:i32 remoteTableId) throws (1:RemoteCoordinationException e), + string getServicerAddress(1:i32 remoteTableId, 2:security.TCredentials credentials) throws (1:ReplicationCoordinatorException e), } service ReplicationServicer { - i64 replicateLog(1:i32 remoteTableId, 2:WalEdits data) throws (1:RemoteReplicationException e), - i64 replicateKeyValues(1:i32 remoteTableId, 2:KeyValues data) throws (1:RemoteReplicationException e) + i64 replicateLog(1:i32 remoteTableId, 2:WalEdits data, 3:security.TCredentials credentials) throws (1:RemoteReplicationException e), + i64 replicateKeyValues(1:i32 remoteTableId, 2:KeyValues data, 3:security.TCredentials credentials) throws (1:RemoteReplicationException e) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java b/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java index 9331075..974aaa9 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java @@ -23,13 +23,17 @@ import java.util.Set; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.replication.ReplicationCoordinatorErrorCode; -import org.apache.accumulo.core.replication.thrift.RemoteCoordinationException; +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator; +import org.apache.accumulo.core.replication.thrift.ReplicationCoordinatorErrorCode; +import org.apache.accumulo.core.replication.thrift.ReplicationCoordinatorException; +import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooReader; import org.apache.accumulo.master.Master; import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.accumulo.server.security.SystemCredentials; import org.apache.thrift.TException; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -47,6 +51,7 @@ public class MasterReplicationCoordinator implements ReplicationCoordinator.Ifac private final Instance inst; private final Random rand; private final ZooReader reader; + private final SecurityOperation security; public MasterReplicationCoordinator(Master master) { this(master, new ZooReader(master.getInstance().getZooKeepers(), master.getInstance().getZooKeepersSessionTimeOut())); @@ -57,15 +62,22 @@ public class MasterReplicationCoordinator implements ReplicationCoordinator.Ifac this.rand = new Random(358923462l); this.inst = master.getInstance(); this.reader = reader; - + this.security = SecurityOperation.getInstance(inst.getInstanceID(), false); } @Override - public String getServicerAddress(int remoteTableId) throws RemoteCoordinationException, TException { + public String getServicerAddress(int remoteTableId, TCredentials creds) throws ReplicationCoordinatorException, TException { + try { + security.authenticateUser(SystemCredentials.get().toThrift(inst), creds); + } catch (ThriftSecurityException e) { + log.error("{} failed to authenticate for replication to {}", creds.getPrincipal(), remoteTableId); + throw new ReplicationCoordinatorException(ReplicationCoordinatorErrorCode.CANNOT_AUTHENTICATE, "Could not authenticate " + creds.getPrincipal()); + } + Set<TServerInstance> tservers = master.onlineTabletServers(); if (tservers.isEmpty()) { - throw new RemoteCoordinationException(ReplicationCoordinatorErrorCode.NO_AVAILABLE_SERVERS.ordinal(), "No tservers are available for replication"); + throw new ReplicationCoordinatorException(ReplicationCoordinatorErrorCode.NO_AVAILABLE_SERVERS, "No tservers are available for replication"); } TServerInstance tserver = getRandomTServer(tservers, rand.nextInt(tservers.size())); @@ -74,7 +86,7 @@ public class MasterReplicationCoordinator implements ReplicationCoordinator.Ifac replServiceAddr = new String(reader.getData(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_TSERVERS + "/" + tserver.hostPort(), null), StandardCharsets.UTF_8); } catch (KeeperException | InterruptedException e) { log.error("Could not fetch repliation service port for tserver", e); - throw new RemoteCoordinationException(ReplicationCoordinatorErrorCode.SERVICE_CONFIGURATION_UNAVAILABLE.ordinal(), + throw new ReplicationCoordinatorException(ReplicationCoordinatorErrorCode.SERVICE_CONFIGURATION_UNAVAILABLE, "Could not determine port for replication service running at " + tserver.hostPort()); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java index ca1382f..c6b266f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashSet; +import java.util.Map; import java.util.Set; import org.apache.accumulo.core.client.AccumuloException; @@ -34,6 +35,7 @@ import org.apache.accumulo.core.client.impl.ClientExecReturn; import org.apache.accumulo.core.client.impl.ReplicationClient; import org.apache.accumulo.core.client.impl.ServerConfigurationUtil; import org.apache.accumulo.core.client.replication.ReplicaSystem; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; @@ -45,6 +47,8 @@ import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator; import org.apache.accumulo.core.replication.thrift.ReplicationServicer; import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Client; import org.apache.accumulo.core.replication.thrift.WalEdits; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; @@ -137,8 +141,10 @@ public class AccumuloReplicaSystem implements ReplicaSystem { @Override public Status replicate(final Path p, final Status status, final ReplicationTarget target) { - Instance localInstance = HdfsZooInstance.getInstance(); - AccumuloConfiguration localConf = ServerConfigurationUtil.getConfiguration(localInstance); + final Instance localInstance = HdfsZooInstance.getInstance(); + final AccumuloConfiguration localConf = ServerConfigurationUtil.getConfiguration(localInstance); + Credentials credentialsForPeer = getCredentialsForPeer(localConf, target); + final TCredentials tCredsForPeer = credentialsForPeer.toThrift(localInstance); Instance peerInstance = getPeerInstance(target); // Remote identifier is an integer (table id) in this case. @@ -154,7 +160,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { @Override public String execute(ReplicationCoordinator.Client client) throws Exception { - return client.getServicerAddress(remoteTableId); + return client.getServicerAddress(remoteTableId, tCredsForPeer); } }); @@ -184,7 +190,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { if (p.getName().endsWith(RFILE_SUFFIX)) { RFileReplication kvs = getKeyValues(target, p, status, sizeLimit); if (0 < kvs.keyValues.getKeyValuesSize()) { - long entriesReplicated = client.replicateKeyValues(remoteTableId, kvs.keyValues); + long entriesReplicated = client.replicateKeyValues(remoteTableId, kvs.keyValues, tCredsForPeer); if (entriesReplicated != kvs.keyValues.getKeyValuesSize()) { log.warn("Sent {} KeyValue entries for replication but only {} were reported as replicated", kvs.keyValues.getKeyValuesSize(), entriesReplicated); @@ -198,7 +204,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { // If we have some edits to send if (0 < edits.walEdits.getEditsSize()) { - long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits); + long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tCredsForPeer); if (entriesReplicated != edits.numUpdates) { log.warn("Sent {} WAL entries for replication but {} were reported as replicated", edits.numUpdates, entriesReplicated); } @@ -241,6 +247,24 @@ public class AccumuloReplicaSystem implements ReplicaSystem { return status; } + protected Credentials getCredentialsForPeer(AccumuloConfiguration conf, ReplicationTarget target) { + Preconditions.checkNotNull(conf); + Preconditions.checkNotNull(target); + + String peerName = target.getPeerName(); + String userKey = Property.REPLICATION_PEER_USER.getKey() + peerName, passwordKey = Property.REPLICATION_PEER_PASSWORD.getKey() + peerName; + Map<String,String> peerUsers = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_USER); + Map<String,String> peerPasswords = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_PASSWORD); + + String user = peerUsers.get(userKey); + String password = peerPasswords.get(passwordKey); + if (null == user || null == password) { + throw new IllegalArgumentException(userKey + " and " + passwordKey + " not configured, cannot replicate"); + } + + return new Credentials(user, new PasswordToken(password)); + } + protected Instance getPeerInstance(ReplicationTarget target) { return new ZooKeeperInstance(instanceName, zookeepers); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java index ea50199..8b1a402 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java @@ -29,8 +29,8 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.replication.AccumuloReplicationReplayer; -import org.apache.accumulo.core.replication.RemoteReplicationErrorCode; import org.apache.accumulo.core.replication.thrift.KeyValues; +import org.apache.accumulo.core.replication.thrift.RemoteReplicationErrorCode; import org.apache.accumulo.core.replication.thrift.RemoteReplicationException; import org.apache.accumulo.core.replication.thrift.WalEdits; import org.apache.accumulo.server.client.HdfsZooInstance; @@ -64,7 +64,7 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay value.readFields(dis); } catch (IOException e) { log.error("Could not deserialize edit from stream", e); - throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_DESERIALIZE.ordinal(), "Could not deserialize edit from stream"); + throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_DESERIALIZE, "Could not deserialize edit from stream"); } // Create the batchScanner if we don't already have one. @@ -74,7 +74,7 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay try { bw = conn.createBatchWriter(tableName, bwConfig); } catch (TableNotFoundException e) { - throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST.ordinal(), "Table " + tableName + " does not exist"); + throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST, "Table " + tableName + " does not exist"); } } @@ -84,7 +84,7 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay bw.addMutations(value.mutations); } catch (MutationsRejectedException e) { log.error("Could not apply mutations to {}", tableName); - throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY.ordinal(), "Could not apply mutations to " + tableName); + throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY, "Could not apply mutations to " + tableName); } mutationsApplied += value.mutations.size(); @@ -95,7 +95,7 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay bw.close(); } catch (MutationsRejectedException e) { log.error("Could not apply mutations to {}", tableName); - throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY.ordinal(), "Could not apply mutations to " + tableName); + throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY, "Could not apply mutations to " + tableName); } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java index 820c586..3a9bf9b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java @@ -28,13 +28,13 @@ import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.replication.AccumuloReplicationReplayer; -import org.apache.accumulo.core.replication.RemoteReplicationErrorCode; import org.apache.accumulo.core.replication.thrift.KeyValues; +import org.apache.accumulo.core.replication.thrift.RemoteReplicationErrorCode; import org.apache.accumulo.core.replication.thrift.RemoteReplicationException; import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Iface; import org.apache.accumulo.core.replication.thrift.WalEdits; import org.apache.accumulo.core.security.Credentials; -import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,11 +52,11 @@ public class ReplicationServicerHandler implements Iface { } @Override - public long replicateLog(int remoteTableId, WalEdits data) throws RemoteReplicationException, TException { + public long replicateLog(int remoteTableId, WalEdits data, TCredentials tcreds) throws RemoteReplicationException, TException { log.debug("Got replication request to tableID {} with {} edits", remoteTableId, data.getEditsSize()); String tableId = Integer.toString(remoteTableId); - Credentials creds = SystemCredentials.get(); + Credentials creds = Credentials.fromThrift(tcreds); Connector conn; String tableName; @@ -64,14 +64,14 @@ public class ReplicationServicerHandler implements Iface { conn = inst.getConnector(creds.getPrincipal(), creds.getToken()); } catch (AccumuloException | AccumuloSecurityException e) { log.error("Could not get connection", e); - throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_AUTHENTICATE.ordinal(), "Cannot get connector"); + throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_AUTHENTICATE, "Cannot get connector as " + creds.getPrincipal()); } try { tableName = Tables.getTableName(inst, tableId); } catch (TableNotFoundException e) { log.error("Could not find table with id {}", tableId); - throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST.ordinal(), "Table with id " + tableId + " does not exist"); + throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST, "Table with id " + tableId + " does not exist"); } AccumuloConfiguration conf = ServerConfigurationUtil.getConfiguration(inst); @@ -96,7 +96,7 @@ public class ReplicationServicerHandler implements Iface { clz = untypedClz.asSubclass(AccumuloReplicationReplayer.class); } catch (ClassNotFoundException e) { log.error("Could not instantiate replayer class {}", handlerClassForTable, e); - throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER.ordinal(), "Could not instantiate replayer class " + throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER, "Could not instantiate replayer class " + handlerClassForTable); } @@ -106,7 +106,7 @@ public class ReplicationServicerHandler implements Iface { replayer = clz.newInstance(); } catch (InstantiationException | IllegalAccessException e1) { log.error("Could not instantiate replayer class {}", clz.getName()); - throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER.ordinal(), "Could not instantiate replayer class" + throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER, "Could not instantiate replayer class" + clz.getName()); } @@ -116,7 +116,7 @@ public class ReplicationServicerHandler implements Iface { } @Override - public long replicateKeyValues(int remoteTableId, KeyValues data) throws RemoteReplicationException, TException { + public long replicateKeyValues(int remoteTableId, KeyValues data, TCredentials creds) throws RemoteReplicationException, TException { throw new UnsupportedOperationException(); }