ACCUMULO-378 Fix up some tests that failed on Jenkins and remove a now-unnecessary test
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f247c8e2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f247c8e2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f247c8e2

Branch: refs/heads/ACCUMULO-378
Commit: f247c8e251abb99765ed28b98873f22152d52f58
Parents: 91396e5
Author: Josh Elser <els...@apache.org>
Authored: Wed Apr 30 17:30:44 2014 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Wed Apr 30 17:30:44 2014 -0400

----------------------------------------------------------------------
 .../replication/ReplicationDeadlockTest.java   |   3 -
 .../ReplicationTableTimestampIT.java           | 245 -------------------
 .../test/replication/ReplicationWithGCIT.java  |   2 +-
 .../replication/ReplicationWithMakerTest.java  |  40 ++-
 4 files changed, 32 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f247c8e2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
index 9713c8c..d43aa32 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
@@ -17,7 +17,6 @@
 package org.apache.accumulo.test.replication;
 
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -36,9 +35,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Lo
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.minicluster.impl.ProcessReference;
 import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.hadoop.conf.Configuration;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f247c8e2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTableTimestampIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTableTimestampIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTableTimestampIT.java
deleted file mode 100644
index 3111164..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTableTimestampIT.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Multimap;
-
-/**
- * Integration Tests that attempt to evaluate the accuracy of the internal bookkeeping performed on the accumulo "master" instance. Does not send data to any
- * remote instance, merely tracks what is stored locally.
- */
-public class ReplicationTableTimestampIT extends ConfigurableMacIT {
-  @Override
-  public int defaultTimeoutSeconds() {
-    return 300;
-  }
-
-  @Override
-  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0s");
-    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
-    cfg.setProperty(Property.GC_CYCLE_START, "1s");
-    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
-    cfg.setNumTservers(1);
-    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
-  }
-
-  private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException {
-    Multimap<String,String> logs = HashMultimap.create();
-    Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    scanner.fetchColumnFamily(LogColumnFamily.NAME);
-    scanner.setRange(new Range());
-    for (Entry<Key,Value> entry : scanner) {
-      if (Thread.interrupted()) {
-        return logs;
-      }
-
-      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-
-      for (String log : logEntry.logSet) {
-        logs.put(new Path(log).toString(), logEntry.extent.getTableId().toString());
-      }
-    }
-    return logs;
-  }
-
-  @Test
-  public void closedReplicationStatusStayClosed() throws Exception {
-    final Connector conn = getConnector();
-    String table1 = "table1", table2 = "table2", table3 = "table3";
-    final Multimap<String,String> metadataTableWals = HashMultimap.create();
-    final AtomicBoolean keepRunning = new AtomicBoolean(true);
-
-    Thread t = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
-        // when that happens
-        while (keepRunning.get()) {
-          try {
-            metadataTableWals.putAll(getLogs(conn));
-          } catch (TableNotFoundException e) {
-            log.error("Metadata table doesn't exist");
-          }
-        }
-      }
-
-    });
-
-    t.start();
-
-    conn.tableOperations().create(table1);
-    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
-    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-    Thread.sleep(1000);
-
-    // Write some data to table1
-    BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    conn.tableOperations().create(table2);
-    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
-    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-    Thread.sleep(1000);
-
-    // Write some data to table2
-    bw = conn.createBatchWriter(table2, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    conn.tableOperations().create(table3);
-    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
-    conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-    Thread.sleep(1000);
-
-    // Write some data to table3
-    bw = conn.createBatchWriter(table3, new BatchWriterConfig());
-    for (int rows = 0; rows < 200; rows++) {
-      Mutation m = new Mutation(Integer.toString(rows));
-      for (int cols = 0; cols < 500; cols++) {
-        String value = Integer.toString(cols);
-        m.put(value, "", value);
-      }
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    // Stop the thread which is constantly scanning metadata to track all WALs seen
-    keepRunning.set(false);
-    t.join(5000);
-
-    // See which files we have for replication in the replication table
-    Scanner s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    Set<String> replFiles = new HashSet<>();
-    for (Entry<Key,Value> entry : s) {
-      replFiles.add(entry.getKey().getRow().toString());
-    }
-
-    // We might have a WAL that was used solely for the replication table
-    // We want to remove that from our list as it should not appear in the replication table
-    String replicationTableId = conn.tableOperations().tableIdMap().get(ReplicationTable.NAME);
-    Iterator<Entry<String,String>> observedLogs = metadataTableWals.entries().iterator();
-    while (observedLogs.hasNext()) {
-      Entry<String,String> observedLog = observedLogs.next();
-      if (replicationTableId.equals(observedLog.getValue())) {
-        observedLogs.remove();
-      }
-    }
-
-    // We should have *some* reference to each log that was seen in the metadata table
-    // They might not yet all be closed though (might be newfile)
-    Assert.assertEquals("Metadata log distribution: " + metadataTableWals, metadataTableWals.keySet(), replFiles);
-
-    LinkedListMultimap<String,Entry<Key,Value>> kvByRow = LinkedListMultimap.create();
-    s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    for (Entry<Key,Value> entry : s) {
-      kvByRow.put(entry.getKey().getRow().toString(), entry);
-    }
-
-    for (String row : kvByRow.keySet()) {
-      ArrayList<Entry<Key,Value>> kvs = new ArrayList<>(kvByRow.get(row));
-      Collections.sort(kvs, new Comparator<Entry<Key,Value>>() {
-        @Override
-        public int compare(Entry<Key,Value> o1, Entry<Key,Value> o2) {
-          return (new Long(o1.getKey().getTimestamp())).compareTo(new Long(o2.getKey().getTimestamp()));
-        }
-      });
-
-      Key closedKey = null;
-      boolean observedClosed = false;
-      for (Entry<Key,Value> kv : kvs) {
-        Status status = Status.parseFrom(kv.getValue().get());
-
-        // Once we get a closed record, every subsequent record should *also* be closed
-        // A file cannot be "re-opened"
-        if (!observedClosed) {
-          if (status.getClosed()) {
-            closedKey = kv.getKey();
-            observedClosed = true;
-          }
-        } else {
-          Assert.assertTrue("Found a non-closed Status (" + kv.getKey().toStringNoTruncate() + ") after a closed Status (" + closedKey.toStringNoTruncate()
-              + ") was observed", status.getClosed());
-        }
-      }
-
-    }
-
-    for (String replFile : replFiles) {
-      Path p = new Path(replFile);
-      FileSystem fs = p.getFileSystem(new Configuration());
-      Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p));
-    }
-  }
-
-}
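The core check of the removed test is worth noting before it disappears: once a Status record for a WAL is observed with closed=true, every later update for that same row must also be closed, because a write-ahead log is never re-opened for replication. Distilled to a standalone assertion, the check looks roughly like the sketch below (the class and method names are illustrative, not part of this commit; it assumes the same Accumulo classes the deleted test imported):

    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Map.Entry;

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.replication.proto.Replication.Status;
    import org.junit.Assert;

    public class ClosedStatusInvariant {
      /**
       * Asserts that, ordered by key timestamp, the Status updates for a single
       * WAL row never transition from closed back to open.
       */
      static void assertClosedStaysClosed(List<Entry<Key,Value>> kvs) throws Exception {
        // Sort the updates oldest-to-newest by the key's timestamp
        Collections.sort(kvs, new Comparator<Entry<Key,Value>>() {
          @Override
          public int compare(Entry<Key,Value> o1, Entry<Key,Value> o2) {
            return Long.valueOf(o1.getKey().getTimestamp()).compareTo(Long.valueOf(o2.getKey().getTimestamp()));
          }
        });

        boolean observedClosed = false;
        for (Entry<Key,Value> kv : kvs) {
          Status status = Status.parseFrom(kv.getValue().get());
          if (observedClosed) {
            // A closed WAL cannot be "re-opened"
            Assert.assertTrue("Open Status after a closed Status: " + kv.getKey().toStringNoTruncate(), status.getClosed());
          } else if (status.getClosed()) {
            observedClosed = true;
          }
        }
      }
    }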
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f247c8e2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
index 3f6b8dc..449827b 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
@@ -496,7 +496,7 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
     s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
     s.setRange(ReplicationSection.getRange());
     recordsFound = 0;
-    for (@SuppressWarnings("unused") Entry<Key,Value> entry : s) {
+    for (Entry<Key,Value> entry : s) {
       recordsFound++;
       log.info(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
     }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f247c8e2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
index 1ffb2a2..1c056ed 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
@@ -33,7 +33,9 @@ import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
 import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.hadoop.conf.Configuration;
@@ -58,12 +60,20 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
     cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
+    // Run the process in the master which writes replication records from metadata to the
+    // replication table repeatedly, without pause
     cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0");
     cfg.setNumTservers(1);
   }
 
   @Test
   public void singleTableSingleTarget() throws Exception {
+    // We want to kill the GC so it doesn't come along and close Status records and mess up the
+    // comparisons against expected Status messages
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
+      cluster.killProcess(ServerType.GARBAGE_COLLECTOR, proc);
+    }
+
     Connector conn = getConnector();
     String table1 = "table1";
 
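The loop added above relies on the mini cluster tracking the server processes it spawned. Factored out, the same idea makes a small reusable helper; this is a sketch under the assumption that getProcesses() maps each ServerType to its ProcessReferences, as the loop in the hunk implies (the class and method names are ours, not part of this commit):

    import java.util.Collection;

    import org.apache.accumulo.minicluster.ServerType;
    import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
    import org.apache.accumulo.minicluster.impl.ProcessReference;

    public class MacProcesses {
      /** Kills every managed process of the given type, e.g. the garbage collector. */
      static void killAll(MiniAccumuloClusterImpl cluster, ServerType type) throws Exception {
        Collection<ProcessReference> procs = cluster.getProcesses().get(type);
        if (procs == null) {
          return; // nothing of that type was started
        }
        for (ProcessReference proc : procs) {
          cluster.killProcess(type, proc);
        }
      }
    }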
@@ -80,7 +90,7 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
         conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
         // Replicate table1 to cluster1 in the table with id of '4'
         conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
-        attempts = 0;
+        break;
       } catch (Exception e) {
         attempts--;
         if (attempts <= 0) {
@@ -113,26 +123,39 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
         attempts--;
       }
     } while (!exists && attempts > 0);
-    Assert.assertTrue("Replication table did not exist", exists);
+    Assert.assertTrue("Replication table was never created", exists);
 
     // ACCUMULO-2743 The Observer in the tserver has to be made aware of the change to get the combiner (made by the master)
     for (int i = 0; i < 5 && !conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME); i++) {
       UtilWaitThread.sleep(1000);
     }
 
-    Assert.assertTrue("Did not find expected combiner", conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME));
+    Assert.assertTrue("Combiner was never set on replication table",
+        conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME));
+
+    // Trigger the minor compaction, waiting for it to finish.
+    // This should write the entry to metadata that the file has data
+    conn.tableOperations().flush(ReplicationTable.NAME, null, null, true);
 
     // Make sure that we have one status element, should be a new file
     Scanner s = ReplicationTable.getScanner(conn);
     StatusSection.limit(s);
     Entry<Key,Value> entry = null;
     attempts = 5;
+    // This record will move from new to new with infinite length because of the minc (flush)
+    Status expectedStatus = StatusUtil.openWithUnknownLength();
     while (null == entry && attempts > 0) {
-      try{
+      try {
         entry = Iterables.getOnlyElement(s);
+        if (!expectedStatus.equals(Status.parseFrom(entry.getValue().get()))) {
+          entry = null;
+          // the master process didn't yet fire and write the new mutation, wait for it to do
+          // so and try to read it again
+          Thread.sleep(1000);
+        }
       } catch (NoSuchElementException e) {
         entry = null;
-        Thread.sleep(200);
+        Thread.sleep(500);
       } catch (IllegalArgumentException e) {
         // saw this contain 2 elements once
         s = ReplicationTable.getScanner(conn);
@@ -146,8 +169,8 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
       }
     }
 
-    Assert.assertNotNull(entry);
-    Assert.assertEquals(StatusUtil.openWithUnknownLength(), Status.parseFrom(entry.getValue().get()));
+    Assert.assertNotNull("Could not find expected entry in replication table", entry);
+    Assert.assertEquals("Expected to find a replication entry that is open with infinite length", expectedStatus, Status.parseFrom(entry.getValue().get()));
 
     // Try a couple of times to watch for the work record to be created
     boolean notFound = true;
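The retry logic in the hunks above amounts to "poll the replication table until its lone Status record reaches the expected state". As a general pattern it could be extracted along these lines; a sketch using the same helper classes the test already imports (the polling method itself is hypothetical, not added by this commit):

    import java.util.Map.Entry;
    import java.util.NoSuchElementException;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
    import org.apache.accumulo.core.replication.proto.Replication.Status;
    import org.apache.accumulo.server.replication.ReplicationTable;

    import com.google.common.collect.Iterables;

    public class StatusPolling {
      /**
       * Polls until the single Status record matches the expected value.
       * Returns the matching entry, or null if attempts are exhausted.
       */
      static Entry<Key,Value> waitForStatus(Connector conn, Status expected, int attempts) throws Exception {
        while (attempts-- > 0) {
          Scanner s = ReplicationTable.getScanner(conn);
          StatusSection.limit(s);
          try {
            Entry<Key,Value> entry = Iterables.getOnlyElement(s);
            if (expected.equals(Status.parseFrom(entry.getValue().get()))) {
              return entry;
            }
            // The record exists but the master hasn't written the expected mutation yet
          } catch (NoSuchElementException e) {
            // No record yet
          } catch (IllegalArgumentException e) {
            // Saw more than one record; rescan on the next attempt
          }
          Thread.sleep(1000);
        }
        return null;
      }
    }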
@@ -276,8 +299,7 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
       Text expectedColqual = ReplicationTarget.toText(new ReplicationTarget("cluster1", "4"));
       Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
       notFound = false;
-    } catch (NoSuchElementException e) {
-    } catch (IllegalArgumentException e) {
+    } catch (NoSuchElementException e) {} catch (IllegalArgumentException e) {
       s = ReplicationTable.getScanner(conn);
       for (Entry<Key,Value> content : s) {
         log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());