ACCUMULO-2583 Get compilation working with slf4j dependencies. Lift the retry count into a Property.
Left a todo for myself as well. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c7b6062c Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c7b6062c Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c7b6062c Branch: refs/heads/ACCUMULO-378 Commit: c7b6062c315ecde8321f83e50afa757003c24a97 Parents: baa7a4f Author: Josh Elser <els...@apache.org> Authored: Wed May 7 00:55:15 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Wed May 7 00:55:15 2014 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 2 + .../core/replication/AccumuloReplicaSystem.java | 124 ----------------- server/base/pom.xml | 18 ++- .../replication/AccumuloReplicaSystem.java | 137 +++++++++++++++++++ server/tserver/pom.xml | 18 ++- 5 files changed, 155 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/c7b6062c/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index f31cd46..6d3b00c 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -462,6 +462,8 @@ public enum Property { REPLICATION_WORKER_THREADS("replication.worker.threads", "4", PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to replicating data"), @Experimental REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10001", PropertyType.PORT, "Listen port used by thrift service in tserver listening for replication"), + @Experimental + REPLICATION_WORK_ATTEMPTS("replication.work.attempts", "10", PropertyType.COUNT, "Number of attempts to try to replicate some data before giving up and letting it naturally be retried later"), ; http://git-wip-us.apache.org/repos/asf/accumulo/blob/c7b6062c/core/src/test/java/org/apache/accumulo/core/replication/AccumuloReplicaSystem.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/replication/AccumuloReplicaSystem.java b/core/src/test/java/org/apache/accumulo/core/replication/AccumuloReplicaSystem.java deleted file mode 100644 index ca66b93..0000000 --- a/core/src/test/java/org/apache/accumulo/core/replication/AccumuloReplicaSystem.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.replication; - -import java.nio.ByteBuffer; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.impl.ClientExecReturn; -import org.apache.accumulo.core.client.impl.ReplicationClient; -import org.apache.accumulo.core.client.replication.ReplicaSystem; -import org.apache.accumulo.core.replication.proto.Replication.Status; -import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator; -import org.apache.accumulo.core.replication.thrift.ReplicationServicer; -import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Client; -import org.apache.accumulo.core.util.ByteBufferUtil; -import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.hadoop.fs.Path; -import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.protobuf.InvalidProtocolBufferException; - -/** - * - */ -public class AccumuloReplicaSystem implements ReplicaSystem { - private static final Logger log = LoggerFactory.getLogger(AccumuloReplicaSystem.class); - - private String instanceName, zookeepers; - - @Override - public void configure(String configuration) { - Preconditions.checkNotNull(configuration); - - int index = configuration.indexOf(','); - if (-1 == index) { - throw new IllegalArgumentException("Expected comma in configuration string"); - } - - instanceName = configuration.substring(0, index); - zookeepers = configuration.substring(index + 1); - } - - @Override - public Status replicate(Path p, Status status, ReplicationTarget target) { - Instance peerInstance = getPeerInstance(target); - // Remote identifier is an integer (table id) in this case. - final int remoteTableId = Integer.parseInt(target.getRemoteIdentifier()); - - for (int i = 0; i < 10; i++) { - String peerTserver; - try { - // Ask the master on the remote what TServer we should talk with to replicate the data - peerTserver = ReplicationClient.executeCoordinatorWithReturn(peerInstance, new ClientExecReturn<String,ReplicationCoordinator.Client>() { - - @Override - public String execute(ReplicationCoordinator.Client client) throws Exception { - return client.getServicerAddress(remoteTableId); - } - - }); - } catch (AccumuloException | AccumuloSecurityException e) { - // No progress is made - log.error("Could not connect to master at {}, cannot proceed with replication. Will retry", target, e); - continue; - } - - if (null == peerTserver) { - // Something went wrong, and we didn't get a valid tserver from the remote for some reason - log.warn("Did not receive tserver from master at {}, cannot proceed with replication. Will retry.", target); - continue; - } - - // We have a tserver on the remote -- send the data its way. - ByteBuffer result; - try { - result = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver, new ClientExecReturn<ByteBuffer,ReplicationServicer.Client>() { - @Override - public ByteBuffer execute(Client client) throws Exception { - return client.replicateLog(remoteTableId, null); - } - }); - - // We need to be able to parse the returned Status, - // if we can't, we don't know what the server actually parsed. - try { - return Status.parseFrom(ByteBufferUtil.toBytes(result)); - } catch (InvalidProtocolBufferException e) { - log.error("Could not parse return Status from {}", peerTserver, e); - throw new RuntimeException("Could not parse returned Status from " + peerTserver, e); - } - } catch (TTransportException | AccumuloException | AccumuloSecurityException e) { - log.warn("Could not connect to remote server {}, will retry", peerTserver, e); - UtilWaitThread.sleep(250); - } - } - - // We made no status, punt on it for now, and let it re-queue itself for work - return status; - } - - public Instance getPeerInstance(ReplicationTarget target) { - return new ZooKeeperInstance(instanceName, zookeepers); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c7b6062c/server/base/pom.xml ---------------------------------------------------------------------- diff --git a/server/base/pom.xml b/server/base/pom.xml index f698621..672fe91 100644 --- a/server/base/pom.xml +++ b/server/base/pom.xml @@ -92,23 +92,21 @@ <artifactId>zookeeper</artifactId> </dependency> <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> </dependency> <dependency> - <groupId>org.easymock</groupId> - <artifactId>easymock</artifactId> - <scope>test</scope> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> </dependency> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> + <groupId>junit</groupId> + <artifactId>junit</artifactId> <scope>test</scope> </dependency> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> <scope>test</scope> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/accumulo/blob/c7b6062c/server/base/src/main/java/org/apache/accumulo/server/replication/AccumuloReplicaSystem.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/AccumuloReplicaSystem.java b/server/base/src/main/java/org/apache/accumulo/server/replication/AccumuloReplicaSystem.java new file mode 100644 index 0000000..138eb71 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/replication/AccumuloReplicaSystem.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.replication; + +import java.nio.ByteBuffer; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.impl.ClientExecReturn; +import org.apache.accumulo.core.client.impl.ReplicationClient; +import org.apache.accumulo.core.client.impl.ServerConfigurationUtil; +import org.apache.accumulo.core.client.replication.ReplicaSystem; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.replication.ReplicationTarget; +import org.apache.accumulo.core.replication.proto.Replication.Status; +import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator; +import org.apache.accumulo.core.replication.thrift.ReplicationServicer; +import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Client; +import org.apache.accumulo.core.util.ByteBufferUtil; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.hadoop.fs.Path; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.protobuf.InvalidProtocolBufferException; + +/** + * + */ +public class AccumuloReplicaSystem implements ReplicaSystem { + private static final Logger log = LoggerFactory.getLogger(AccumuloReplicaSystem.class); + + private String instanceName, zookeepers; + + @Override + public void configure(String configuration) { + Preconditions.checkNotNull(configuration); + + int index = configuration.indexOf(','); + if (-1 == index) { + throw new IllegalArgumentException("Expected comma in configuration string"); + } + + instanceName = configuration.substring(0, index); + zookeepers = configuration.substring(index + 1); + } + + @Override + public Status replicate(Path p, Status status, ReplicationTarget target) { + Instance localInstance = HdfsZooInstance.getInstance(); + AccumuloConfiguration localConf = ServerConfigurationUtil.getConfiguration(localInstance); + + Instance peerInstance = getPeerInstance(target); + // Remote identifier is an integer (table id) in this case. + final int remoteTableId = Integer.parseInt(target.getRemoteIdentifier()); + + // Attempt the replication of this status a number of times before giving up and + // trying to replicate it again later some other time. + for (int i = 0; i < localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS); i++) { + String peerTserver; + try { + // Ask the master on the remote what TServer we should talk with to replicate the data + peerTserver = ReplicationClient.executeCoordinatorWithReturn(peerInstance, new ClientExecReturn<String,ReplicationCoordinator.Client>() { + + @Override + public String execute(ReplicationCoordinator.Client client) throws Exception { + return client.getServicerAddress(remoteTableId); + } + + }); + } catch (AccumuloException | AccumuloSecurityException e) { + // No progress is made + log.error("Could not connect to master at {}, cannot proceed with replication. Will retry", target, e); + continue; + } + + if (null == peerTserver) { + // Something went wrong, and we didn't get a valid tserver from the remote for some reason + log.warn("Did not receive tserver from master at {}, cannot proceed with replication. Will retry.", target); + continue; + } + + // We have a tserver on the remote -- send the data its way. + ByteBuffer result; + //TODO should chunk up the given file into some configurable sizes instead of just sending the entire file all at once + // configuration should probably just be size based. + try { + result = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver, new ClientExecReturn<ByteBuffer,ReplicationServicer.Client>() { + @Override + public ByteBuffer execute(Client client) throws Exception { + //TODO This needs to actually send the appropriate data, and choose replicateLog or replicateKeyValues + return client.replicateLog(remoteTableId, null); + } + }); + + // We need to be able to parse the returned Status, + // if we can't, we don't know what the server actually parsed. + try { + return Status.parseFrom(ByteBufferUtil.toBytes(result)); + } catch (InvalidProtocolBufferException e) { + log.error("Could not parse return Status from {}", peerTserver, e); + throw new RuntimeException("Could not parse returned Status from " + peerTserver, e); + } + } catch (TTransportException | AccumuloException | AccumuloSecurityException e) { + log.warn("Could not connect to remote server {}, will retry", peerTserver, e); + UtilWaitThread.sleep(250); + } + } + + // We made no status, punt on it for now, and let it re-queue itself for work + return status; + } + + public Instance getPeerInstance(ReplicationTarget target) { + return new ZooKeeperInstance(instanceName, zookeepers); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c7b6062c/server/tserver/pom.xml ---------------------------------------------------------------------- diff --git a/server/tserver/pom.xml b/server/tserver/pom.xml index 100bc2d..e09f348 100644 --- a/server/tserver/pom.xml +++ b/server/tserver/pom.xml @@ -88,23 +88,21 @@ <artifactId>zookeeper</artifactId> </dependency> <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> </dependency> <dependency> - <groupId>org.easymock</groupId> - <artifactId>easymock</artifactId> - <scope>test</scope> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> </dependency> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> + <groupId>junit</groupId> + <artifactId>junit</artifactId> <scope>test</scope> </dependency> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> <scope>test</scope> </dependency> </dependencies>