ACCUMULO-1783 Fix up 1.5 build

Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/cb720e85
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/cb720e85
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/cb720e85

Branch: refs/heads/ACCUMULO-1783-1.5
Commit: cb720e850f01668a857b0f22f15ca57f34ddffe9
Parents: 0ce6fb3
Author: Josh Elser <els...@apache.org>
Authored: Sat Nov 23 14:46:37 2013 -0500
Committer: Josh Elser <els...@apache.org>
Committed: Sat Nov 23 14:46:37 2013 -0500

----------------------------------------------------------------------
 .../accumulo/pig/AbstractAccumuloStorage.java   | 44 +++++++++++---------
 .../pig/AbstractAccumuloStorageTest.java        |  1 +
 2 files changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/cb720e85/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java 
b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 890abf3..801b31f 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -79,7 +79,9 @@ public abstract class AbstractAccumuloStorage extends 
LoadFunc implements StoreF
   private static final Log LOG = 
LogFactory.getLog(AbstractAccumuloStorage.class);
 
   private static final String COLON = ":", COMMA = ",";
+
   private static final String INPUT_PREFIX = 
AccumuloInputFormat.class.getSimpleName();
+  private static final String OUTPUT_PREFIX = 
AccumuloOutputFormat.class.getSimpleName();
 
   private RecordReader<Key,Value> reader;
   private RecordWriter<Text,Mutation> writer;
@@ -110,6 +112,10 @@ public abstract class AbstractAccumuloStorage extends 
LoadFunc implements StoreF
   protected Map<String,String> getInputFormatEntries(Configuration conf) {
     return getEntries(conf, INPUT_PREFIX);
   }
+  
+  protected Map<String,String> getOutputFormatEntries(Configuration conf) {
+    return getEntries(conf, OUTPUT_PREFIX);
+  }
 
   @Override
   public Tuple getNext() throws IOException {
@@ -298,30 +304,30 @@ public abstract class AbstractAccumuloStorage extends 
LoadFunc implements StoreF
 
   public void setStoreLocation(String location, Job job) throws IOException {
     setLocationFromUri(location);
+    
+    Map<String,String> entries = 
getOutputFormatEntries(job.getConfiguration());
+    for (String key : entries.keySet()) {
+      job.getConfiguration().unset(key);
+    }
 
-    // If Pig ever uses an approach like they handle inputs (load), this will 
fall apart.
-    // Currently, it appears that multiple stores will get new m/r jobs
-    if (job.getConfiguration().get(AccumuloOutputFormat.class.getSimpleName() 
+ ".configured") == null) {
-      try {
-        AccumuloOutputFormat.setConnectorInfo(job, user, new 
PasswordToken(password));
-      } catch (AccumuloSecurityException e) {
-        throw new IOException(e);
-      }
+    try {
+      AccumuloOutputFormat.setConnectorInfo(job, user, new 
PasswordToken(password));
+    } catch (AccumuloSecurityException e) {
+      throw new IOException(e);
+    }
 
-      // AccumuloOutputFormat.setCreateTables(job, true);
-      // AccumuloOutputFormat.setDefaultTableName(job, table);
-      AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
+    AccumuloOutputFormat.setCreateTables(job, true);
+    AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers);
 
-      BatchWriterConfig bwConfig = new BatchWriterConfig();
-      bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS);
-      bwConfig.setMaxMemory(maxMutationBufferSize);
-      bwConfig.setMaxWriteThreads(maxWriteThreads);
-      AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
+    BatchWriterConfig bwConfig = new BatchWriterConfig();
+    bwConfig.setMaxLatency(maxLatency, TimeUnit.MILLISECONDS);
+    bwConfig.setMaxMemory(maxMutationBufferSize);
+    bwConfig.setMaxWriteThreads(maxWriteThreads);
+    AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
 
-      LOG.info("Writing data to " + table);
+    LOG.info("Writing data to " + table);
 
-      configureOutputFormat(job);
-    }
+    configureOutputFormat(job);
   }
 
   @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/cb720e85/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java 
b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
index 21d4fc7..7302a82 100644
--- a/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AbstractAccumuloStorageTest.java
@@ -94,6 +94,7 @@ public class AbstractAccumuloStorageTest {
     }
     
     AccumuloOutputFormat.setZooKeeperInstance(expected, inst, zookeepers);
+    AccumuloOutputFormat.setCreateTables(expected, true);
     
     BatchWriterConfig bwConfig = new BatchWriterConfig();
     bwConfig.setMaxLatency(maxWriteLatencyMS, TimeUnit.MILLISECONDS);

Reply via email to