This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8ce3f5a45280b5822e632a65e579d012a7f4106b Author: Jan Bednář <m...@janbednar.eu> AuthorDate: Mon Jul 8 20:21:41 2019 +0200 Revert "CAMEL-13628: Dropped directory-watcher dependency and related stuff" This reverts commit 15b5f846 --- components/camel-file-watch/pom.xml | 7 +- .../src/main/docs/file-watch-component.adoc | 20 +-- .../component/file/watch/FileWatchComponent.java | 14 ++ .../component/file/watch/FileWatchConsumer.java | 160 ++++++++------------- .../component/file/watch/FileWatchEndpoint.java | 56 ++++++++ .../component/file/watch/constants/FileEvent.java | 11 +- .../file/watch/constants/FileEventEnum.java | 32 +++++ .../file/watch/utils/WatchServiceUtils.java | 40 ------ .../watch/FileWatchComponentRecursiveTest.java | 60 ++++++++ .../file/watch/FileWatchComponentTest.java | 40 ++++-- .../file/watch/FileWatchComponentTestBase.java | 6 +- .../file/watch/SpringFileWatcherTest.java | 13 +- .../camel/component/file/watch/TestHasher.java} | 73 +++++----- .../file/watch/SpringFileWatchComponentTest.xml | 7 + docs/components/modules/ROOT/nav.adoc | 1 - .../modules/ROOT/pages/file-watch-component.adoc | 20 +-- parent/pom.xml | 8 ++ .../karaf/features/src/main/resources/features.xml | 1 + 18 files changed, 355 insertions(+), 214 deletions(-) diff --git a/components/camel-file-watch/pom.xml b/components/camel-file-watch/pom.xml index 16ad35e..d366f9f 100644 --- a/components/camel-file-watch/pom.xml +++ b/components/camel-file-watch/pom.xml @@ -39,6 +39,11 @@ <artifactId>camel-support</artifactId> </dependency> + <dependency> + <groupId>io.methvin</groupId> + <artifactId>directory-watcher</artifactId> + </dependency> + <!-- testing --> <dependency> <groupId>org.apache.camel</groupId> @@ -61,4 +66,4 @@ <scope>test</scope> </dependency> </dependencies> -</project> +</project> \ No newline at end of file diff --git a/components/camel-file-watch/src/main/docs/file-watch-component.adoc b/components/camel-file-watch/src/main/docs/file-watch-component.adoc index bff1bf1..ca17bee 100644 --- a/components/camel-file-watch/src/main/docs/file-watch-component.adoc +++ b/components/camel-file-watch/src/main/docs/file-watch-component.adoc @@ -3,9 +3,7 @@ *Available as of Camel version 3.0* -This component can be used to watch modification events in folder. It is based on java.nio.file.WatchService. -Be aware that WatchService can behave differently on different platforms. -The produced message can contain either file or directory. +This component can be used to watch file modification events in folder. It is based on https://github.com/gmethvin/directory-watcher[gmethvin/directory-watcher]. === URI Options @@ -42,7 +40,7 @@ with the following path and query parameters: |=== -==== Query Parameters (10 parameters): +==== Query Parameters (14 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -53,7 +51,11 @@ with the following path and query parameters: | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean | *concurrentConsumers* (consumer) | The number of concurrent consumers. Increase this value, if your route is slow to prevent buffering in queue. | 1 | int | *events* (consumer) | Coma separated list of events to watch. Allowed values are: CREATE, MODIFY, DELETE. | CREATE,MODIFY,DELETE | Set +| *fileHasher* (consumer) | Reference to io.methvin.watcher.hashing.FileHasher. This prevents emitting duplicate events on some platforms. For working with large files and if you dont need detect multiple modifications per second per file, use #lastModifiedTimeFileHasher. You can also provide custom implementation in registry. | #murmur3FFileHasher | FileHasher +| *pollThreads* (consumer) | The number of threads polling WatchService. Increase this value, if you see OVERFLOW messages in log. | 1 | int | *queueSize* (consumer) | Maximum size of queue between WatchService and consumer. Unbounded by default. | 2147483647 | int +| *recursive* (consumer) | Watch recursive in current and child directories (including newly created directories). | true | boolean +| *useFileHashing* (consumer) | Enables or disables file hashing to detect duplicate events. If you disable this, you can get some events multiple times on some platforms and JDKs. Check java.nio.file.WatchService limitations for your target platform. | true | boolean | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean @@ -92,26 +94,26 @@ The component supports 3 options, which are listed below. === Examples: -==== Watch all events (file creation, file deletion, file modification): +==== Recursive watch all events (file creation, file deletion, file modification): [source,java] ---- from("file-watch://some-directory") .log("File event: ${header.CamelFileEventType} occurred on file ${header.CamelFileName} at ${header.CamelFileLastModified}"); ---- -==== Watch for creation and deletion of txt files: +==== Recursive watch for creation and deletion of txt files: [source,java] ---- -from("file-watch://some-directory?events=DELETE,CREATE&antInclude=*.txt") +from("file-watch://some-directory?events=DELETE,CREATE&antInclude=**/*.txt") .log("File event: ${header.CamelFileEventType} occurred on file ${header.CamelFileName} at ${header.CamelFileLastModified}"); ---- ==== Create snapshot of file when modified: [source,java] ---- -from("file-watch://some-directory?events=MODIFY") +from("file-watch://some-directory?events=MODIFY&recursive=false") .setHeader(Exchange.FILE_NAME, simple("${header.CamelFileName}.${header.CamelFileLastModified}")) - .to("file:snapshots"); + .to("file:some-directory/snapshots"); ---- === Message Headers diff --git a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchComponent.java b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchComponent.java index de633d8..03bafa6 100644 --- a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchComponent.java +++ b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchComponent.java @@ -18,7 +18,9 @@ package org.apache.camel.component.file.watch; import java.util.Map; +import io.methvin.watcher.hashing.FileHasher; import org.apache.camel.Endpoint; +import org.apache.camel.spi.Registry; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.DefaultComponent; @@ -34,4 +36,16 @@ public class FileWatchComponent extends DefaultComponent { setProperties(endpoint, parameters); return endpoint; } + + @Override + protected void doStart() throws Exception { + super.doStart(); + Registry registry = getCamelContext().getRegistry(); + if (registry.lookupByNameAndType("murmur3FFileHasher", FileHasher.class) == null) { + registry.bind("murmur3FFileHasher", FileHasher.class, FileHasher.DEFAULT_FILE_HASHER); + } + if (registry.lookupByNameAndType("lastModifiedTimeFileHasher", FileHasher.class) == null) { + registry.bind("lastModifiedTimeFileHasher", FileHasher.class, FileHasher.LAST_MODIFIED_TIME); + } + } } diff --git a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchConsumer.java b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchConsumer.java index dce656a..ece2040 100644 --- a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchConsumer.java +++ b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchConsumer.java @@ -18,29 +18,23 @@ package org.apache.camel.component.file.watch; import java.io.File; import java.io.IOException; -import java.lang.reflect.Field; -import java.nio.file.ClosedWatchServiceException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.nio.file.StandardWatchEventKinds; -import java.nio.file.WatchEvent; -import java.nio.file.WatchKey; -import java.nio.file.WatchService; -import java.util.HashSet; -import java.util.Set; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import io.methvin.watcher.DirectoryChangeEvent; +import io.methvin.watcher.DirectoryChangeListener; +import io.methvin.watcher.DirectoryWatcher; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.file.watch.constants.FileEvent; -import org.apache.camel.component.file.watch.constants.FileEventEnum; import org.apache.camel.component.file.watch.utils.PathUtils; -import org.apache.camel.component.file.watch.utils.WatchServiceUtils; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.util.AntPathMatcher; @@ -49,14 +43,12 @@ import org.apache.camel.util.AntPathMatcher; */ public class FileWatchConsumer extends DefaultConsumer { - private WatchService watchService; private ExecutorService watchDirExecutorService; private ExecutorService pollExecutorService; private LinkedBlockingQueue<FileEvent> eventQueue; private Path baseDirectory; private AntPathMatcher antPathMatcher; - private WatchEvent.Kind[] kinds; - + private DirectoryWatcher watcher; public FileWatchConsumer(FileWatchEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -68,16 +60,6 @@ public class FileWatchConsumer extends DefaultConsumer { antPathMatcher = new AntPathMatcher(); baseDirectory = Paths.get(getEndpoint().getPath()); - - Set<FileEventEnum> events = new HashSet<>(endpoint.getEvents()); - - kinds = new WatchEvent.Kind[endpoint.getEvents().size() + 1]; - kinds[0] = StandardWatchEventKinds.OVERFLOW; //always watch Overflow event for logging purposes - int i = 0; - for (FileEventEnum fileEventEnum: events) { - kinds[i + 1] = fileEventEnum.kind(); - i++; - } } @Override @@ -96,59 +78,43 @@ public class FileWatchConsumer extends DefaultConsumer { throw new RuntimeCamelException(String.format("Parameter path must be directory, %s given", baseDirectory.toString())); } - watchService = baseDirectory.getFileSystem().newWatchService(); + DirectoryWatcher.Builder watcherBuilder = DirectoryWatcher.builder() + .path(this.baseDirectory) + .logger(log) + .listener(new FileWatchDirectoryChangeListener()); + + if (!System.getProperty("os.name").toLowerCase().contains("mac")) { + // If not macOS, use FileSystem WatchService. io.methvin.watcher uses by default WatchService associated to default FileSystem. + // We need per FileSystem WatchService, to allow monitoring on machine with multiple file systems. + // Keep default for macOS + watcherBuilder.watchService(this.baseDirectory.getFileSystem().newWatchService()); + } + + watcherBuilder.fileHashing(getEndpoint().isUseFileHashing()); + if (getEndpoint().getFileHasher() != null && getEndpoint().isUseFileHashing()) { + watcherBuilder.fileHasher(getEndpoint().getFileHasher()); + } + + this.watcher = watcherBuilder.build(); watchDirExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager() - .newSingleThreadExecutor(this, "CamelFileWatchService"); + .newFixedThreadPool(this, "CamelFileWatchService", getEndpoint().getPollThreads()); pollExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager() .newFixedThreadPool(this, "CamelFileWatchPoll", getEndpoint().getConcurrentConsumers()); - register(baseDirectory); - - watchDirExecutorService.submit(new WatchServiceRunnable()); - + for (int i = 0; i < getEndpoint().getPollThreads(); i++) { + this.watcher.watchAsync(watchDirExecutorService); + } for (int i = 0; i < getEndpoint().getConcurrentConsumers(); i++) { pollExecutorService.submit(new PollRunnable()); } } - private void register(Path path) throws IOException { - boolean registered = false; - if (WatchServiceUtils.isPollingWatchService(watchService)) { - try { - // Find enum value SensitivityWatchEventModifier.HIGH using reflection to avoid importing com.sun packages - Class<?> sensitivityWatchEventModifierClass = getEndpoint().getCamelContext() - .getClassResolver().resolveClass("com.sun.nio.file.SensitivityWatchEventModifier"); - if (sensitivityWatchEventModifierClass != null) { - Field enumConstantField = sensitivityWatchEventModifierClass.getDeclaredField("HIGH"); - WatchEvent.Modifier sensitivityModifier = (WatchEvent.Modifier) enumConstantField.get(null); - - if (sensitivityModifier != null) { - path.register(watchService, kinds, sensitivityModifier); - registered = true; - } - } - } catch (IllegalAccessException | NoSuchFieldException ignored) { - // This is expected on JVMs where PollingWatchService or SensitivityWatchEventModifier are not available - } - } - - if (!registered) { - path.register(watchService, kinds); - } - } - @Override protected void doStop() throws Exception { - if (watchService != null) { - try { - watchService.close(); - log.info("WatchService closed"); - } catch (IOException e) { - log.info("Cannot close WatchService", e); - } + if (this.watcher != null) { + this.watcher.close(); } - if (watchDirExecutorService != null) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(watchDirExecutorService); } @@ -197,6 +163,19 @@ public class FileWatchConsumer extends DefaultConsumer { return false; } + if (!getEndpoint().isRecursive()) { + // On some platforms (eg macOS) is WatchService always recursive, + // so we need to filter this out to make this component platform independent + try { + if (!Files.isSameFile(fileEvent.getEventPath().getParent(), this.baseDirectory)) { + return false; + } + } catch (IOException e) { + log.warn(String.format("Exception occurred during executing filter. Filtering file %s out.", fileEvent.getEventPath()), e); + return false; + } + } + String pattern = getEndpoint().getAntInclude(); if (pattern == null || pattern.trim().isEmpty()) { return true; @@ -213,50 +192,27 @@ public class FileWatchConsumer extends DefaultConsumer { return (FileWatchEndpoint) super.getEndpoint(); } - class WatchServiceRunnable implements Runnable { - WatchKey watchKey; - + class FileWatchDirectoryChangeListener implements DirectoryChangeListener { @Override - public void run() { - while (take() && isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) { - for (WatchEvent<?> event : watchKey.pollEvents()) { - if (event.kind().equals(StandardWatchEventKinds.OVERFLOW)) { - log.warn("OVERFLOW occurred. Some events may be lost"); - continue; - } - - Path base = (Path) watchKey.watchable(); - WatchEvent<Path> eventCast = cast(event); - FileEvent fileEvent = new FileEvent(FileEventEnum.valueOf(eventCast.kind()), base.resolve(eventCast.context())); - - if (matchFilters(fileEvent)) { - eventQueue.offer(fileEvent); - } - } + public void onEvent(DirectoryChangeEvent directoryChangeEvent) { + if (directoryChangeEvent.eventType() == DirectoryChangeEvent.EventType.OVERFLOW) { + log.warn("OVERFLOW occurred, some events may be lost. Consider increasing of option 'pollThreads'"); + return; + } + FileEvent fileEvent = new FileEvent(directoryChangeEvent); + if (matchFilters(fileEvent)) { + eventQueue.offer(fileEvent); } } - private boolean take() { - if (watchKey != null && !watchKey.reset()) { - log.info("WatchDirRunnable stopping, because watchKey is in invalid state"); - return false; - } - try { - watchKey = watchService.take(); - return true; - } catch (ClosedWatchServiceException | InterruptedException e) { - log.info("WatchDirRunnable stopping because " + e.getClass().getSimpleName() + ": " + e.getMessage()); - return false; - } + @Override + public boolean isWatching() { + return !isStoppingOrStopped() && !isSuspendingOrSuspended(); } - } - @SuppressWarnings("unchecked") - private WatchEvent<Path> cast(WatchEvent<?> event) { - if (event != null && event.kind().type() == Path.class) { - return (WatchEvent<Path>) event; - } else { - throw new ClassCastException("Cannot cast " + event + " to WatchEvent<Path>"); + @Override + public void onException(Exception e) { + handleException(e); } } @@ -282,6 +238,4 @@ public class FileWatchConsumer extends DefaultConsumer { } } } - - } diff --git a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchEndpoint.java b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchEndpoint.java index c366668..d56b983 100644 --- a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchEndpoint.java +++ b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchEndpoint.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Set; +import io.methvin.watcher.hashing.FileHasher; import org.apache.camel.Consumer; import org.apache.camel.MultipleConsumersSupport; import org.apache.camel.Processor; @@ -49,12 +50,20 @@ public class FileWatchEndpoint extends DefaultEndpoint implements MultipleConsum @UriParam(label = "consumer", description = "Auto create directory if does not exists.", defaultValue = "true") private boolean autoCreate = true; + @UriParam(label = "consumer", description = "Watch recursive in current and child directories (including newly created directories).", defaultValue = "true") + private boolean recursive = true; + @UriParam(label = "consumer", description = "The number of concurrent consumers. Increase this value, if your route is slow to prevent buffering in queue.", defaultValue = "1") private int concurrentConsumers = 1; @UriParam(label = "consumer", + description = "The number of threads polling WatchService. Increase this value, if you see OVERFLOW messages in log.", + defaultValue = "1") + private int pollThreads = 1; + + @UriParam(label = "consumer", description = "ANT style pattern to match files. The file is matched against path relative to endpoint path. " + "Pattern must be also relative (not starting with slash)", defaultValue = "**") @@ -64,6 +73,21 @@ public class FileWatchEndpoint extends DefaultEndpoint implements MultipleConsum defaultValue = "" + Integer.MAX_VALUE) private int queueSize = Integer.MAX_VALUE; + @UriParam(label = "consumer", + description = "Reference to io.methvin.watcher.hashing.FileHasher. " + + "This prevents emitting duplicate events on some platforms. " + + "For working with large files and if you dont need detect multiple modifications per second per file, " + + "use #lastModifiedTimeFileHasher. You can also provide custom implementation in registry.", + defaultValue = "#murmur3FFileHasher") + private FileHasher fileHasher = FileHasher.DEFAULT_FILE_HASHER; + + @UriParam(label = "consumer", + description = "Enables or disables file hashing to detect duplicate events. " + + "If you disable this, you can get some events multiple times on some platforms and JDKs. " + + "Check java.nio.file.WatchService limitations for your target platform.", + defaultValue = "true") + private boolean useFileHashing = true; + public FileWatchEndpoint() { } @@ -119,6 +143,14 @@ public class FileWatchEndpoint extends DefaultEndpoint implements MultipleConsum this.autoCreate = autoCreate; } + public boolean isRecursive() { + return recursive; + } + + public void setRecursive(boolean recursive) { + this.recursive = recursive; + } + public int getConcurrentConsumers() { return concurrentConsumers; } @@ -127,6 +159,14 @@ public class FileWatchEndpoint extends DefaultEndpoint implements MultipleConsum this.concurrentConsumers = concurrentConsumers; } + public int getPollThreads() { + return pollThreads; + } + + public void setPollThreads(int pollThreads) { + this.pollThreads = pollThreads; + } + public int getQueueSize() { return queueSize; } @@ -143,6 +183,22 @@ public class FileWatchEndpoint extends DefaultEndpoint implements MultipleConsum this.antInclude = antInclude; } + public FileHasher getFileHasher() { + return fileHasher; + } + + public void setFileHasher(FileHasher fileHasher) { + this.fileHasher = fileHasher; + } + + public boolean isUseFileHashing() { + return useFileHashing; + } + + public void setUseFileHashing(boolean useFileHashing) { + this.useFileHashing = useFileHashing; + } + @Override public FileWatchComponent getComponent() { return (FileWatchComponent) super.getComponent(); diff --git a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/constants/FileEvent.java b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/constants/FileEvent.java index 8518e83..8eca13a 100644 --- a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/constants/FileEvent.java +++ b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/constants/FileEvent.java @@ -17,7 +17,9 @@ package org.apache.camel.component.file.watch.constants; import java.nio.file.Path; -import java.nio.file.WatchEvent; +import java.util.Date; + +import io.methvin.watcher.DirectoryChangeEvent; public class FileEvent { private FileEventEnum eventType; @@ -34,6 +36,12 @@ public class FileEvent { this.eventPath = eventPath; } + public FileEvent(DirectoryChangeEvent event) { + this(); + this.eventType = FileEventEnum.valueOf(event.eventType()); + this.eventPath = event.path(); + } + public FileEventEnum getEventType() { return eventType; } @@ -45,5 +53,4 @@ public class FileEvent { public long getEventDate() { return this.eventDate; } - } diff --git a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/constants/FileEventEnum.java b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/constants/FileEventEnum.java index 9df9848..6ec3acf 100644 --- a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/constants/FileEventEnum.java +++ b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/constants/FileEventEnum.java @@ -20,11 +20,30 @@ import java.nio.file.Path; import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchEvent; +import io.methvin.watcher.DirectoryChangeEvent; + public enum FileEventEnum { CREATE, DELETE, MODIFY; + public static FileEventEnum valueOf(DirectoryChangeEvent directoryChangeEvent) { + return valueOf(directoryChangeEvent.eventType()); + } + + public static FileEventEnum valueOf(DirectoryChangeEvent.EventType directoryChangeEventType) { + switch (directoryChangeEventType) { + case CREATE: + return FileEventEnum.CREATE; + case DELETE: + return FileEventEnum.DELETE; + case MODIFY: + return FileEventEnum.MODIFY; + default: + return null; + } + } + public static FileEventEnum valueOf(WatchEvent<?> watchEvent) { if (watchEvent.context() instanceof Path) { return valueOf(watchEvent.kind()); @@ -57,4 +76,17 @@ public enum FileEventEnum { return null; } } + + public DirectoryChangeEvent.EventType eventType() { + switch (this) { + case CREATE: + return DirectoryChangeEvent.EventType.CREATE; + case MODIFY: + return DirectoryChangeEvent.EventType.MODIFY; + case DELETE: + return DirectoryChangeEvent.EventType.DELETE; + default: + return null; + } + } } diff --git a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/utils/WatchServiceUtils.java b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/utils/WatchServiceUtils.java deleted file mode 100644 index 70101d1..0000000 --- a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/utils/WatchServiceUtils.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.file.watch.utils; - -import java.nio.file.WatchService; - -public class WatchServiceUtils { - - private WatchServiceUtils() { - } - - /** - * Check if @param watchService is underlying sun.nio.fs.PollingWatchService - * This can happen on OS X, AIX and Solaris prior to version 11 - */ - public static boolean isPollingWatchService(WatchService watchService) { - try { - // If the WatchService is a PollingWatchService, which it is on OS X, AIX and Solaris prior to version 11 - Class<?> pollingWatchService = Class.forName("sun.nio.fs.PollingWatchService"); - return pollingWatchService.isInstance(watchService); - } catch (ClassNotFoundException ignored) { - // This is expected on JVMs where PollingWatchService is not available - return false; - } - } -} diff --git a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentRecursiveTest.java b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentRecursiveTest.java new file mode 100644 index 0000000..858111b --- /dev/null +++ b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentRecursiveTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.watch; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.UUID; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +public class FileWatchComponentRecursiveTest extends FileWatchComponentTestBase { + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("file-watch://" + testPath() + "?recursive=true").to("mock:recursive"); + from("file-watch://" + testPath() + "?recursive=false").to("mock:nonRecursive"); + } + }; + } + + @Test + public void testCreateFileInSubdirectories() throws Exception { + Path newDir = Paths.get(testPath(), "a", "b", "c", "d"); + newDir = Files.createDirectories(newDir); + File newFile = new File(newDir.toFile(), UUID.randomUUID().toString()); + + if (!newFile.createNewFile()) { + throw new RuntimeException("cannot create file"); + } + MockEndpoint recursive = getMockEndpoint("mock:recursive"); + recursive.expectedMessageCount(5); // 4 directories and one file + recursive.assertIsSatisfied(); + + MockEndpoint nonRecursive = getMockEndpoint("mock:nonRecursive"); + nonRecursive.expectedMessageCount(1); // 1 directory + nonRecursive.assertIsSatisfied(); + } +} diff --git a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentTest.java b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentTest.java index f483a0c..239a08b 100644 --- a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentTest.java +++ b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentTest.java @@ -38,7 +38,7 @@ public class FileWatchComponentTest extends FileWatchComponentTestBase { MockEndpoint watchDelete = getMockEndpoint("mock:watchDelete"); MockEndpoint watchDeleteOrCreate = getMockEndpoint("mock:watchDeleteOrCreate"); - File newFile = createFile(testPath(), UUID.randomUUID().toString()); // should emit CREATE event + File newFile = createFile(testPath(), UUID.randomUUID().toString()); watchAll.expectedMessageCount(1); watchAll.setAssertPeriod(1000); @@ -80,18 +80,33 @@ public class FileWatchComponentTest extends FileWatchComponentTestBase { @Test public void testAntMatcher() throws Exception { MockEndpoint all = getMockEndpoint("mock:watchAll"); - MockEndpoint onlyTxtInRoot = getMockEndpoint("mock:onlyTxt"); + MockEndpoint onlyTxtAnywhere = getMockEndpoint("mock:onlyTxtAnywhere"); + MockEndpoint onlyTxtInSubdirectory = getMockEndpoint("mock:onlyTxtInSubdirectory"); + MockEndpoint onlyTxtInRoot = getMockEndpoint("mock:onlyTxtInRoot"); Path root = Paths.get(testPath()); + Path a = Paths.get(testPath(), "a"); + Path b = Paths.get(testPath(), "a", "b"); - Files.createDirectories(root); + Files.createDirectories(b); createFile(root.toFile(), "inRoot.txt"); createFile(root.toFile(), "inRoot.java"); - all.expectedMessageCount(2); // 1 java file 1 txt file + createFile(a.toFile(), "inA.txt"); + createFile(a.toFile(), "inA.java"); + createFile(b.toFile(), "inB.txt"); + createFile(b.toFile(), "inB.java"); + + all.expectedMessageCount(8); // 2 directories, 6 files all.assertIsSatisfied(); - onlyTxtInRoot.expectedMessageCount(1); // 1 txt file + onlyTxtAnywhere.expectedMessageCount(3); // 3 txt files + onlyTxtAnywhere.assertIsSatisfied(); + + onlyTxtInSubdirectory.expectedMessageCount(1); // 1 txt file in first subdirectory + onlyTxtInSubdirectory.assertIsSatisfied(); + + onlyTxtInRoot.expectedMessageCount(1); // 1 txt file inRoot.txt (should exclude everything in subdirectories) onlyTxtInRoot.assertIsSatisfied(); } @@ -100,12 +115,13 @@ public class FileWatchComponentTest extends FileWatchComponentTestBase { MockEndpoint mock = getMockEndpoint("mock:watchAll"); Files.write(testFiles.get(0), "Hello".getBytes(), StandardOpenOption.SYNC); + //testFiles.get(0).toFile().renameTo(new File(testPath(), "hello.txt")); + //Assert.assertTrue(newFile.createNewFile()); mock.setExpectedCount(1); mock.setResultWaitTime(1000); mock.assertIsSatisfied(); assertEquals("Hello", mock.getExchanges().get(0).getIn().getBody(String.class)); - assertEquals(FileEventEnum.MODIFY, mock.getExchanges().get(0).getIn().getHeader(FileWatchComponent.EVENT_TYPE_HEADER)); } @Test @@ -131,8 +147,16 @@ public class FileWatchComponentTest extends FileWatchComponentTestBase { .to("mock:watchAll"); from("file-watch://" + testPath() + "?events=CREATE&antInclude=*.txt") - .routeId("onlyTxt") - .to("mock:onlyTxt"); + .routeId("onlyTxtInRoot") + .to("mock:onlyTxtInRoot"); + + from("file-watch://" + testPath() + "?events=CREATE&antInclude=*/*.txt") + .routeId("onlyTxtInSubdirectory") + .to("mock:onlyTxtInSubdirectory"); + + from("file-watch://" + testPath() + "?events=CREATE&antInclude=**/*.txt") + .routeId("onlyTxtAnywhere") + .to("mock:onlyTxtAnywhere"); from("file-watch://" + testPath() + "?events=CREATE") .to("mock:watchCreate"); diff --git a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentTestBase.java b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentTestBase.java index f282e55..1aca5d3 100644 --- a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentTestBase.java +++ b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentTestBase.java @@ -58,7 +58,7 @@ public class FileWatchComponentTestBase extends CamelTestSupport { new File(testPath()).mkdirs(); for (int i = 0; i < 10; i++) { File newFile = new File(testPath(), getTestName().getMethodName() + "-" + i); - newFile.createNewFile(); + Assume.assumeTrue(newFile.createNewFile()); testFiles.add(newFile.toPath()); } } @@ -78,13 +78,13 @@ public class FileWatchComponentTestBase extends CamelTestSupport { cleanTestDir(childFile); } else { if (!childFile.delete()) { - log.warn("Cannot delete " + childFile); + throw new IOException(); } } } if (!file.delete()) { - log.warn("Cannot delete " + file); + throw new IOException(); } } diff --git a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/SpringFileWatcherTest.java b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/SpringFileWatcherTest.java index 33f7045..55d8b5f 100644 --- a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/SpringFileWatcherTest.java +++ b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/SpringFileWatcherTest.java @@ -49,12 +49,23 @@ public class SpringFileWatcherTest extends CamelSpringTestSupport { Files.write(springTestFile.toPath(), "modification".getBytes(), StandardOpenOption.SYNC); Files.write(springTestFile.toPath(), "modification 2".getBytes(), StandardOpenOption.SYNC); MockEndpoint mock = getMockEndpoint("mock:springTest"); - mock.setMinimumExpectedMessageCount(2); // two MODIFY events (On Windows sometimes more) + mock.setExpectedCount(2); // two MODIFY events mock.setResultWaitTime(1000); mock.assertIsSatisfied(); } + @Test + public void testCustomHasher() throws Exception { + Files.write(springTestCustomHasherFile.toPath(), "first modification".getBytes(), StandardOpenOption.SYNC); + Files.write(springTestCustomHasherFile.toPath(), "second modification".getBytes(), StandardOpenOption.SYNC); + + MockEndpoint mock = getMockEndpoint("mock:springTestCustomHasher"); + mock.setExpectedCount(1); // We passed dummy TestHasher which returns constant hashcode. This should cause, that second MODIFY event is discarded + mock.setResultWaitTime(1000); + mock.assertIsSatisfied(); + } + @Override protected AbstractXmlApplicationContext createApplicationContext() { return new ClassPathXmlApplicationContext("org/apache/camel/component/file/watch/SpringFileWatchComponentTest.xml"); diff --git a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchComponent.java b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/TestHasher.java similarity index 57% copy from components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchComponent.java copy to components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/TestHasher.java index de633d8..695f2b7 100644 --- a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchComponent.java +++ b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/TestHasher.java @@ -1,37 +1,36 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.file.watch; - -import java.util.Map; - -import org.apache.camel.Endpoint; -import org.apache.camel.spi.annotations.Component; -import org.apache.camel.support.DefaultComponent; - -/** - * Represents the component that manages {@link FileWatchEndpoint}. - */ -@Component("file-watch") -public class FileWatchComponent extends DefaultComponent { - public static final String EVENT_TYPE_HEADER = "CamelFileEventType"; - - protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - Endpoint endpoint = new FileWatchEndpoint(uri, remaining, this); - setProperties(endpoint, parameters); - return endpoint; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.watch; + +import java.io.IOException; +import java.nio.file.Path; + +import io.methvin.watcher.hashing.FileHasher; +import io.methvin.watcher.hashing.HashCode; + +/** + * For unit test only! + */ +public class TestHasher implements FileHasher { + @Override + public HashCode hash(Path path) throws IOException { + // Always return constant + // This should cause every event is triggered only once (hashcode remains the same), so we can test this. + // Never use this in production code + return HashCode.fromLong(1L); + } +} diff --git a/components/camel-file-watch/src/test/resources/org/apache/camel/component/file/watch/SpringFileWatchComponentTest.xml b/components/camel-file-watch/src/test/resources/org/apache/camel/component/file/watch/SpringFileWatchComponentTest.xml index d7520ec..1243043 100644 --- a/components/camel-file-watch/src/test/resources/org/apache/camel/component/file/watch/SpringFileWatchComponentTest.xml +++ b/components/camel-file-watch/src/test/resources/org/apache/camel/component/file/watch/SpringFileWatchComponentTest.xml @@ -24,11 +24,18 @@ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> + <bean id="testHasher" class="org.apache.camel.component.file.watch.TestHasher"/> + <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="file-watch:target/fileWatchSpringTest"/> <to uri="mock:springTest"/> </route> + + <route> + <from uri="file-watch:target/fileWatchSpringTestCustomHasher?fileHasher=#testHasher"/> + <to uri="mock:springTestCustomHasher"/> + </route> </camelContext> </beans> diff --git a/docs/components/modules/ROOT/nav.adoc b/docs/components/modules/ROOT/nav.adoc index c14a30c..9510ef0 100644 --- a/docs/components/modules/ROOT/nav.adoc +++ b/docs/components/modules/ROOT/nav.adoc @@ -105,7 +105,6 @@ * xref:fhir-component.adoc[FHIR Component] * xref:fhirJson-dataformat.adoc[FHIR JSon DataFormat] * xref:fhirXml-dataformat.adoc[FHIR XML DataFormat] -* xref:file-watch-component.adoc[file-watch Component] * xref:file-component.adoc[File Component] * xref:flatpack-component.adoc[Flatpack Component] * xref:flatpack-dataformat.adoc[Flatpack DataFormat] diff --git a/docs/components/modules/ROOT/pages/file-watch-component.adoc b/docs/components/modules/ROOT/pages/file-watch-component.adoc index bff1bf1..ca17bee 100644 --- a/docs/components/modules/ROOT/pages/file-watch-component.adoc +++ b/docs/components/modules/ROOT/pages/file-watch-component.adoc @@ -3,9 +3,7 @@ *Available as of Camel version 3.0* -This component can be used to watch modification events in folder. It is based on java.nio.file.WatchService. -Be aware that WatchService can behave differently on different platforms. -The produced message can contain either file or directory. +This component can be used to watch file modification events in folder. It is based on https://github.com/gmethvin/directory-watcher[gmethvin/directory-watcher]. === URI Options @@ -42,7 +40,7 @@ with the following path and query parameters: |=== -==== Query Parameters (10 parameters): +==== Query Parameters (14 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -53,7 +51,11 @@ with the following path and query parameters: | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean | *concurrentConsumers* (consumer) | The number of concurrent consumers. Increase this value, if your route is slow to prevent buffering in queue. | 1 | int | *events* (consumer) | Coma separated list of events to watch. Allowed values are: CREATE, MODIFY, DELETE. | CREATE,MODIFY,DELETE | Set +| *fileHasher* (consumer) | Reference to io.methvin.watcher.hashing.FileHasher. This prevents emitting duplicate events on some platforms. For working with large files and if you dont need detect multiple modifications per second per file, use #lastModifiedTimeFileHasher. You can also provide custom implementation in registry. | #murmur3FFileHasher | FileHasher +| *pollThreads* (consumer) | The number of threads polling WatchService. Increase this value, if you see OVERFLOW messages in log. | 1 | int | *queueSize* (consumer) | Maximum size of queue between WatchService and consumer. Unbounded by default. | 2147483647 | int +| *recursive* (consumer) | Watch recursive in current and child directories (including newly created directories). | true | boolean +| *useFileHashing* (consumer) | Enables or disables file hashing to detect duplicate events. If you disable this, you can get some events multiple times on some platforms and JDKs. Check java.nio.file.WatchService limitations for your target platform. | true | boolean | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean @@ -92,26 +94,26 @@ The component supports 3 options, which are listed below. === Examples: -==== Watch all events (file creation, file deletion, file modification): +==== Recursive watch all events (file creation, file deletion, file modification): [source,java] ---- from("file-watch://some-directory") .log("File event: ${header.CamelFileEventType} occurred on file ${header.CamelFileName} at ${header.CamelFileLastModified}"); ---- -==== Watch for creation and deletion of txt files: +==== Recursive watch for creation and deletion of txt files: [source,java] ---- -from("file-watch://some-directory?events=DELETE,CREATE&antInclude=*.txt") +from("file-watch://some-directory?events=DELETE,CREATE&antInclude=**/*.txt") .log("File event: ${header.CamelFileEventType} occurred on file ${header.CamelFileName} at ${header.CamelFileLastModified}"); ---- ==== Create snapshot of file when modified: [source,java] ---- -from("file-watch://some-directory?events=MODIFY") +from("file-watch://some-directory?events=MODIFY&recursive=false") .setHeader(Exchange.FILE_NAME, simple("${header.CamelFileName}.${header.CamelFileLastModified}")) - .to("file:snapshots"); + .to("file:some-directory/snapshots"); ---- === Message Headers diff --git a/parent/pom.xml b/parent/pom.xml index 25dd534..448e952 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -160,6 +160,7 @@ <derby-version>10.14.2.0</derby-version> <digitalocean-api-client-version>2.17</digitalocean-api-client-version> <digitalocean-api-client-bundle-version>2.17_1</digitalocean-api-client-bundle-version> + <directory-watcher-version>0.9.5</directory-watcher-version> <disruptor-version>3.4.2</disruptor-version> <dnsjava-version>2.1.9</dnsjava-version> <dnsjava-bundle-version>2.1.9_1</dnsjava-bundle-version> @@ -5127,6 +5128,13 @@ <artifactId>java-grok</artifactId> <version>${java-grok-version}</version> </dependency> + + <!-- directory-watcher for camel-file-watch component --> + <dependency> + <groupId>io.methvin</groupId> + <artifactId>directory-watcher</artifactId> + <version>${directory-watcher-version}</version> + </dependency> </dependencies> </dependencyManagement> diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml index f1841b9..bcd39e1 100644 --- a/platforms/karaf/features/src/main/resources/features.xml +++ b/platforms/karaf/features/src/main/resources/features.xml @@ -967,6 +967,7 @@ </feature> <feature name='camel-file-watch' version='${project.version}' start-level='50'> <feature version='${project.version}'>camel-core</feature> + <bundle dependency='true'>wrap:mvn:io.methvin/directory-watcher/${directory-watcher-version}</bundle> <bundle>mvn:org.apache.camel/camel-file-watch/${project.version}</bundle> </feature> <feature name='camel-flatpack' version='${project.version}' start-level='50'>