[
https://issues.apache.org/jira/browse/MAPREDUCE-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728456#comment-17728456
]
ASF GitHub Bot commented on MAPREDUCE-7435:
-------------------------------------------
steveloughran commented on code in PR #5519:
URL: https://github.com/apache/hadoop/pull/5519#discussion_r1213540062
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/TestEntryFileIO.java:
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
+import static org.apache.hadoop.util.functional.RemoteIterators.rangeExcludingIterator;
+
+/**
+ * Test {@link EntryFileIO}.
+ */
+public class TestEntryFileIO extends AbstractManifestCommitterTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestEntryFileIO.class);
+
+ /**
+ * Entry to save.
+ */
+ public static final FileEntry ENTRY = new FileEntry("source", "dest", 100, "etag");
+
+ /**
+ * Entry file instance.
+ */
+ private EntryFileIO entryFileIO;
+
+ /**
+ * Path to a test entry file.
+ */
+ private File entryFile;
+
+ /**
+ * Create an entry file during setup.
+ * Instantiates the EntryFileIO against a fresh Configuration, then
+ * creates the temp entry file which each test case reads/writes.
+ * @throws Exception on any setup failure
+ */
+ @Before
+ public void setup() throws Exception {
+ entryFileIO = new EntryFileIO(new Configuration());
+ createEntryFile();
+ }
+
+ /**
+ * Teardown deletes any entry file.
+ * @throws Exception on any failure
+ */
+ @After
+ public void teardown() throws Exception {
+ // rename the thread so teardown activity is identifiable in logs/stack dumps
+ Thread.currentThread().setName("teardown");
+ if (getEntryFile() != null) {
+ getEntryFile().delete();
+ }
+ }
+
+ /**
+ * Create a temp entry file and set the entryFile field to it.
+ * The file is tracked via that field so teardown can delete it.
+ * @throws IOException creation failure
+ */
+ private void createEntryFile() throws IOException {
+ setEntryFile(File.createTempFile("entry", ".seq"));
+ }
+
+ /**
+ * Get the reference to any temp file created.
+ * @return the temp entry file, or null if none has been created yet
+ */
+ private File getEntryFile() {
+ return entryFile;
+ }
+
+ /**
+ * Set the entry file field.
+ * @param entryFile the temp file to track for use and teardown cleanup
+ */
+ private void setEntryFile(File entryFile) {
+ this.entryFile = entryFile;
+ }
+
+ /**
+ * Create a file with one entry, then read it back
+ * via all the mechanisms available.
+ */
+ @Test
+ public void testCreateWriteReadFileOneEntry() throws Throwable {
+
+ final FileEntry source = ENTRY;
+
+ // do an explicit close to help isolate any failure.
+ SequenceFile.Writer writer = createWriter();
+ // the key is a NullWritable; only the FileEntry value is significant.
+ writer.append(NullWritable.get(), source);
+ writer.flush();
+ writer.close();
+
+ // first read mechanism: a raw SequenceFile.Reader.
+ FileEntry readBack = new FileEntry();
+ try (SequenceFile.Reader reader = readEntryFile()) {
+ reader.next(NullWritable.get(), readBack);
+ }
+ Assertions.assertThat(readBack)
+ .describedAs("entry read back from sequence file")
+ .isEqualTo(source);
+
+ // now use the iterator to access it.
+ final RemoteIterator<FileEntry> it =
+ iterateOverEntryFile();
+ List<FileEntry> files = new ArrayList<>();
+ foreach(it, files::add);
+ Assertions.assertThat(files)
+ .describedAs("iteration over the entry file")
+ .hasSize(1)
+ .element(0)
+ .isEqualTo(source);
+ // after exhaustion the iterator must report itself closed,
+ // with a record count of exactly one.
+ final EntryFileIO.EntryIterator et = (EntryFileIO.EntryIterator) it;
+ Assertions.assertThat(et)
+ .describedAs("entry iterator %s", et)
+ .matches(p -> p.isClosed())
+ .extracting(p -> p.getCount())
+ .isEqualTo(1);
+ }
+
+ /**
+ * Create a writer bound to the current test entry file.
+ * @return a writer for the entry file
+ * @throws IOException failure to create the file.
+ */
+ private SequenceFile.Writer createWriter() throws IOException {
+ return entryFileIO.createWriter(getEntryFile());
+ }
+
+ /**
+ * Create an iterator over the records in the (non empty) entry file.
+ * Wraps a new reader from {@link #readEntryFile()}, so the file
+ * must already contain data.
+ * @return an iterator over entries.
+ * @throws IOException failure to open the file
+ */
+ private RemoteIterator<FileEntry> iterateOverEntryFile() throws IOException {
+ return entryFileIO.iterateOver(readEntryFile());
+ }
+
+ /**
+ * Create a reader for the (non empty) entry file.
+ * @return a reader.
+ * @throws IOException failure to open the file
+ */
+ private SequenceFile.Reader readEntryFile() throws IOException {
+ // precondition check; presumably fails the test if the file is
+ // empty (helper defined elsewhere in this class -- confirm).
+ assertEntryFileNonEmpty();
+
+ return entryFileIO.createReader(getEntryFile());
+ }
+
+ /**
+ * Create a file with one entry.
+ */
+ @Test
+ public void testCreateEmptyFile() throws Throwable {
+
+ final File file = getEntryFile();
+
+ entryFileIO.createWriter(file).close();
+
+ // now use the iterator to access it.
+ List<FileEntry> files = new ArrayList<>();
+ Assertions.assertThat(foreach(iterateOverEntryFile(), files::add))
+ .isEqualTo(0);
Review Comment:
added a description
> ManifestCommitter OOM on azure job
> ----------------------------------
>
> Key: MAPREDUCE-7435
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-7435
> Project: Hadoop Map/Reduce
> Issue Type: Bug
> Components: client
> Affects Versions: 3.3.5
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Major
> Labels: pull-request-available
>
> I've got some reports of Spark jobs OOMing when the manifest committer is
> used through ABFS.
> Either the manifests are using too much memory, or something is not working
> with Azure stream memory use (or both).
> before proposing a solution, first step should be to write a test to load
> many, many manifests, each with lots of dirs and files to see what breaks.
> note: we did have OOM issues with the s3a committer, on teragen but those
> structures have to include every etag of every block, so the manifest size is
> O(blocks); the new committer is O(files + dirs).
> {code}
> java.lang.OutOfMemoryError: Java heap space
> at
> org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.readOneBlock(AbfsInputStream.java:314)
> at
> org.apache.hadoop.fs.azurebfs.services.AbfsInputStream.read(AbfsInputStream.java:267)
> at java.io.DataInputStream.read(DataInputStream.java:149)
> at
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:539)
> at
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:133)
> at
> com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:256)
> at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1656)
> at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1085)
> at
> com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
> at
> org.apache.hadoop.util.JsonSerialization.fromJsonStream(JsonSerialization.java:164)
> at org.apache.hadoop.util.JsonSerialization.load(JsonSerialization.java:279)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest.load(TaskManifest.java:361)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem.loadTaskManifest(ManifestStoreOperationsThroughFileSystem.java:133)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.lambda$loadManifest$6(AbstractJobOrTaskStage.java:493)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage$$Lambda$231/1813048085.apply(Unknown
> Source)
> at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:543)
> at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:524)
> at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding$$Lambda$217/489150849.apply(Unknown
> Source)
> at
> org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:445)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.loadManifest(AbstractJobOrTaskStage.java:492)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.fetchTaskManifest(LoadManifestsStage.java:170)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage.processOneManifest(LoadManifestsStage.java:138)
> at
> org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage$$Lambda$229/137752948.run(Unknown
> Source)
> at
> org.apache.hadoop.util.functional.TaskPool$Builder.lambda$runParallel$0(TaskPool.java:410)
> at
> org.apache.hadoop.util.functional.TaskPool$Builder$$Lambda$230/467893357.run(Unknown
> Source)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]