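ACCUMULO-2583 Get compilation working with slf4j dependencies. Lift the 
hard-coded retry count (10) in AccumuloReplicaSystem into a new Property, 
REPLICATION_WORK_ATTEMPTS.

Also left TODOs for myself in AccumuloReplicaSystem.replicate().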


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c7b6062c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c7b6062c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c7b6062c

Branch: refs/heads/ACCUMULO-378
Commit: c7b6062c315ecde8321f83e50afa757003c24a97
Parents: baa7a4f
Author: Josh Elser <els...@apache.org>
Authored: Wed May 7 00:55:15 2014 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Wed May 7 00:55:15 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |   2 +
 .../core/replication/AccumuloReplicaSystem.java | 124 -----------------
 server/base/pom.xml                             |  18 ++-
 .../replication/AccumuloReplicaSystem.java      | 137 +++++++++++++++++++
 server/tserver/pom.xml                          |  18 ++-
 5 files changed, 155 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/c7b6062c/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index f31cd46..6d3b00c 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -462,6 +462,8 @@ public enum Property {
   REPLICATION_WORKER_THREADS("replication.worker.threads", "4", 
PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to 
replicating data"),
   @Experimental
   REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", 
"10001", PropertyType.PORT, "Listen port used by thrift service in tserver 
listening for replication"),
+  @Experimental
+  REPLICATION_WORK_ATTEMPTS("replication.work.attempts", "10", 
PropertyType.COUNT, "Number of attempts to try to replicate some data before 
giving up and letting it naturally be retried later"),
 
   ;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c7b6062c/core/src/test/java/org/apache/accumulo/core/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/replication/AccumuloReplicaSystem.java
 
b/core/src/test/java/org/apache/accumulo/core/replication/AccumuloReplicaSystem.java
deleted file mode 100644
index ca66b93..0000000
--- 
a/core/src/test/java/org/apache/accumulo/core/replication/AccumuloReplicaSystem.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.replication;
-
-import java.nio.ByteBuffer;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.impl.ClientExecReturn;
-import org.apache.accumulo.core.client.impl.ReplicationClient;
-import org.apache.accumulo.core.client.replication.ReplicaSystem;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
-import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
-import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Client;
-import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.hadoop.fs.Path;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-/**
- * 
- */
-public class AccumuloReplicaSystem implements ReplicaSystem {
-  private static final Logger log = 
LoggerFactory.getLogger(AccumuloReplicaSystem.class);
-
-  private String instanceName, zookeepers;
-
-  @Override
-  public void configure(String configuration) {
-    Preconditions.checkNotNull(configuration);
-
-    int index = configuration.indexOf(',');
-    if (-1 == index) {
-      throw new IllegalArgumentException("Expected comma in configuration 
string");
-    }
-
-    instanceName = configuration.substring(0, index);
-    zookeepers = configuration.substring(index + 1);
-  }
-
-  @Override
-  public Status replicate(Path p, Status status, ReplicationTarget target) {
-    Instance peerInstance = getPeerInstance(target);
-    // Remote identifier is an integer (table id) in this case.
-    final int remoteTableId = Integer.parseInt(target.getRemoteIdentifier());
-
-    for (int i = 0; i < 10; i++) {
-      String peerTserver;
-      try {
-        // Ask the master on the remote what TServer we should talk with to 
replicate the data
-        peerTserver = 
ReplicationClient.executeCoordinatorWithReturn(peerInstance, new 
ClientExecReturn<String,ReplicationCoordinator.Client>() {
-  
-          @Override
-          public String execute(ReplicationCoordinator.Client client) throws 
Exception {
-            return client.getServicerAddress(remoteTableId);
-          }
-          
-        });
-      } catch (AccumuloException | AccumuloSecurityException e) {
-        // No progress is made
-        log.error("Could not connect to master at {}, cannot proceed with 
replication. Will retry", target, e);
-        continue;
-      }
-  
-      if (null == peerTserver) {
-        // Something went wrong, and we didn't get a valid tserver from the 
remote for some reason
-        log.warn("Did not receive tserver from master at {}, cannot proceed 
with replication. Will retry.", target);
-        continue;
-      }
-  
-      // We have a tserver on the remote -- send the data its way.
-      ByteBuffer result;
-      try {
-        result = ReplicationClient.executeServicerWithReturn(peerInstance, 
peerTserver, new ClientExecReturn<ByteBuffer,ReplicationServicer.Client>() {
-          @Override
-          public ByteBuffer execute(Client client) throws Exception {
-            return client.replicateLog(remoteTableId, null);
-          }
-        });
-
-        // We need to be able to parse the returned Status,
-        // if we can't, we don't know what the server actually parsed.
-        try {
-          return Status.parseFrom(ByteBufferUtil.toBytes(result));
-        } catch (InvalidProtocolBufferException e) {
-          log.error("Could not parse return Status from {}", peerTserver, e);
-          throw new RuntimeException("Could not parse returned Status from " + 
peerTserver, e);
-        }
-      } catch (TTransportException | AccumuloException | 
AccumuloSecurityException e) {
-        log.warn("Could not connect to remote server {}, will retry", 
peerTserver, e);
-        UtilWaitThread.sleep(250);
-      }
-    }
-
-    // We made no status, punt on it for now, and let it re-queue itself for 
work
-    return status;
-  }
-
-  public Instance getPeerInstance(ReplicationTarget target) {
-    return new ZooKeeperInstance(instanceName, zookeepers);
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c7b6062c/server/base/pom.xml
----------------------------------------------------------------------
diff --git a/server/base/pom.xml b/server/base/pom.xml
index f698621..672fe91 100644
--- a/server/base/pom.xml
+++ b/server/base/pom.xml
@@ -92,23 +92,21 @@
       <artifactId>zookeeper</artifactId>
     </dependency>
     <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.easymock</groupId>
-      <artifactId>easymock</artifactId>
-      <scope>test</scope>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
       <scope>test</scope>
     </dependency>
   </dependencies>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c7b6062c/server/base/src/main/java/org/apache/accumulo/server/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/replication/AccumuloReplicaSystem.java
 
b/server/base/src/main/java/org/apache/accumulo/server/replication/AccumuloReplicaSystem.java
new file mode 100644
index 0000000..138eb71
--- /dev/null
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/replication/AccumuloReplicaSystem.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.replication;
+
+import java.nio.ByteBuffer;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.ClientExecReturn;
+import org.apache.accumulo.core.client.impl.ReplicationClient;
+import org.apache.accumulo.core.client.impl.ServerConfigurationUtil;
+import org.apache.accumulo.core.client.replication.ReplicaSystem;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
+import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
+import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Client;
+import org.apache.accumulo.core.util.ByteBufferUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.hadoop.fs.Path;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * 
+ */
+public class AccumuloReplicaSystem implements ReplicaSystem {
+  private static final Logger log = 
LoggerFactory.getLogger(AccumuloReplicaSystem.class);
+
+  private String instanceName, zookeepers;
+
+  @Override
+  public void configure(String configuration) {
+    Preconditions.checkNotNull(configuration);
+
+    int index = configuration.indexOf(',');
+    if (-1 == index) {
+      throw new IllegalArgumentException("Expected comma in configuration 
string");
+    }
+
+    instanceName = configuration.substring(0, index);
+    zookeepers = configuration.substring(index + 1);
+  }
+
+  @Override
+  public Status replicate(Path p, Status status, ReplicationTarget target) {
+    Instance localInstance = HdfsZooInstance.getInstance();
+    AccumuloConfiguration localConf = 
ServerConfigurationUtil.getConfiguration(localInstance);
+    
+    Instance peerInstance = getPeerInstance(target);
+    // Remote identifier is an integer (table id) in this case.
+    final int remoteTableId = Integer.parseInt(target.getRemoteIdentifier());
+
+    // Attempt the replication of this status a number of times before giving 
up and
+    // trying to replicate it again later some other time.
+    for (int i = 0; i < 
localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS); i++) {
+      String peerTserver;
+      try {
+        // Ask the master on the remote what TServer we should talk with to 
replicate the data
+        peerTserver = 
ReplicationClient.executeCoordinatorWithReturn(peerInstance, new 
ClientExecReturn<String,ReplicationCoordinator.Client>() {
+  
+          @Override
+          public String execute(ReplicationCoordinator.Client client) throws 
Exception {
+            return client.getServicerAddress(remoteTableId);
+          }
+          
+        });
+      } catch (AccumuloException | AccumuloSecurityException e) {
+        // No progress is made
+        log.error("Could not connect to master at {}, cannot proceed with 
replication. Will retry", target, e);
+        continue;
+      }
+  
+      if (null == peerTserver) {
+        // Something went wrong, and we didn't get a valid tserver from the 
remote for some reason
+        log.warn("Did not receive tserver from master at {}, cannot proceed 
with replication. Will retry.", target);
+        continue;
+      }
+  
+      // We have a tserver on the remote -- send the data its way.
+      ByteBuffer result;
+      //TODO should chunk up the given file into some configurable sizes 
instead of just sending the entire file all at once
+      //     configuration should probably just be size based.
+      try {
+        result = ReplicationClient.executeServicerWithReturn(peerInstance, 
peerTserver, new ClientExecReturn<ByteBuffer,ReplicationServicer.Client>() {
+          @Override
+          public ByteBuffer execute(Client client) throws Exception {
+            //TODO This needs to actually send the appropriate data, and 
choose replicateLog or replicateKeyValues
+            return client.replicateLog(remoteTableId, null);
+          }
+        });
+
+        // We need to be able to parse the returned Status,
+        // if we can't, we don't know what the server actually parsed.
+        try {
+          return Status.parseFrom(ByteBufferUtil.toBytes(result));
+        } catch (InvalidProtocolBufferException e) {
+          log.error("Could not parse return Status from {}", peerTserver, e);
+          throw new RuntimeException("Could not parse returned Status from " + 
peerTserver, e);
+        }
+      } catch (TTransportException | AccumuloException | 
AccumuloSecurityException e) {
+        log.warn("Could not connect to remote server {}, will retry", 
peerTserver, e);
+        UtilWaitThread.sleep(250);
+      }
+    }
+
+    // We made no status, punt on it for now, and let it re-queue itself for 
work
+    return status;
+  }
+
+  public Instance getPeerInstance(ReplicationTarget target) {
+    return new ZooKeeperInstance(instanceName, zookeepers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c7b6062c/server/tserver/pom.xml
----------------------------------------------------------------------
diff --git a/server/tserver/pom.xml b/server/tserver/pom.xml
index 100bc2d..e09f348 100644
--- a/server/tserver/pom.xml
+++ b/server/tserver/pom.xml
@@ -88,23 +88,21 @@
       <artifactId>zookeeper</artifactId>
     </dependency>
     <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.easymock</groupId>
-      <artifactId>easymock</artifactId>
-      <scope>test</scope>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
       <scope>test</scope>
     </dependency>
   </dependencies>

Reply via email to