ACCUMULO-1783 Fix up 1.5 build
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/cb720e85 Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/cb720e85 Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/cb720e85 Branch: refs/heads/ACCUMULO-1783-1.5 Commit: cb720e850f01668a857b0f22f15ca57f34ddffe9 Parents: 0ce6fb3 Author: Josh Elser <els...@apache.org> Authored: Sat Nov 23 14:46:37 2013 -0500 Committer: Josh Elser <els...@apache.org> Committed: Sat Nov 23 14:46:37 2013 -0500 ---------------------------------------------------------------------- .../accumulo/pig/AbstractAccumuloStorage.java | 44 +++++++++++--------- .../pig/AbstractAccumuloStorageTest.java | 1 + 2 files changed, 26 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/cb720e85/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java index 890abf3..801b31f 100644 --- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java +++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java @@ -79,7 +79,9 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class); private static final String COLON = ":", COMMA = ","; + private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName(); + private static final String OUTPUT_PREFIX = AccumuloOutputFormat.class.getSimpleName(); private RecordReader<Key,Value> reader; private RecordWriter<Text,Mutation> writer; @@ -110,6 +112,10 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF protected Map<String,String> getInputFormatEntries(Configuration conf) { return getEntries(conf, INPUT_PREFIX); } + + protected Map<String,String> getOutputFormatEntries(Configuration conf) { + return getEntries(conf, OUTPUT_PREFIX); + } @Override public Tuple getNext() throws IOException { @@ -298,30 +304,30 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF public void setStoreLocation(String location, Job job) throws IOException { setLocationFromUri(location); + + Map<String,String> entries = getOutputFormatEntries(job.getConfiguration()); + for (String key : entries.keySet()) { + job.getConfiguration().unset(key); + } - // If Pig ever uses an approach like they handle inputs (load), this will fall apart. - // Currently, it appears that multiple stores will get new m/r jobs - if (job.getConfiguration().get(AccumuloOutputFormat.class.getSimpleName() + ".configured") == null) { - try { - AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password)); - } catch (AccumuloSecurityException e) { - throw new IOException(e); - } + try { + AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password)); + } catch (AccumuloSecurityException e) { + throw new IOException(e); + } - // AccumuloOutputFormat.setCreateTables(job, true); - // AccumuloOutputFormat.setDefaultTableName(job, table); - AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers); + AccumuloOutputFormat.setCreateTables(job, true); + AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers); - BatchWriterConfig bwConfig = new BatchWriterConfig(); - bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS); - bwConfig.setMaxMemory(maxMutationBufferSize); - bwConfig.setMaxWriteThreads(maxWriteThreads); - AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig); + BatchWriterConfig bwConfig = new BatchWriterConfig(); + bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS); + bwConfig.setMaxMemory(maxMutationBufferSize); + bwConfig.setMaxWriteThreads(maxWriteThreads); + AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig); - LOG.info("Writing data to " + table); + LOG.info("Writing data to " + table); - configureOutputFormat(job); - } + configureOutputFormat(job); } @SuppressWarnings("rawtypes") http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/cb720e85/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java index 21d4fc7..7302a82 100644 --- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java +++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java @@ -94,6 +94,7 @@ public class AbstractAccumuloStorageTest { } AccumuloOutputFormat.setZooKeeperInstance(expected, inst, zookeepers); + AccumuloOutputFormat.setCreateTables(expected, true); BatchWriterConfig bwConfig = new BatchWriterConfig(); bwConfig.setMaxLatency(maxWriteLatencyMS, TimeUnit.MILLISECONDS);