This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 268c1d0 (chores) camel-file: prevent leaking the array with the input files 268c1d0 is described below commit 268c1d01ccdfa12c376763b83df5131a88e51dbc Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Thu Oct 14 11:00:08 2021 +0200 (chores) camel-file: prevent leaking the array with the input files --- .../apache/camel/component/file/FileConsumer.java | 8 +-- .../file/consumer/FileConsumerResumeStrategy.java | 4 +- .../component/file/consumer/FileResumeInfo.java | 43 ------------- .../component/file/consumer/FileResumeSet.java | 74 ++++++++++++++++++++++ .../FileConsumerResumeFromOffsetStrategyTest.java | 8 +-- .../file/FileConsumerResumeStrategyTest.java | 20 +----- 6 files changed, 86 insertions(+), 71 deletions(-) diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java index f38f0ea..c4d2d20 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java @@ -32,7 +32,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.file.consumer.FileConsumerResumeStrategy; -import org.apache.camel.component.file.consumer.FileResumeInfo; +import org.apache.camel.component.file.consumer.FileResumeSet; import org.apache.camel.util.FileUtil; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; @@ -172,10 +172,10 @@ public class FileConsumer extends GenericFileConsumer<File> { } if (resumeStrategy != null) { - FileResumeInfo resumeInfo = new FileResumeInfo(dirFiles); - resumeStrategy.resume(resumeInfo); + FileResumeSet resumeSet = new FileResumeSet(dirFiles); + resumeStrategy.resume(resumeSet); - return resumeInfo.getOutputFiles(); + return resumeSet.hasResumables() ? resumeSet.resumedFiles() : dirFiles; } return dirFiles; diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java index eedd3e2..ccd4c05 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java @@ -24,7 +24,7 @@ import org.apache.camel.ResumeStrategy; /** * Defines resume strategy for consumers of the file component. */ -public interface FileConsumerResumeStrategy extends ResumeStrategy<FileResumeInfo> { +public interface FileConsumerResumeStrategy extends ResumeStrategy<FileResumeSet> { /** * Returns the last offset read for the given file. @@ -40,5 +40,5 @@ public interface FileConsumerResumeStrategy extends ResumeStrategy<FileResumeInf * @param resumeInfo resume information */ @Override - void resume(FileResumeInfo resumeInfo); + void resume(FileResumeSet resumeInfo); } diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeInfo.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeInfo.java deleted file mode 100644 index ae6e0eb..0000000 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeInfo.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.component.file.consumer; - -import java.io.File; -import java.util.Objects; - -public final class FileResumeInfo { - private final File[] inputFiles; - private File[] outputFiles; - - public FileResumeInfo(File[] inputFiles) { - Objects.requireNonNull(inputFiles, "A list of input files must be provided for the resume info"); - this.inputFiles = inputFiles; - } - - public File[] getInputFiles() { - return inputFiles; - } - - public File[] getOutputFiles() { - return outputFiles; - } - - public void setOutputFiles(File[] outputFiles) { - this.outputFiles = outputFiles; - } -} diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java new file mode 100644 index 0000000..738dafe --- /dev/null +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.file.consumer; + +import java.io.File; +import java.util.Arrays; +import java.util.Objects; +import java.util.function.Function; + +/** + * This contains the input/output file set for resume operations. + */ +public final class FileResumeSet { + private final File[] inputFiles; + private File[] outputFiles; + + public FileResumeSet(File[] inputFiles) { + Objects.requireNonNull(inputFiles, "A list of input files must be provided for the resume info"); + this.inputFiles = inputFiles; + } + + /** + * Iterates over the set of input files checking if they should be resumed or not + * + * @param resumableCheck a checker method that returns true if the file should be resumed or false otherwise + */ + public void resumeEach(Function<File, Boolean> resumableCheck) { + this.outputFiles = null; + File[] tmp = Arrays.copyOf(inputFiles, inputFiles.length); + int count = 0; + + for (File file : inputFiles) { + if (resumableCheck.apply(file)) { + tmp[count] = file; + count++; + } + } + + this.outputFiles = Arrays.copyOf(tmp, count); + } + + /** + * Gets the files that should be resumed + * + * @return an array with the files that should be resumed + */ + public File[] resumedFiles() { + return outputFiles; + } + + /** + * Whether there are resumable files to process + * + * @return true if there are resumable files or false otherwise + */ + public boolean hasResumables() { + return outputFiles != null && outputFiles.length > 0; + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java index 8ddc246..ac18a95 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java @@ -23,7 +23,7 @@ import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.file.consumer.FileConsumerResumeStrategy; -import org.apache.camel.component.file.consumer.FileResumeInfo; +import org.apache.camel.component.file.consumer.FileResumeSet; import org.apache.camel.component.mock.MockEndpoint; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -45,10 +45,8 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport } @Override - public void resume(FileResumeInfo resumeInfo) { - File[] input = resumeInfo.getInputFiles(); - - resumeInfo.setOutputFiles(input); + public void resume(FileResumeSet resumeSet) { + // NO-OP } } diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java index 7ecbff5..0269485 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java @@ -24,7 +24,7 @@ import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.file.consumer.FileConsumerResumeStrategy; -import org.apache.camel.component.file.consumer.FileResumeInfo; +import org.apache.camel.component.file.consumer.FileResumeSet; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.util.IOHelper; import org.junit.jupiter.api.Test; @@ -42,24 +42,10 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport { } @Override - public void resume(FileResumeInfo resumeInfo) { + public void resume(FileResumeSet resumeSet) { List<String> processedFiles = Arrays.asList("0.txt", "1.txt", "2.txt"); - int count = 0; - File[] input = resumeInfo.getInputFiles(); - File[] tmp = Arrays.copyOf(input, input.length); - - for (File file : resumeInfo.getInputFiles()) { - if (!processedFiles.contains(file.getName())) { - LOG.info("Including file {}", file); - tmp[count] = file; - count++; - } else { - LOG.info("Skipping file {}", file); - } - } - - resumeInfo.setOutputFiles(Arrays.copyOf(tmp, count)); + resumeSet.resumeEach(f -> !processedFiles.contains(f.getName())); } }