This is an automated email from the ASF dual-hosted git repository.

jmark99 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-examples.git
The following commit(s) were added to refs/heads/main by this push:
     new a0cac7c  Comment out portions of filedata example (#108)
a0cac7c is described below

commit a0cac7cbb0cc8c62c690e33e12acbeab9c517cca
Author: Mark Owens <jmar...@apache.org>
AuthorDate: Tue Nov 22 16:23:50 2022 -0500

    Comment out portions of filedata example (#108)

    With the example repo now targeting Accumulo-3.0.0-SNAPSHOT, the filedata
    example no longer works. In addition, it prevents the project from
    building. Several files have been commented out in order to allow the
    project to build. A follow-on issue will be created to address the
    non-working example.
---
 .../examples/filedata/CharacterHistogram.java | 224 +++---
 .../examples/filedata/ChunkInputFormat.java   | 169 ++---
 .../examples/filedata/ChunkInputFormatIT.java | 688 +++++++++++----------
 3 files changed, 542 insertions(+), 539 deletions(-)

diff --git a/src/main/java/org/apache/accumulo/examples/filedata/CharacterHistogram.java b/src/main/java/org/apache/accumulo/examples/filedata/CharacterHistogram.java
index 01c08bc..d24c4c9 100644
--- a/src/main/java/org/apache/accumulo/examples/filedata/CharacterHistogram.java
+++ b/src/main/java/org/apache/accumulo/examples/filedata/CharacterHistogram.java
@@ -1,112 +1,112 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.examples.filedata; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.List; -import java.util.Map.Entry; -import java.util.Properties; - -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.user.SummingArrayCombiner; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.examples.cli.ClientOpts; -import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; - -import com.beust.jcommander.Parameter; - -/** - * A MapReduce that computes a histogram of byte frequency for each file and stores the histogram - * alongside the file data. The {@link ChunkInputFormat} is used to read the file data from - * Accumulo.
- */ -public class CharacterHistogram { - - private static final String VIS = "vis"; - - public static class HistMapper extends Mapper<List<Entry<Key,Value>>,InputStream,Text,Mutation> { - private ColumnVisibility cv; - - @Override - public void map(List<Entry<Key,Value>> k, InputStream v, Context context) - throws IOException, InterruptedException { - Long[] hist = new Long[256]; - Arrays.fill(hist, 0L); - int b = v.read(); - while (b >= 0) { - hist[b] += 1L; - b = v.read(); - } - v.close(); - Mutation m = new Mutation(k.get(0).getKey().getRow()); - m.put("info", "hist", cv, - new Value(SummingArrayCombiner.STRING_ARRAY_ENCODER.encode(Arrays.asList(hist)))); - context.write(new Text(), m); - } - - @Override - protected void setup(Context context) { - cv = new ColumnVisibility(context.getConfiguration().get(VIS, "")); - } - } - - static class Opts extends ClientOpts { - @Parameter(names = {"-t", "--table"}, required = true, description = "table to use") - String tableName; - @Parameter(names = "--vis") - String visibilities = ""; - } - - @SuppressWarnings("deprecation") - public static void main(String[] args) throws Exception { - Opts opts = new Opts(); - opts.parseArgs(CharacterHistogram.class.getName(), args); - - Job job = Job.getInstance(opts.getHadoopConfig()); - job.setJobName(CharacterHistogram.class.getSimpleName()); - job.setJarByClass(CharacterHistogram.class); - job.setInputFormatClass(ChunkInputFormat.class); - job.getConfiguration().set(VIS, opts.visibilities); - job.setMapperClass(HistMapper.class); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Mutation.class); - - job.setNumReduceTasks(0); - - Properties props = opts.getClientProperties(); - ChunkInputFormat.setZooKeeperInstance(job, props.getProperty("instance.name"), - props.getProperty("instance.zookeepers")); - PasswordToken token = new PasswordToken(props.getProperty("auth.token")); - ChunkInputFormat.setConnectorInfo(job, props.getProperty("auth.principal"), token); - ChunkInputFormat.setInputTableName(job, opts.tableName); - ChunkInputFormat.setScanAuthorizations(job, opts.auths); - - job.setOutputFormatClass(AccumuloOutputFormat.class); - AccumuloOutputFormat.configure().clientProperties(opts.getClientProperties()) - .defaultTable(opts.tableName).createTables(true).store(job); - - System.exit(job.waitForCompletion(true) ? 0 : 1); - } -} +/// * +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. 
+// */ +// package org.apache.accumulo.examples.filedata; +// +// import java.io.IOException; +// import java.io.InputStream; +// import java.util.Arrays; +// import java.util.List; +// import java.util.Map.Entry; +// import java.util.Properties; +// +// import org.apache.accumulo.core.client.security.tokens.PasswordToken; +// import org.apache.accumulo.core.data.Key; +// import org.apache.accumulo.core.data.Mutation; +// import org.apache.accumulo.core.data.Value; +// import org.apache.accumulo.core.iterators.user.SummingArrayCombiner; +// import org.apache.accumulo.core.security.ColumnVisibility; +// import org.apache.accumulo.examples.cli.ClientOpts; +// import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat; +// import org.apache.hadoop.io.Text; +// import org.apache.hadoop.mapreduce.Job; +// import org.apache.hadoop.mapreduce.Mapper; +// +// import com.beust.jcommander.Parameter; +// +/// ** +// * A MapReduce that computes a histogram of byte frequency for each file and stores the histogram +// * alongside the file data. The {@link ChunkInputFormat} is used to read the file data from +// * Accumulo. +// */ +// public class CharacterHistogram { +// +// private static final String VIS = "vis"; +// +// public static class HistMapper extends Mapper<List<Entry<Key,Value>>,InputStream,Text,Mutation> { +// private ColumnVisibility cv; +// +// @Override +// public void map(List<Entry<Key,Value>> k, InputStream v, Context context) +// throws IOException, InterruptedException { +// Long[] hist = new Long[256]; +// Arrays.fill(hist, 0L); +// int b = v.read(); +// while (b >= 0) { +// hist[b] += 1L; +// b = v.read(); +// } +// v.close(); +// Mutation m = new Mutation(k.get(0).getKey().getRow()); +// m.put("info", "hist", cv, +// new Value(SummingArrayCombiner.STRING_ARRAY_ENCODER.encode(Arrays.asList(hist)))); +// context.write(new Text(), m); +// } +// +// @Override +// protected void setup(Context context) { +// cv = new ColumnVisibility(context.getConfiguration().get(VIS, "")); +// } +// } +// +// static class Opts extends ClientOpts { +// @Parameter(names = {"-t", "--table"}, required = true, description = "table to use") +// String tableName; +// @Parameter(names = "--vis") +// String visibilities = ""; +// } +// +// @SuppressWarnings("deprecation") +// public static void main(String[] args) throws Exception { +// Opts opts = new Opts(); +// opts.parseArgs(CharacterHistogram.class.getName(), args); +// +// Job job = Job.getInstance(opts.getHadoopConfig()); +// job.setJobName(CharacterHistogram.class.getSimpleName()); +// job.setJarByClass(CharacterHistogram.class); +// job.setInputFormatClass(ChunkInputFormat.class); +// job.getConfiguration().set(VIS, opts.visibilities); +// job.setMapperClass(HistMapper.class); +// job.setMapOutputKeyClass(Text.class); +// job.setMapOutputValueClass(Mutation.class); +// +// job.setNumReduceTasks(0); +// +// Properties props = opts.getClientProperties(); +// ChunkInputFormat.setZooKeeperInstance(job, props.getProperty("instance.name"), +// props.getProperty("instance.zookeepers")); +// PasswordToken token = new PasswordToken(props.getProperty("auth.token")); +// ChunkInputFormat.setConnectorInfo(job, props.getProperty("auth.principal"), token); +// ChunkInputFormat.setInputTableName(job, opts.tableName); +// ChunkInputFormat.setScanAuthorizations(job, opts.auths); +// +// job.setOutputFormatClass(AccumuloOutputFormat.class); +// AccumuloOutputFormat.configure().clientProperties(opts.getClientProperties()) +// 
.defaultTable(opts.tableName).createTables(true).store(job); +// +// System.exit(job.waitForCompletion(true) ? 0 : 1); +// } +// } diff --git a/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputFormat.java b/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputFormat.java index 914375f..00965a2 100644 --- a/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputFormat.java +++ b/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputFormat.java @@ -1,84 +1,85 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.examples.filedata; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Map.Entry; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.examples.util.FormatUtil; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import com.google.common.collect.Iterators; -import com.google.common.collect.PeekingIterator; - -/** - * An InputFormat that turns the file data ingested with {@link FileDataIngest} into an InputStream - * using {@link ChunkInputStream}. Mappers used with this InputFormat must close the InputStream. 
- */ -@SuppressWarnings("deprecation") -public class ChunkInputFormat extends - org.apache.accumulo.core.client.mapreduce.InputFormatBase<List<Entry<Key,Value>>,InputStream> { - @Override - public RecordReader<List<Entry<Key,Value>>,InputStream> createRecordReader(InputSplit split, - TaskAttemptContext context) { - return new RecordReaderBase<>() { - private PeekingIterator<Entry<Key,Value>> peekingScannerIterator; - - @Override - public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException { - super.initialize(inSplit, attempt); - peekingScannerIterator = Iterators.peekingIterator(scannerIterator); - currentK = new ArrayList<>(); - currentV = new ChunkInputStream(); - } - - @Override - public boolean nextKeyValue() throws IOException { - log.debug("nextKeyValue called"); - - currentK.clear(); - if (peekingScannerIterator.hasNext()) { - ++numKeysRead; - Entry<Key,Value> entry = peekingScannerIterator.peek(); - while (!entry.getKey().getColumnFamily().equals(FileDataIngest.CHUNK_CF)) { - currentK.add(entry); - peekingScannerIterator.next(); - if (!peekingScannerIterator.hasNext()) { - return true; - } - entry = peekingScannerIterator.peek(); - } - currentKey = entry.getKey(); - ((ChunkInputStream) currentV).setSource(peekingScannerIterator); - if (log.isTraceEnabled()) { - log.trace("Processing key/value pair: " + FormatUtil.formatTableEntry(entry, true)); - } - - return true; - } - return false; - } - }; - } -} +/// * +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// package org.apache.accumulo.examples.filedata; +// +// import java.io.IOException; +// import java.io.InputStream; +// import java.util.ArrayList; +// import java.util.List; +// import java.util.Map.Entry; +// +// import org.apache.accumulo.core.data.Key; +// import org.apache.accumulo.core.data.Value; +// import org.apache.accumulo.examples.util.FormatUtil; +// import org.apache.hadoop.mapreduce.InputSplit; +// import org.apache.hadoop.mapreduce.RecordReader; +// import org.apache.hadoop.mapreduce.TaskAttemptContext; +// +// import com.google.common.collect.Iterators; +// import com.google.common.collect.PeekingIterator; +// +/// ** +// * An InputFormat that turns the file data ingested with {@link FileDataIngest} into an +/// InputStream +// * using {@link ChunkInputStream}. Mappers used with this InputFormat must close the InputStream. 
+// */ +// @SuppressWarnings("deprecation") +// public class ChunkInputFormat extends +// org.apache.accumulo.core.client.mapreduce.InputFormatBase<List<Entry<Key,Value>>,InputStream> { +// @Override +// public RecordReader<List<Entry<Key,Value>>,InputStream> createRecordReader(InputSplit split, +// TaskAttemptContext context) { +// return new RecordReaderBase<>() { +// private PeekingIterator<Entry<Key,Value>> peekingScannerIterator; +// +// @Override +// public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException { +// super.initialize(inSplit, attempt); +// peekingScannerIterator = Iterators.peekingIterator(scannerIterator); +// currentK = new ArrayList<>(); +// currentV = new ChunkInputStream(); +// } +// +// @Override +// public boolean nextKeyValue() throws IOException { +// log.debug("nextKeyValue called"); +// +// currentK.clear(); +// if (peekingScannerIterator.hasNext()) { +// ++numKeysRead; +// Entry<Key,Value> entry = peekingScannerIterator.peek(); +// while (!entry.getKey().getColumnFamily().equals(FileDataIngest.CHUNK_CF)) { +// currentK.add(entry); +// peekingScannerIterator.next(); +// if (!peekingScannerIterator.hasNext()) { +// return true; +// } +// entry = peekingScannerIterator.peek(); +// } +// currentKey = entry.getKey(); +// ((ChunkInputStream) currentV).setSource(peekingScannerIterator); +// if (log.isTraceEnabled()) { +// log.trace("Processing key/value pair: " + FormatUtil.formatTableEntry(entry, true)); +// } +// +// return true; +// } +// return false; +// } +// }; +// } +// } diff --git a/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatIT.java b/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatIT.java index f24aa15..d5263aa 100644 --- a/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatIT.java +++ b/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatIT.java @@ -1,343 +1,345 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.accumulo.examples.filedata; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.fail; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Map.Entry; - -import org.apache.accumulo.core.client.Accumulo; -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.harness.AccumuloClusterHarness; -import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimap; - -public class ChunkInputFormatIT extends AccumuloClusterHarness { - @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false"); - } - - // track errors in the map reduce job; jobs insert a dummy error for the map and cleanup tasks (to - // ensure test correctness), so error tests should check to see if there is at least one error - // (could be more depending on the test) rather than zero - private static final Multimap<String,AssertionError> assertionErrors = ArrayListMultimap.create(); - - private static final Authorizations AUTHS = new Authorizations("A", "B", "C", "D"); - - private static List<Entry<Key,Value>> data; - private static List<Entry<Key,Value>> baddata; - - private AccumuloClient client; - private String tableName; - - @BeforeEach - public void setupInstance() throws Exception { - client = Accumulo.newClient().from(getClientProps()).build(); - tableName = getUniqueNames(1)[0]; - client.securityOperations().changeUserAuthorizations(client.whoami(), AUTHS); - } - - @AfterEach - public void teardown() { - client.close(); - } - - @BeforeAll - public static void setupClass() { - System.setProperty("hadoop.tmp.dir", System.getProperty("user.dir") + "/target/hadoop-tmp"); - - data = new ArrayList<>(); - ChunkInputStreamIT.addData(data, "a", "refs", "ida\0ext", "A&B", "ext"); - ChunkInputStreamIT.addData(data, "a", "refs", "ida\0name", "A&B", "name"); - ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;"); - ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 1, "A&B", ""); - ChunkInputStreamIT.addData(data, "b", "refs", "ida\0ext", "A&B", "ext"); - ChunkInputStreamIT.addData(data, "b", "refs", "ida\0name", "A&B", "name"); - ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "A&B", 
"qwertyuiop"); - ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop"); - ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "A&B", ""); - ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "B&C", ""); - ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "D", ""); - baddata = new ArrayList<>(); - ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0ext", "A&B", "ext"); - ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0name", "A&B", "name"); - } - - public static void entryEquals(Entry<Key,Value> e1, Entry<Key,Value> e2) { - assertEquals(e1.getKey(), e2.getKey()); - assertEquals(e1.getValue(), e2.getValue()); - } - - public static class CIFTester extends Configured implements Tool { - public static class TestMapper - extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> { - int count = 0; - - @Override - protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) - throws IOException { - String table = context.getConfiguration().get("MRTester_tableName"); - assertNotNull(table); - - byte[] b = new byte[20]; - int read; - try (value) { - switch (count) { - case 0: - assertEquals(key.size(), 2); - entryEquals(key.get(0), data.get(0)); - entryEquals(key.get(1), data.get(1)); - assertEquals(read = value.read(b), 8); - assertEquals(new String(b, 0, read), "asdfjkl;"); - assertEquals(read = value.read(b), -1); - break; - case 1: - assertEquals(key.size(), 2); - entryEquals(key.get(0), data.get(4)); - entryEquals(key.get(1), data.get(5)); - assertEquals(read = value.read(b), 10); - assertEquals(new String(b, 0, read), "qwertyuiop"); - assertEquals(read = value.read(b), -1); - break; - default: - fail(); - } - } catch (AssertionError e) { - assertionErrors.put(table, e); - } - count++; - } - - @Override - protected void cleanup(Context context) { - String table = context.getConfiguration().get("MRTester_tableName"); - assertNotNull(table); - - try { - assertEquals(2, count); - } catch (AssertionError e) { - assertionErrors.put(table, e); - } - } - } - - public static class TestNoClose - extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> { - int count = 0; - - @Override - protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) - throws IOException, InterruptedException { - String table = context.getConfiguration().get("MRTester_tableName"); - assertNotNull(table); - - byte[] b = new byte[5]; - int read; - try { - switch (count) { - case 0: - assertEquals(read = value.read(b), 5); - assertEquals(new String(b, 0, read), "asdfj"); - break; - default: - fail(); - } - } catch (AssertionError e) { - assertionErrors.put(table, e); - } - count++; - try { - context.nextKeyValue(); - fail(); - } catch (IOException ioe) { - assertionErrors.put(table + "_map_ioexception", new AssertionError(toString(), ioe)); - } - } - } - - public static class TestBadData - extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> { - @Override - protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) { - String table = context.getConfiguration().get("MRTester_tableName"); - assertNotNull(table); - - byte[] b = new byte[20]; - try { - assertEquals(key.size(), 2); - entryEquals(key.get(0), baddata.get(0)); - entryEquals(key.get(1), baddata.get(1)); - } catch (AssertionError e) { - assertionErrors.put(table, e); - } - try { - assertFalse(value.read(b) > 0); - try { - fail(); - } catch (AssertionError e) { - 
assertionErrors.put(table, e); - } - } catch (Exception e) { - // expected, ignore - } - try { - value.close(); - try { - fail(); - } catch (AssertionError e) { - assertionErrors.put(table, e); - } - } catch (Exception e) { - // expected, ignore - } - } - } - - @SuppressWarnings("deprecation") - @Override - public int run(String[] args) throws Exception { - if (args.length != 2) { - throw new IllegalArgumentException( - "Usage : " + CIFTester.class.getName() + " <table> <mapperClass>"); - } - - String table = args[0]; - assertionErrors.put(table, new AssertionError("Dummy")); - assertionErrors.put(table + "_map_ioexception", new AssertionError("Dummy_ioexception")); - getConf().set("MRTester_tableName", table); - - Job job = Job.getInstance(getConf()); - job.setJobName(this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); - job.setJarByClass(this.getClass()); - - job.setInputFormatClass(ChunkInputFormat.class); - - ChunkInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig()); - ChunkInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken()); - ChunkInputFormat.setInputTableName(job, table); - ChunkInputFormat.setScanAuthorizations(job, AUTHS); - - @SuppressWarnings("unchecked") - Class<? extends Mapper<?,?,?,?>> forName = (Class<? extends Mapper<?,?,?,?>>) Class - .forName(args[1]); - job.setMapperClass(forName); - job.setMapOutputKeyClass(Key.class); - job.setMapOutputValueClass(Value.class); - job.setOutputFormatClass(NullOutputFormat.class); - - job.setNumReduceTasks(0); - - job.waitForCompletion(true); - - return job.isSuccessful() ? 0 : 1; - } - - public static int main(String... args) throws Exception { - Configuration conf = new Configuration(); - conf.set("mapreduce.framework.name", "local"); - conf.set("mapreduce.cluster.local.dir", - new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath()); - return ToolRunner.run(conf, new CIFTester(), args); - } - } - - @Test - public void test() throws Exception { - client.tableOperations().create(tableName); - BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig()); - - for (Entry<Key,Value> e : data) { - Key k = e.getKey(); - Mutation m = new Mutation(k.getRow()); - m.put(k.getColumnFamily(), k.getColumnQualifier(), - new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue()); - bw.addMutation(m); - } - bw.close(); - - assertEquals(0, CIFTester.main(tableName, CIFTester.TestMapper.class.getName())); - assertEquals(1, assertionErrors.get(tableName).size()); - } - - @Test - public void testErrorOnNextWithoutClose() throws Exception { - client.tableOperations().create(tableName); - BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig()); - - for (Entry<Key,Value> e : data) { - Key k = e.getKey(); - Mutation m = new Mutation(k.getRow()); - m.put(k.getColumnFamily(), k.getColumnQualifier(), - new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue()); - bw.addMutation(m); - } - bw.close(); - - assertEquals(1, CIFTester.main(tableName, CIFTester.TestNoClose.class.getName())); - assertEquals(1, assertionErrors.get(tableName).size()); - // this should actually exist, in addition to the dummy entry - assertEquals(2, assertionErrors.get(tableName + "_map_ioexception").size()); - } - - @Test - public void testInfoWithoutChunks() throws Exception { - client.tableOperations().create(tableName); - BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig()); - for (Entry<Key,Value> e : 
baddata) { - Key k = e.getKey(); - Mutation m = new Mutation(k.getRow()); - m.put(k.getColumnFamily(), k.getColumnQualifier(), - new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue()); - bw.addMutation(m); - } - bw.close(); - - assertEquals(0, CIFTester.main(tableName, CIFTester.TestBadData.class.getName())); - assertEquals(1, assertionErrors.get(tableName).size()); - } -} +/// * +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +// package org.apache.accumulo.examples.filedata; +// +// import static org.junit.jupiter.api.Assertions.assertEquals; +// import static org.junit.jupiter.api.Assertions.assertFalse; +// import static org.junit.jupiter.api.Assertions.assertNotNull; +// import static org.junit.jupiter.api.Assertions.fail; +// +// import java.io.File; +// import java.io.IOException; +// import java.io.InputStream; +// import java.util.ArrayList; +// import java.util.List; +// import java.util.Map.Entry; +// +// import org.apache.accumulo.core.client.Accumulo; +// import org.apache.accumulo.core.client.AccumuloClient; +// import org.apache.accumulo.core.client.BatchWriter; +// import org.apache.accumulo.core.client.BatchWriterConfig; +// import org.apache.accumulo.core.conf.Property; +// import org.apache.accumulo.core.data.Key; +// import org.apache.accumulo.core.data.Mutation; +// import org.apache.accumulo.core.data.Value; +// import org.apache.accumulo.core.security.Authorizations; +// import org.apache.accumulo.core.security.ColumnVisibility; +// import org.apache.accumulo.harness.AccumuloClusterHarness; +// import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +// import org.apache.hadoop.conf.Configuration; +// import org.apache.hadoop.conf.Configured; +// import org.apache.hadoop.mapreduce.Job; +// import org.apache.hadoop.mapreduce.Mapper; +// import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +// import org.apache.hadoop.util.Tool; +// import org.apache.hadoop.util.ToolRunner; +// import org.junit.jupiter.api.AfterEach; +// import org.junit.jupiter.api.BeforeAll; +// import org.junit.jupiter.api.BeforeEach; +// import org.junit.jupiter.api.Test; +// +// import com.google.common.collect.ArrayListMultimap; +// import com.google.common.collect.Multimap; +// +// public class ChunkInputFormatIT extends AccumuloClusterHarness { +// @Override +// public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { +// cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, "false"); +// } +// +// // track errors in the map reduce job; jobs insert a dummy error for the map and cleanup tasks +/// (to +// // ensure test correctness), so error tests should check to see if there is at least one error +// // (could be more depending on the 
test) rather than zero +// private static final Multimap<String,AssertionError> assertionErrors = +/// ArrayListMultimap.create(); +// +// private static final Authorizations AUTHS = new Authorizations("A", "B", "C", "D"); +// +// private static List<Entry<Key,Value>> data; +// private static List<Entry<Key,Value>> baddata; +// +// private AccumuloClient client; +// private String tableName; +// +// @BeforeEach +// public void setupInstance() throws Exception { +// client = Accumulo.newClient().from(getClientProps()).build(); +// tableName = getUniqueNames(1)[0]; +// client.securityOperations().changeUserAuthorizations(client.whoami(), AUTHS); +// } +// +// @AfterEach +// public void teardown() { +// client.close(); +// } +// +// @BeforeAll +// public static void setupClass() { +// System.setProperty("hadoop.tmp.dir", System.getProperty("user.dir") + "/target/hadoop-tmp"); +// +// data = new ArrayList<>(); +// ChunkInputStreamIT.addData(data, "a", "refs", "ida\0ext", "A&B", "ext"); +// ChunkInputStreamIT.addData(data, "a", "refs", "ida\0name", "A&B", "name"); +// ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;"); +// ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 1, "A&B", ""); +// ChunkInputStreamIT.addData(data, "b", "refs", "ida\0ext", "A&B", "ext"); +// ChunkInputStreamIT.addData(data, "b", "refs", "ida\0name", "A&B", "name"); +// ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "A&B", "qwertyuiop"); +// ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop"); +// ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "A&B", ""); +// ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "B&C", ""); +// ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "D", ""); +// baddata = new ArrayList<>(); +// ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0ext", "A&B", "ext"); +// ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0name", "A&B", "name"); +// } +// +// public static void entryEquals(Entry<Key,Value> e1, Entry<Key,Value> e2) { +// assertEquals(e1.getKey(), e2.getKey()); +// assertEquals(e1.getValue(), e2.getValue()); +// } +// +// public static class CIFTester extends Configured implements Tool { +// public static class TestMapper +// extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> { +// int count = 0; +// +// @Override +// protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) +// throws IOException { +// String table = context.getConfiguration().get("MRTester_tableName"); +// assertNotNull(table); +// +// byte[] b = new byte[20]; +// int read; +// try (value) { +// switch (count) { +// case 0: +// assertEquals(key.size(), 2); +// entryEquals(key.get(0), data.get(0)); +// entryEquals(key.get(1), data.get(1)); +// assertEquals(read = value.read(b), 8); +// assertEquals(new String(b, 0, read), "asdfjkl;"); +// assertEquals(read = value.read(b), -1); +// break; +// case 1: +// assertEquals(key.size(), 2); +// entryEquals(key.get(0), data.get(4)); +// entryEquals(key.get(1), data.get(5)); +// assertEquals(read = value.read(b), 10); +// assertEquals(new String(b, 0, read), "qwertyuiop"); +// assertEquals(read = value.read(b), -1); +// break; +// default: +// fail(); +// } +// } catch (AssertionError e) { +// assertionErrors.put(table, e); +// } +// count++; +// } +// +// @Override +// protected void cleanup(Context context) { +// String table = context.getConfiguration().get("MRTester_tableName"); +// assertNotNull(table); +// +// try { +// 
assertEquals(2, count); +// } catch (AssertionError e) { +// assertionErrors.put(table, e); +// } +// } +// } +// +// public static class TestNoClose +// extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> { +// int count = 0; +// +// @Override +// protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) +// throws IOException, InterruptedException { +// String table = context.getConfiguration().get("MRTester_tableName"); +// assertNotNull(table); +// +// byte[] b = new byte[5]; +// int read; +// try { +// switch (count) { +// case 0: +// assertEquals(read = value.read(b), 5); +// assertEquals(new String(b, 0, read), "asdfj"); +// break; +// default: +// fail(); +// } +// } catch (AssertionError e) { +// assertionErrors.put(table, e); +// } +// count++; +// try { +// context.nextKeyValue(); +// fail(); +// } catch (IOException ioe) { +// assertionErrors.put(table + "_map_ioexception", new AssertionError(toString(), ioe)); +// } +// } +// } +// +// public static class TestBadData +// extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> { +// @Override +// protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) { +// String table = context.getConfiguration().get("MRTester_tableName"); +// assertNotNull(table); +// +// byte[] b = new byte[20]; +// try { +// assertEquals(key.size(), 2); +// entryEquals(key.get(0), baddata.get(0)); +// entryEquals(key.get(1), baddata.get(1)); +// } catch (AssertionError e) { +// assertionErrors.put(table, e); +// } +// try { +// assertFalse(value.read(b) > 0); +// try { +// fail(); +// } catch (AssertionError e) { +// assertionErrors.put(table, e); +// } +// } catch (Exception e) { +// // expected, ignore +// } +// try { +// value.close(); +// try { +// fail(); +// } catch (AssertionError e) { +// assertionErrors.put(table, e); +// } +// } catch (Exception e) { +// // expected, ignore +// } +// } +// } +// +// @SuppressWarnings("deprecation") +// @Override +// public int run(String[] args) throws Exception { +// if (args.length != 2) { +// throw new IllegalArgumentException( +// "Usage : " + CIFTester.class.getName() + " <table> <mapperClass>"); +// } +// +// String table = args[0]; +// assertionErrors.put(table, new AssertionError("Dummy")); +// assertionErrors.put(table + "_map_ioexception", new AssertionError("Dummy_ioexception")); +// getConf().set("MRTester_tableName", table); +// +// Job job = Job.getInstance(getConf()); +// job.setJobName(this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); +// job.setJarByClass(this.getClass()); +// +// job.setInputFormatClass(ChunkInputFormat.class); +// +// ChunkInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig()); +// ChunkInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken()); +// ChunkInputFormat.setInputTableName(job, table); +// ChunkInputFormat.setScanAuthorizations(job, AUTHS); +// +// @SuppressWarnings("unchecked") +// Class<? extends Mapper<?,?,?,?>> forName = (Class<? extends Mapper<?,?,?,?>>) Class +// .forName(args[1]); +// job.setMapperClass(forName); +// job.setMapOutputKeyClass(Key.class); +// job.setMapOutputValueClass(Value.class); +// job.setOutputFormatClass(NullOutputFormat.class); +// +// job.setNumReduceTasks(0); +// +// job.waitForCompletion(true); +// +// return job.isSuccessful() ? 0 : 1; +// } +// +// public static int main(String... 
args) throws Exception { +// Configuration conf = new Configuration(); +// conf.set("mapreduce.framework.name", "local"); +// conf.set("mapreduce.cluster.local.dir", +// new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath()); +// return ToolRunner.run(conf, new CIFTester(), args); +// } +// } +// +// @Test +// public void test() throws Exception { +// client.tableOperations().create(tableName); +// BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig()); +// +// for (Entry<Key,Value> e : data) { +// Key k = e.getKey(); +// Mutation m = new Mutation(k.getRow()); +// m.put(k.getColumnFamily(), k.getColumnQualifier(), +// new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue()); +// bw.addMutation(m); +// } +// bw.close(); +// +// assertEquals(0, CIFTester.main(tableName, CIFTester.TestMapper.class.getName())); +// assertEquals(1, assertionErrors.get(tableName).size()); +// } +// +// @Test +// public void testErrorOnNextWithoutClose() throws Exception { +// client.tableOperations().create(tableName); +// BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig()); +// +// for (Entry<Key,Value> e : data) { +// Key k = e.getKey(); +// Mutation m = new Mutation(k.getRow()); +// m.put(k.getColumnFamily(), k.getColumnQualifier(), +// new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue()); +// bw.addMutation(m); +// } +// bw.close(); +// +// assertEquals(1, CIFTester.main(tableName, CIFTester.TestNoClose.class.getName())); +// assertEquals(1, assertionErrors.get(tableName).size()); +// // this should actually exist, in addition to the dummy entry +// assertEquals(2, assertionErrors.get(tableName + "_map_ioexception").size()); +// } +// +// @Test +// public void testInfoWithoutChunks() throws Exception { +// client.tableOperations().create(tableName); +// BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig()); +// for (Entry<Key,Value> e : baddata) { +// Key k = e.getKey(); +// Mutation m = new Mutation(k.getRow()); +// m.put(k.getColumnFamily(), k.getColumnQualifier(), +// new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue()); +// bw.addMutation(m); +// } +// bw.close(); +// +// assertEquals(0, CIFTester.main(tableName, CIFTester.TestBadData.class.getName())); +// assertEquals(1, assertionErrors.get(tableName).size()); +// } +// }
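
For reference, a minimal sketch of what the follow-on migration might look like. It assumes the build break comes from ChunkInputFormat extending org.apache.accumulo.core.client.mapreduce.InputFormatBase, a long-deprecated API that is gone in the 3.0 line, and it shows only the job wiring. AccumuloInputFormat stands in for a ported ChunkInputFormat, which would need an equivalent configure() entry point; the names below that do not appear in the diff (FiledataJobWiringSketch, wire) are hypothetical. This is illustrative only, not the committed fix.

    import java.util.Properties;

    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.hadoop.mapreduce.AccumuloOutputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class FiledataJobWiringSketch {

      // Hypothetical helper: replaces the removed setZooKeeperInstance/
      // setConnectorInfo/setInputTableName/setScanAuthorizations calls with the
      // builder chains from the non-deprecated accumulo-hadoop-mapreduce module.
      public static void wire(Job job, Properties clientProps, String tableName,
          Authorizations auths) throws Exception {

        // Input side. AccumuloInputFormat is a stand-in here; a ported
        // ChunkInputFormat would need its own equivalent of this configure()
        // hook to keep yielding (List<Entry<Key,Value>>, InputStream) pairs.
        job.setInputFormatClass(AccumuloInputFormat.class);
        AccumuloInputFormat.configure()
            .clientProperties(clientProps) // instance.name, instance.zookeepers, auth.*
            .table(tableName)
            .auths(auths)
            .store(job);

        // Output side, unchanged in substance: the old CharacterHistogram.main()
        // already used this builder API for its AccumuloOutputFormat.
        job.setOutputFormatClass(AccumuloOutputFormat.class);
        AccumuloOutputFormat.configure()
            .clientProperties(clientProps)
            .defaultTable(tableName)
            .createTables(true)
            .store(job);
      }
    }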