ACCUMULO-2583 Implement a basic application of replicated data to the remote table.
Very little consideration given to error handling, but it works as a first step. Does not record progress on the remote side yet. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e117a78c Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e117a78c Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e117a78c Branch: refs/heads/ACCUMULO-378 Commit: e117a78cb9b6d4324bf14dfdaecb04a86dc92cda Parents: e84879c Author: Josh Elser <els...@apache.org> Authored: Thu May 8 22:58:07 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Thu May 8 22:58:07 2014 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 2 +- .../replication/RemoteReplicationErrorCode.java | 27 +++++ .../apache/accumulo/tserver/TabletServer.java | 2 +- .../replication/ReplicationServicerHandler.java | 103 ++++++++++++++++++- .../test/replication/ReplicationIT.java | 61 +++++++---- 5 files changed, 170 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/e117a78c/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index a87b1b4..38e8cc3 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -461,7 +461,7 @@ public enum Property { @Experimental REPLICATION_WORK_ASSIGNMENT_SLEEP("replication.work.assignment.sleep", "30s", PropertyType.TIMEDURATION, "Amount of time to sleep between replication work assignment"), @Experimental - REPLICATION_WORKER_THREADS("replication.worker.threads", "4", PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to replicating data"), + REPLICATION_WORKER_THREADS("replication.worker.threads", "1", PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to replicating data"), @Experimental REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10002", PropertyType.PORT, "Listen port used by thrift service in tserver listening for replication"), @Experimental http://git-wip-us.apache.org/repos/asf/accumulo/blob/e117a78c/core/src/main/java/org/apache/accumulo/core/replication/RemoteReplicationErrorCode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/replication/RemoteReplicationErrorCode.java b/core/src/main/java/org/apache/accumulo/core/replication/RemoteReplicationErrorCode.java new file mode 100644 index 0000000..b089307 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/replication/RemoteReplicationErrorCode.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.replication; + +/** + * + */ +public enum RemoteReplicationErrorCode { + COULD_NOT_DESERIALIZE, + COULD_NOT_APPLY, + TABLE_DOES_NOT_EXIST, + CANNOT_AUTHENTICATE +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/e117a78c/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 0b5c6bf..f04bf6b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -3122,7 +3122,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } private HostAndPort startReplicationService() throws UnknownHostException { - ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(this)); + ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(HdfsZooInstance.getInstance(), getSystemConfiguration())); ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl); AccumuloConfiguration conf = getSystemConfiguration(); Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); http://git-wip-us.apache.org/repos/asf/accumulo/blob/e117a78c/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java index 5036cab..336bf87 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java @@ -16,10 +16,30 @@ */ package org.apache.accumulo.tserver.replication; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.replication.RemoteReplicationErrorCode; import org.apache.accumulo.core.replication.thrift.KeyValues; import org.apache.accumulo.core.replication.thrift.RemoteReplicationException; import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Iface; import org.apache.accumulo.core.replication.thrift.WalEdits; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.tserver.logger.LogFileKey; +import org.apache.accumulo.tserver.logger.LogFileValue; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,12 +50,93 @@ import org.slf4j.LoggerFactory; public class ReplicationServicerHandler implements Iface { private static final Logger log = LoggerFactory.getLogger(ReplicationServicerHandler.class); + private Instance inst; + private AccumuloConfiguration conf; + + public ReplicationServicerHandler(Instance inst, AccumuloConfiguration conf) { + this.inst = inst; + this.conf = conf; + } + + @Override public long replicateLog(int remoteTableId, WalEdits data) throws RemoteReplicationException, TException { - log.error("Got replication request to tableID {} with {} edits", remoteTableId, data.getEditsSize()); + log.debug("Got replication request to tableID {} with {} edits", remoteTableId, data.getEditsSize()); + + String tableId = Integer.toString(remoteTableId); + LogFileKey key = new LogFileKey(); + LogFileValue value = new LogFileValue(); + + BatchWriter bw = null; + try { + for (ByteBuffer edit : data.getEdits()) { + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(edit.array())); + try { + key.readFields(dis); + value.readFields(dis); + } catch (IOException e) { + log.error("Could not deserialize edit from stream", e); + throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_DESERIALIZE.ordinal(), "Could not deserialize edit from stream"); + } + + // Create the batchScanner if we don't already have one. + if (null == bw) { + bw = getBatchWriter(tableId); + } + + try { + bw.addMutations(value.mutations); + } catch (MutationsRejectedException e) { + log.error("Could not apply mutations to {}", remoteTableId); + throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY.ordinal(), "Could not apply mutations to " + remoteTableId); + } + } + } finally { + if (null != bw) { + try { + bw.close(); + } catch (MutationsRejectedException e) { + log.error("Could not apply mutations to {}", remoteTableId); + throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY.ordinal(), "Could not apply mutations to " + remoteTableId); + } + } + } + + return data.getEditsSize(); } + protected BatchWriter getBatchWriter(String tableId) throws RemoteReplicationException { + Credentials creds = SystemCredentials.get(); + Connector conn; + String tableName = null; + try { + conn = inst.getConnector(creds.getPrincipal(), creds.getToken()); + } catch (AccumuloException | AccumuloSecurityException e) { + log.error("Could not get connector with system credentials. Something is very wrong", e); + throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_AUTHENTICATE.ordinal(), "Cannot get Connector with system credentials"); + } + + for (Entry<String,String> nameToId : conn.tableOperations().tableIdMap().entrySet()) { + if (tableId.equals(nameToId.getValue())) { + tableName = nameToId.getKey(); + break; + } + } + + if (null == tableName) { + log.error("Table with id of " + tableId + " does not exist"); + throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST.ordinal(), "Table with id of " + tableId + " does not exist"); + } + + try { + return conn.createBatchWriter(tableName, new BatchWriterConfig()); + } catch (TableNotFoundException e) { + log.error("Table with id of " + tableId + " does not exist"); + throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST.ordinal(), "Table with id of " + tableId + " does not exist"); + } + } + @Override public long replicateKeyValues(int remoteTableId, KeyValues data) throws RemoteReplicationException, TException { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/e117a78c/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java index 0ad8066..3c1ebac 100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java @@ -26,21 +26,25 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.accumulo.test.functional.ConfigurableMacIT; import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.protobuf.TextFormat; +import com.google.common.collect.Iterables; /** * @@ -51,7 +55,7 @@ public class ReplicationIT extends ConfigurableMacIT { @Override public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { cfg.setNumTservers(1); - cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "32M"); + cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M"); cfg.setProperty(Property.GC_CYCLE_START, "1s"); cfg.setProperty(Property.GC_CYCLE_DELAY, "5s"); cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "5s"); @@ -59,18 +63,22 @@ public class ReplicationIT extends ConfigurableMacIT { hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); } - @Test//(timeout = 60 * 1000) - public void test() throws Exception { + @Test(timeout = 60 * 5000) + public void dataWasReplicatedToThePeer() throws Exception { MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), ROOT_PASSWORD); peerCfg.setNumTservers(1); peerCfg.setInstanceName("peer"); + peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); peerCfg.setProperty(Property.MASTER_REPLICATION_COORDINATOR_PORT, "10003"); peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "10004"); + peerCfg.setProperty(Property.REPLICATION_THREADCHECK, "5m"); MiniAccumuloClusterImpl peerCluster = peerCfg.build(); peerCluster.start(); + Process monitor = peerCluster.exec(Monitor.class); + Connector connMaster = getConnector(); Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD); @@ -90,23 +98,23 @@ public class ReplicationIT extends ConfigurableMacIT { String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable); Assert.assertNotNull(peerTableId); - // Replicate this table to the peerClusterName in a table with the peerTableId table id + // Replicate this table to the peerClusterName in a table with the peerTableId table id connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true"); connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId); // Write some data to table1 BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig()); - for (int i = 0; i < 100; i++) { - for (int rows = 0; rows < 1000; rows++) { - Mutation m = new Mutation(i + "_" + Integer.toString(rows)); - for (int cols = 0; cols < 400; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); + for (int rows = 0; rows < 5000; rows++) { + Mutation m = new Mutation(Integer.toString(rows)); + for (int cols = 0; cols < 100; cols++) { + String value = Integer.toString(cols); + m.put(value, "", value); } + bw.addMutation(m); } + log.info("Wrote all data to master cluster"); + bw.close(); while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) { @@ -114,19 +122,28 @@ public class ReplicationIT extends ConfigurableMacIT { } connMaster.tableOperations().compact(masterTable, null, null, true, false); - for (int i = 0; i < 10; i++) { + + // Wait until we fully replicated something + boolean fullyReplicated = false; + for (int i = 0; i < 10 && !fullyReplicated; i++) { + UtilWaitThread.sleep(2000); + Scanner s = ReplicationTable.getScanner(connMaster); - for (Entry<Key,Value> e : s) { - Path p = new Path(e.getKey().getRow().toString()); - log.info(p.getName() + " " + e.getKey().getColumnFamily() + " " + e.getKey().getColumnQualifier() + " " + TextFormat.shortDebugString(Status.parseFrom(e.getValue().get()))); + WorkSection.limit(s); + for (Entry<Key,Value> entry : s) { + Status status = Status.parseFrom(entry.getValue().get()); + if (StatusUtil.isFullyReplicated(status)) { + fullyReplicated |= true; + } } + } - log.info(""); - log.info(""); + Assert.assertNotEquals(0, fullyReplicated); - Thread.sleep(3000); - } + // Once we fully replicated some file, we are guaranteed to have data on the remote + Assert.assertTrue(0 < Iterables.size(connPeer.createScanner(peerTable, Authorizations.EMPTY))); + monitor.destroy(); peerCluster.stop(); }