Repository: accumulo Updated Branches: refs/heads/master f73e79399 -> 0e68954ab
ACCUMULO-3709: Update both output formats. Add unit test based on Mock Accumulo that exercise the mutations rejection exception Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6d91115f Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6d91115f Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6d91115f Branch: refs/heads/master Commit: 6d91115fe526fe07ede0f189581b561428a93251 Parents: 9132611 Author: phrocker <marc.par...@gmail.com> Authored: Sun Apr 12 16:23:29 2015 -0400 Committer: phrocker <marc.par...@gmail.com> Committed: Sun Apr 12 16:46:06 2015 -0400 ---------------------------------------------------------------------- .../client/mapred/AccumuloOutputFormat.java | 1 + .../client/mapreduce/AccumuloOutputFormat.java | 1 + .../accumulo/test/AccumuloOutputFormatTest.java | 167 +++++++++++++++++++ 3 files changed, 169 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/6d91115f/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java index 637d895..7647e68 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java @@ -478,6 +478,7 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> { if (e.getConstraintViolationSummaries().size() > 0) { log.error("Constraint violations : " + e.getConstraintViolationSummaries().size()); } + throw new IOException(e); } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6d91115f/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java index b905ce4..c572974 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java @@ -482,6 +482,7 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { if (e.getConstraintViolationSummaries().size() > 0) { log.error("Constraint violations : " + e.getConstraintViolationSummaries().size()); } + throw new IOException(e); } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6d91115f/test/src/test/java/org/apache/accumulo/test/AccumuloOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/AccumuloOutputFormatTest.java b/test/src/test/java/org/apache/accumulo/test/AccumuloOutputFormatTest.java new file mode 100644 index 0000000..8db91f5 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/AccumuloOutputFormatTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import static com.google.common.base.Charsets.UTF_8; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat; +import org.apache.accumulo.core.client.mapreduce.lib.util.OutputConfigurator; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.accumulo.minicluster.MiniAccumuloConfig; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import com.google.common.collect.Maps; + +/** + * Exists because mock instance doesn't produce this error when dynamically changing the table permissions. + */ +public class AccumuloOutputFormatTest { + + private static final String TABLE = "abc"; + public static TemporaryFolder folder = new TemporaryFolder(); + private MiniAccumuloCluster accumulo; + private String secret = "secret"; + + @Before + public void setUp() throws Exception { + folder.create(); + MiniAccumuloConfig config = new MiniAccumuloConfig(folder.getRoot(), secret); + + Map<String,String> configMap = Maps.newHashMap(); + configMap.put(Property.TSERV_SESSION_MAXIDLE.toString(), "1"); + config.setSiteConfig(configMap); + config.setNumTservers(1); + accumulo = new MiniAccumuloCluster(config); + + accumulo.start(); + } + + @After + public void tearDown() throws Exception { + accumulo.stop(); + folder.delete(); + } + + protected void writeToTable() + { + + } + + @Test(expected = IOException.class) + public void testMapred() throws Exception { + + ZooKeeperInstance instance = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers()); + Connector connector = instance.getConnector("root", new PasswordToken(secret)); + // create a table and put some data in it + connector.tableOperations().create(TABLE); + + JobConf job = new JobConf(); + BatchWriterConfig batchConfig = new BatchWriterConfig(); + // no flushes!!!!! + batchConfig.setMaxLatency(0, TimeUnit.MILLISECONDS); + // use a single thread to ensure our update session times out + batchConfig.setMaxWriteThreads(1); + // set the max memory so that we ensure we don't flush on the write. + batchConfig.setMaxMemory(Long.MAX_VALUE); + OutputConfigurator.setBatchWriterOptions(AccumuloOutputFormat.class, job, batchConfig); + AccumuloOutputFormat outputFormat = new AccumuloOutputFormat(); + AccumuloOutputFormat.setZooKeeperInstance(job, accumulo.getInstanceName(), accumulo.getZooKeepers()); + AccumuloOutputFormat.setConnectorInfo(job, "root", new PasswordToken(secret)); + RecordWriter<Text,Mutation> writer = outputFormat.getRecordWriter(null, job, "Test", null); + + try { + for (int i = 0; i < 3; i++) { + Mutation m = new Mutation(new Text(String.format("%08d", i))); + for (int j = 0; j < 3; j++) + m.put(new Text("cf1"), new Text("cq" + j), new Value((i + "_" + j).getBytes(UTF_8))); + + writer.write(new Text(TABLE), m); + } + } catch (Exception e) { + e.printStackTrace(); + // we don't want the exception to come from write + } + connector.securityOperations().revokeTablePermission("root", TABLE, TablePermission.WRITE); + writer.close(null); + connector.tableOperations().delete(TABLE); + } + + // this isn't really necessary, but let's cover both + @Test(expected = IOException.class) + public void testMapreduce() throws Exception { + + ZooKeeperInstance instance = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers()); + Connector connector = instance.getConnector("root", new PasswordToken(secret)); + // create a table and put some data in it + connector.tableOperations().create(TABLE); + + JobConf job = new JobConf(); + BatchWriterConfig batchConfig = new BatchWriterConfig(); + // no flushes!!!!! + batchConfig.setMaxLatency(0, TimeUnit.MILLISECONDS); + // use a single thread to ensure our update session times out + batchConfig.setMaxWriteThreads(1); + // set the max memory so that we ensure we don't flush on the write. + batchConfig.setMaxMemory(Long.MAX_VALUE); + OutputConfigurator.setBatchWriterOptions(org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.class, job, batchConfig); + org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat outputFormat = new org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat(); + org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setZooKeeperInstance(job, accumulo.getInstanceName(), accumulo.getZooKeepers()); + + Job stubbedJob = new Job(job); + + org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat.setConnectorInfo(stubbedJob, "root", new PasswordToken(secret)); + TaskAttemptContext context = new TaskAttemptContext(stubbedJob.getConfiguration(),new TaskAttemptID()); + org.apache.hadoop.mapreduce.RecordWriter<Text,Mutation> writer = outputFormat.getRecordWriter(context); + + try { + for (int i = 0; i < 3; i++) { + Mutation m = new Mutation(new Text(String.format("%08d", i))); + for (int j = 0; j < 3; j++) + m.put(new Text("cf1"), new Text("cq" + j), new Value((i + "_" + j).getBytes(UTF_8))); + + writer.write(new Text(TABLE), m); + } + } catch (Exception e) { + e.printStackTrace(); + // we don't want the exception to come from write + } + connector.securityOperations().revokeTablePermission("root", TABLE, TablePermission.WRITE); + writer.close(null); + connector.tableOperations().delete(TABLE); + } +}