This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit fac1b971ea3c17d71b8501e3f56824ccf11db58f Author: Jan Bednář <m...@janbednar.eu> AuthorDate: Sun Jul 7 17:41:48 2019 +0200 CAMEL-13628: Dropped directory-watcher dependency and related stuff --- components/camel-file-watch/pom.xml | 7 +- .../src/main/docs/file-watch-component.adoc | 20 ++- .../component/file/watch/FileWatchComponent.java | 14 -- .../component/file/watch/FileWatchConsumer.java | 160 +++++++++++++-------- .../component/file/watch/FileWatchEndpoint.java | 56 -------- .../component/file/watch/constants/FileEvent.java | 11 +- .../file/watch/constants/FileEventEnum.java | 32 ----- .../file/watch/utils/WatchServiceUtils.java | 40 ++++++ .../watch/FileWatchComponentRecursiveTest.java | 60 -------- .../file/watch/FileWatchComponentTest.java | 40 ++---- .../file/watch/FileWatchComponentTestBase.java | 6 +- .../file/watch/SpringFileWatcherTest.java | 13 +- .../camel/component/file/watch/TestHasher.java | 36 ----- .../file/watch/SpringFileWatchComponentTest.xml | 7 - docs/components/modules/ROOT/nav.adoc | 1 + .../modules/ROOT/pages/file-watch-component.adoc | 20 ++- parent/pom.xml | 8 -- .../karaf/features/src/main/resources/features.xml | 1 - 18 files changed, 177 insertions(+), 355 deletions(-) diff --git a/components/camel-file-watch/pom.xml b/components/camel-file-watch/pom.xml index d366f9f..16ad35e 100644 --- a/components/camel-file-watch/pom.xml +++ b/components/camel-file-watch/pom.xml @@ -39,11 +39,6 @@ <artifactId>camel-support</artifactId> </dependency> - <dependency> - <groupId>io.methvin</groupId> - <artifactId>directory-watcher</artifactId> - </dependency> - <!-- testing --> <dependency> <groupId>org.apache.camel</groupId> @@ -66,4 +61,4 @@ <scope>test</scope> </dependency> </dependencies> -</project> \ No newline at end of file +</project> diff --git a/components/camel-file-watch/src/main/docs/file-watch-component.adoc b/components/camel-file-watch/src/main/docs/file-watch-component.adoc index ca17bee..bff1bf1 100644 --- 
a/components/camel-file-watch/src/main/docs/file-watch-component.adoc +++ b/components/camel-file-watch/src/main/docs/file-watch-component.adoc @@ -3,7 +3,9 @@ *Available as of Camel version 3.0* -This component can be used to watch file modification events in folder. It is based on https://github.com/gmethvin/directory-watcher[gmethvin/directory-watcher]. +This component can be used to watch modification events in folder. It is based on java.nio.file.WatchService. +Be aware that WatchService can behave differently on different platforms. +The produced message can contain either file or directory. === URI Options @@ -40,7 +42,7 @@ with the following path and query parameters: |=== -==== Query Parameters (14 parameters): +==== Query Parameters (10 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -51,11 +53,7 @@ with the following path and query parameters: | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean | *concurrentConsumers* (consumer) | The number of concurrent consumers. Increase this value, if your route is slow to prevent buffering in queue. | 1 | int | *events* (consumer) | Coma separated list of events to watch. Allowed values are: CREATE, MODIFY, DELETE. | CREATE,MODIFY,DELETE | Set -| *fileHasher* (consumer) | Reference to io.methvin.watcher.hashing.FileHasher. This prevents emitting duplicate events on some platforms. For working with large files and if you dont need detect multiple modifications per second per file, use #lastModifiedTimeFileHasher. You can also provide custom implementation in registry. 
| #murmur3FFileHasher | FileHasher -| *pollThreads* (consumer) | The number of threads polling WatchService. Increase this value, if you see OVERFLOW messages in log. | 1 | int | *queueSize* (consumer) | Maximum size of queue between WatchService and consumer. Unbounded by default. | 2147483647 | int -| *recursive* (consumer) | Watch recursive in current and child directories (including newly created directories). | true | boolean -| *useFileHashing* (consumer) | Enables or disables file hashing to detect duplicate events. If you disable this, you can get some events multiple times on some platforms and JDKs. Check java.nio.file.WatchService limitations for your target platform. | true | boolean | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean @@ -94,26 +92,26 @@ The component supports 3 options, which are listed below. 
=== Examples: -==== Recursive watch all events (file creation, file deletion, file modification): +==== Watch all events (file creation, file deletion, file modification): [source,java] ---- from("file-watch://some-directory") .log("File event: ${header.CamelFileEventType} occurred on file ${header.CamelFileName} at ${header.CamelFileLastModified}"); ---- -==== Recursive watch for creation and deletion of txt files: +==== Watch for creation and deletion of txt files: [source,java] ---- -from("file-watch://some-directory?events=DELETE,CREATE&antInclude=**/*.txt") +from("file-watch://some-directory?events=DELETE,CREATE&antInclude=*.txt") .log("File event: ${header.CamelFileEventType} occurred on file ${header.CamelFileName} at ${header.CamelFileLastModified}"); ---- ==== Create snapshot of file when modified: [source,java] ---- -from("file-watch://some-directory?events=MODIFY&recursive=false") +from("file-watch://some-directory?events=MODIFY") .setHeader(Exchange.FILE_NAME, simple("${header.CamelFileName}.${header.CamelFileLastModified}")) - .to("file:some-directory/snapshots"); + .to("file:snapshots"); ---- === Message Headers diff --git a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchComponent.java b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchComponent.java index 03bafa6..de633d8 100644 --- a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchComponent.java +++ b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchComponent.java @@ -18,9 +18,7 @@ package org.apache.camel.component.file.watch; import java.util.Map; -import io.methvin.watcher.hashing.FileHasher; import org.apache.camel.Endpoint; -import org.apache.camel.spi.Registry; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.DefaultComponent; @@ -36,16 +34,4 @@ public class FileWatchComponent extends 
DefaultComponent { setProperties(endpoint, parameters); return endpoint; } - - @Override - protected void doStart() throws Exception { - super.doStart(); - Registry registry = getCamelContext().getRegistry(); - if (registry.lookupByNameAndType("murmur3FFileHasher", FileHasher.class) == null) { - registry.bind("murmur3FFileHasher", FileHasher.class, FileHasher.DEFAULT_FILE_HASHER); - } - if (registry.lookupByNameAndType("lastModifiedTimeFileHasher", FileHasher.class) == null) { - registry.bind("lastModifiedTimeFileHasher", FileHasher.class, FileHasher.LAST_MODIFIED_TIME); - } - } } diff --git a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchConsumer.java b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchConsumer.java index ece2040..dce656a 100644 --- a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchConsumer.java +++ b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchConsumer.java @@ -18,23 +18,29 @@ package org.apache.camel.component.file.watch; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.file.ClosedWatchServiceException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Objects; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import io.methvin.watcher.DirectoryChangeEvent; -import io.methvin.watcher.DirectoryChangeListener; -import io.methvin.watcher.DirectoryWatcher; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; import 
org.apache.camel.component.file.watch.constants.FileEvent; +import org.apache.camel.component.file.watch.constants.FileEventEnum; import org.apache.camel.component.file.watch.utils.PathUtils; +import org.apache.camel.component.file.watch.utils.WatchServiceUtils; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.util.AntPathMatcher; @@ -43,12 +49,14 @@ import org.apache.camel.util.AntPathMatcher; */ public class FileWatchConsumer extends DefaultConsumer { + private WatchService watchService; private ExecutorService watchDirExecutorService; private ExecutorService pollExecutorService; private LinkedBlockingQueue<FileEvent> eventQueue; private Path baseDirectory; private AntPathMatcher antPathMatcher; - private DirectoryWatcher watcher; + private WatchEvent.Kind[] kinds; + public FileWatchConsumer(FileWatchEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -60,6 +68,16 @@ public class FileWatchConsumer extends DefaultConsumer { antPathMatcher = new AntPathMatcher(); baseDirectory = Paths.get(getEndpoint().getPath()); + + Set<FileEventEnum> events = new HashSet<>(endpoint.getEvents()); + + kinds = new WatchEvent.Kind[endpoint.getEvents().size() + 1]; + kinds[0] = StandardWatchEventKinds.OVERFLOW; //always watch Overflow event for logging purposes + int i = 0; + for (FileEventEnum fileEventEnum: events) { + kinds[i + 1] = fileEventEnum.kind(); + i++; + } } @Override @@ -78,43 +96,59 @@ public class FileWatchConsumer extends DefaultConsumer { throw new RuntimeCamelException(String.format("Parameter path must be directory, %s given", baseDirectory.toString())); } - DirectoryWatcher.Builder watcherBuilder = DirectoryWatcher.builder() - .path(this.baseDirectory) - .logger(log) - .listener(new FileWatchDirectoryChangeListener()); - - if (!System.getProperty("os.name").toLowerCase().contains("mac")) { - // If not macOS, use FileSystem WatchService. io.methvin.watcher uses by default WatchService associated to default FileSystem. 
- // We need per FileSystem WatchService, to allow monitoring on machine with multiple file systems. - // Keep default for macOS - watcherBuilder.watchService(this.baseDirectory.getFileSystem().newWatchService()); - } - - watcherBuilder.fileHashing(getEndpoint().isUseFileHashing()); - if (getEndpoint().getFileHasher() != null && getEndpoint().isUseFileHashing()) { - watcherBuilder.fileHasher(getEndpoint().getFileHasher()); - } - - this.watcher = watcherBuilder.build(); + watchService = baseDirectory.getFileSystem().newWatchService(); watchDirExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager() - .newFixedThreadPool(this, "CamelFileWatchService", getEndpoint().getPollThreads()); + .newSingleThreadExecutor(this, "CamelFileWatchService"); pollExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager() .newFixedThreadPool(this, "CamelFileWatchPoll", getEndpoint().getConcurrentConsumers()); - for (int i = 0; i < getEndpoint().getPollThreads(); i++) { - this.watcher.watchAsync(watchDirExecutorService); - } + register(baseDirectory); + + watchDirExecutorService.submit(new WatchServiceRunnable()); + for (int i = 0; i < getEndpoint().getConcurrentConsumers(); i++) { pollExecutorService.submit(new PollRunnable()); } } + private void register(Path path) throws IOException { + boolean registered = false; + if (WatchServiceUtils.isPollingWatchService(watchService)) { + try { + // Find enum value SensitivityWatchEventModifier.HIGH using reflection to avoid importing com.sun packages + Class<?> sensitivityWatchEventModifierClass = getEndpoint().getCamelContext() + .getClassResolver().resolveClass("com.sun.nio.file.SensitivityWatchEventModifier"); + if (sensitivityWatchEventModifierClass != null) { + Field enumConstantField = sensitivityWatchEventModifierClass.getDeclaredField("HIGH"); + WatchEvent.Modifier sensitivityModifier = (WatchEvent.Modifier) enumConstantField.get(null); + + if (sensitivityModifier != null) { + 
path.register(watchService, kinds, sensitivityModifier); + registered = true; + } + } + } catch (IllegalAccessException | NoSuchFieldException ignored) { + // This is expected on JVMs where PollingWatchService or SensitivityWatchEventModifier are not available + } + } + + if (!registered) { + path.register(watchService, kinds); + } + } + @Override protected void doStop() throws Exception { - if (this.watcher != null) { - this.watcher.close(); + if (watchService != null) { + try { + watchService.close(); + log.info("WatchService closed"); + } catch (IOException e) { + log.info("Cannot close WatchService", e); + } } + if (watchDirExecutorService != null) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(watchDirExecutorService); } @@ -163,19 +197,6 @@ public class FileWatchConsumer extends DefaultConsumer { return false; } - if (!getEndpoint().isRecursive()) { - // On some platforms (eg macOS) is WatchService always recursive, - // so we need to filter this out to make this component platform independent - try { - if (!Files.isSameFile(fileEvent.getEventPath().getParent(), this.baseDirectory)) { - return false; - } - } catch (IOException e) { - log.warn(String.format("Exception occurred during executing filter. Filtering file %s out.", fileEvent.getEventPath()), e); - return false; - } - } - String pattern = getEndpoint().getAntInclude(); if (pattern == null || pattern.trim().isEmpty()) { return true; @@ -192,27 +213,50 @@ public class FileWatchConsumer extends DefaultConsumer { return (FileWatchEndpoint) super.getEndpoint(); } - class FileWatchDirectoryChangeListener implements DirectoryChangeListener { + class WatchServiceRunnable implements Runnable { + WatchKey watchKey; + @Override - public void onEvent(DirectoryChangeEvent directoryChangeEvent) { - if (directoryChangeEvent.eventType() == DirectoryChangeEvent.EventType.OVERFLOW) { - log.warn("OVERFLOW occurred, some events may be lost. 
Consider increasing of option 'pollThreads'"); - return; - } - FileEvent fileEvent = new FileEvent(directoryChangeEvent); - if (matchFilters(fileEvent)) { - eventQueue.offer(fileEvent); + public void run() { + while (take() && isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) { + for (WatchEvent<?> event : watchKey.pollEvents()) { + if (event.kind().equals(StandardWatchEventKinds.OVERFLOW)) { + log.warn("OVERFLOW occurred. Some events may be lost"); + continue; + } + + Path base = (Path) watchKey.watchable(); + WatchEvent<Path> eventCast = cast(event); + FileEvent fileEvent = new FileEvent(FileEventEnum.valueOf(eventCast.kind()), base.resolve(eventCast.context())); + + if (matchFilters(fileEvent)) { + eventQueue.offer(fileEvent); + } + } } } - @Override - public boolean isWatching() { - return !isStoppingOrStopped() && !isSuspendingOrSuspended(); + private boolean take() { + if (watchKey != null && !watchKey.reset()) { + log.info("WatchDirRunnable stopping, because watchKey is in invalid state"); + return false; + } + try { + watchKey = watchService.take(); + return true; + } catch (ClosedWatchServiceException | InterruptedException e) { + log.info("WatchDirRunnable stopping because " + e.getClass().getSimpleName() + ": " + e.getMessage()); + return false; + } } + } - @Override - public void onException(Exception e) { - handleException(e); + @SuppressWarnings("unchecked") + private WatchEvent<Path> cast(WatchEvent<?> event) { + if (event != null && event.kind().type() == Path.class) { + return (WatchEvent<Path>) event; + } else { + throw new ClassCastException("Cannot cast " + event + " to WatchEvent<Path>"); } } @@ -238,4 +282,6 @@ public class FileWatchConsumer extends DefaultConsumer { } } } + + } diff --git a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchEndpoint.java b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchEndpoint.java index d56b983..c366668 
100644 --- a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchEndpoint.java +++ b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchEndpoint.java @@ -20,7 +20,6 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Set; -import io.methvin.watcher.hashing.FileHasher; import org.apache.camel.Consumer; import org.apache.camel.MultipleConsumersSupport; import org.apache.camel.Processor; @@ -50,20 +49,12 @@ public class FileWatchEndpoint extends DefaultEndpoint implements MultipleConsum @UriParam(label = "consumer", description = "Auto create directory if does not exists.", defaultValue = "true") private boolean autoCreate = true; - @UriParam(label = "consumer", description = "Watch recursive in current and child directories (including newly created directories).", defaultValue = "true") - private boolean recursive = true; - @UriParam(label = "consumer", description = "The number of concurrent consumers. Increase this value, if your route is slow to prevent buffering in queue.", defaultValue = "1") private int concurrentConsumers = 1; @UriParam(label = "consumer", - description = "The number of threads polling WatchService. Increase this value, if you see OVERFLOW messages in log.", - defaultValue = "1") - private int pollThreads = 1; - - @UriParam(label = "consumer", description = "ANT style pattern to match files. The file is matched against path relative to endpoint path. " + "Pattern must be also relative (not starting with slash)", defaultValue = "**") @@ -73,21 +64,6 @@ public class FileWatchEndpoint extends DefaultEndpoint implements MultipleConsum defaultValue = "" + Integer.MAX_VALUE) private int queueSize = Integer.MAX_VALUE; - @UriParam(label = "consumer", - description = "Reference to io.methvin.watcher.hashing.FileHasher. " - + "This prevents emitting duplicate events on some platforms. 
" - + "For working with large files and if you dont need detect multiple modifications per second per file, " - + "use #lastModifiedTimeFileHasher. You can also provide custom implementation in registry.", - defaultValue = "#murmur3FFileHasher") - private FileHasher fileHasher = FileHasher.DEFAULT_FILE_HASHER; - - @UriParam(label = "consumer", - description = "Enables or disables file hashing to detect duplicate events. " - + "If you disable this, you can get some events multiple times on some platforms and JDKs. " - + "Check java.nio.file.WatchService limitations for your target platform.", - defaultValue = "true") - private boolean useFileHashing = true; - public FileWatchEndpoint() { } @@ -143,14 +119,6 @@ public class FileWatchEndpoint extends DefaultEndpoint implements MultipleConsum this.autoCreate = autoCreate; } - public boolean isRecursive() { - return recursive; - } - - public void setRecursive(boolean recursive) { - this.recursive = recursive; - } - public int getConcurrentConsumers() { return concurrentConsumers; } @@ -159,14 +127,6 @@ public class FileWatchEndpoint extends DefaultEndpoint implements MultipleConsum this.concurrentConsumers = concurrentConsumers; } - public int getPollThreads() { - return pollThreads; - } - - public void setPollThreads(int pollThreads) { - this.pollThreads = pollThreads; - } - public int getQueueSize() { return queueSize; } @@ -183,22 +143,6 @@ public class FileWatchEndpoint extends DefaultEndpoint implements MultipleConsum this.antInclude = antInclude; } - public FileHasher getFileHasher() { - return fileHasher; - } - - public void setFileHasher(FileHasher fileHasher) { - this.fileHasher = fileHasher; - } - - public boolean isUseFileHashing() { - return useFileHashing; - } - - public void setUseFileHashing(boolean useFileHashing) { - this.useFileHashing = useFileHashing; - } - @Override public FileWatchComponent getComponent() { return (FileWatchComponent) super.getComponent(); diff --git 
a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/constants/FileEvent.java b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/constants/FileEvent.java index 8eca13a..8518e83 100644 --- a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/constants/FileEvent.java +++ b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/constants/FileEvent.java @@ -17,9 +17,7 @@ package org.apache.camel.component.file.watch.constants; import java.nio.file.Path; -import java.util.Date; - -import io.methvin.watcher.DirectoryChangeEvent; +import java.nio.file.WatchEvent; public class FileEvent { private FileEventEnum eventType; @@ -36,12 +34,6 @@ public class FileEvent { this.eventPath = eventPath; } - public FileEvent(DirectoryChangeEvent event) { - this(); - this.eventType = FileEventEnum.valueOf(event.eventType()); - this.eventPath = event.path(); - } - public FileEventEnum getEventType() { return eventType; } @@ -53,4 +45,5 @@ public class FileEvent { public long getEventDate() { return this.eventDate; } + } diff --git a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/constants/FileEventEnum.java b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/constants/FileEventEnum.java index 6ec3acf..9df9848 100644 --- a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/constants/FileEventEnum.java +++ b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/constants/FileEventEnum.java @@ -20,30 +20,11 @@ import java.nio.file.Path; import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchEvent; -import io.methvin.watcher.DirectoryChangeEvent; - public enum FileEventEnum { CREATE, DELETE, MODIFY; - public static FileEventEnum valueOf(DirectoryChangeEvent directoryChangeEvent) { - return valueOf(directoryChangeEvent.eventType()); - } - - public 
static FileEventEnum valueOf(DirectoryChangeEvent.EventType directoryChangeEventType) { - switch (directoryChangeEventType) { - case CREATE: - return FileEventEnum.CREATE; - case DELETE: - return FileEventEnum.DELETE; - case MODIFY: - return FileEventEnum.MODIFY; - default: - return null; - } - } - public static FileEventEnum valueOf(WatchEvent<?> watchEvent) { if (watchEvent.context() instanceof Path) { return valueOf(watchEvent.kind()); @@ -76,17 +57,4 @@ public enum FileEventEnum { return null; } } - - public DirectoryChangeEvent.EventType eventType() { - switch (this) { - case CREATE: - return DirectoryChangeEvent.EventType.CREATE; - case MODIFY: - return DirectoryChangeEvent.EventType.MODIFY; - case DELETE: - return DirectoryChangeEvent.EventType.DELETE; - default: - return null; - } - } } diff --git a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/utils/WatchServiceUtils.java b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/utils/WatchServiceUtils.java new file mode 100644 index 0000000..70101d1 --- /dev/null +++ b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/utils/WatchServiceUtils.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.watch.utils; + +import java.nio.file.WatchService; + +public class WatchServiceUtils { + + private WatchServiceUtils() { + } + + /** + * Check if @param watchService is underlying sun.nio.fs.PollingWatchService + * This can happen on OS X, AIX and Solaris prior to version 11 + */ + public static boolean isPollingWatchService(WatchService watchService) { + try { + // If the WatchService is a PollingWatchService, which it is on OS X, AIX and Solaris prior to version 11 + Class<?> pollingWatchService = Class.forName("sun.nio.fs.PollingWatchService"); + return pollingWatchService.isInstance(watchService); + } catch (ClassNotFoundException ignored) { + // This is expected on JVMs where PollingWatchService is not available + return false; + } + } +} diff --git a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentRecursiveTest.java b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentRecursiveTest.java deleted file mode 100644 index 858111b..0000000 --- a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentRecursiveTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.file.watch; - -import java.io.File; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.UUID; - -import org.apache.camel.RoutesBuilder; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; -import org.junit.Test; - -public class FileWatchComponentRecursiveTest extends FileWatchComponentTestBase { - - @Override - protected RoutesBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() throws Exception { - from("file-watch://" + testPath() + "?recursive=true").to("mock:recursive"); - from("file-watch://" + testPath() + "?recursive=false").to("mock:nonRecursive"); - } - }; - } - - @Test - public void testCreateFileInSubdirectories() throws Exception { - Path newDir = Paths.get(testPath(), "a", "b", "c", "d"); - newDir = Files.createDirectories(newDir); - File newFile = new File(newDir.toFile(), UUID.randomUUID().toString()); - - if (!newFile.createNewFile()) { - throw new RuntimeException("cannot create file"); - } - MockEndpoint recursive = getMockEndpoint("mock:recursive"); - recursive.expectedMessageCount(5); // 4 directories and one file - recursive.assertIsSatisfied(); - - MockEndpoint nonRecursive = getMockEndpoint("mock:nonRecursive"); - nonRecursive.expectedMessageCount(1); // 1 directory - nonRecursive.assertIsSatisfied(); - } -} diff --git a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentTest.java 
b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentTest.java index 239a08b..f483a0c 100644 --- a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentTest.java +++ b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentTest.java @@ -38,7 +38,7 @@ public class FileWatchComponentTest extends FileWatchComponentTestBase { MockEndpoint watchDelete = getMockEndpoint("mock:watchDelete"); MockEndpoint watchDeleteOrCreate = getMockEndpoint("mock:watchDeleteOrCreate"); - File newFile = createFile(testPath(), UUID.randomUUID().toString()); + File newFile = createFile(testPath(), UUID.randomUUID().toString()); // should emit CREATE event watchAll.expectedMessageCount(1); watchAll.setAssertPeriod(1000); @@ -80,33 +80,18 @@ public class FileWatchComponentTest extends FileWatchComponentTestBase { @Test public void testAntMatcher() throws Exception { MockEndpoint all = getMockEndpoint("mock:watchAll"); - MockEndpoint onlyTxtAnywhere = getMockEndpoint("mock:onlyTxtAnywhere"); - MockEndpoint onlyTxtInSubdirectory = getMockEndpoint("mock:onlyTxtInSubdirectory"); - MockEndpoint onlyTxtInRoot = getMockEndpoint("mock:onlyTxtInRoot"); + MockEndpoint onlyTxtInRoot = getMockEndpoint("mock:onlyTxt"); Path root = Paths.get(testPath()); - Path a = Paths.get(testPath(), "a"); - Path b = Paths.get(testPath(), "a", "b"); - Files.createDirectories(b); + Files.createDirectories(root); createFile(root.toFile(), "inRoot.txt"); createFile(root.toFile(), "inRoot.java"); - createFile(a.toFile(), "inA.txt"); - createFile(a.toFile(), "inA.java"); - createFile(b.toFile(), "inB.txt"); - createFile(b.toFile(), "inB.java"); - - all.expectedMessageCount(8); // 2 directories, 6 files + all.expectedMessageCount(2); // 1 java file 1 txt file all.assertIsSatisfied(); - onlyTxtAnywhere.expectedMessageCount(3); // 3 txt files - onlyTxtAnywhere.assertIsSatisfied(); - - 
onlyTxtInSubdirectory.expectedMessageCount(1); // 1 txt file in first subdirectory - onlyTxtInSubdirectory.assertIsSatisfied(); - - onlyTxtInRoot.expectedMessageCount(1); // 1 txt file inRoot.txt (should exclude everything in subdirectories) + onlyTxtInRoot.expectedMessageCount(1); // 1 txt file onlyTxtInRoot.assertIsSatisfied(); } @@ -115,13 +100,12 @@ public class FileWatchComponentTest extends FileWatchComponentTestBase { MockEndpoint mock = getMockEndpoint("mock:watchAll"); Files.write(testFiles.get(0), "Hello".getBytes(), StandardOpenOption.SYNC); - //testFiles.get(0).toFile().renameTo(new File(testPath(), "hello.txt")); - //Assert.assertTrue(newFile.createNewFile()); mock.setExpectedCount(1); mock.setResultWaitTime(1000); mock.assertIsSatisfied(); assertEquals("Hello", mock.getExchanges().get(0).getIn().getBody(String.class)); + assertEquals(FileEventEnum.MODIFY, mock.getExchanges().get(0).getIn().getHeader(FileWatchComponent.EVENT_TYPE_HEADER)); } @Test @@ -147,16 +131,8 @@ public class FileWatchComponentTest extends FileWatchComponentTestBase { .to("mock:watchAll"); from("file-watch://" + testPath() + "?events=CREATE&antInclude=*.txt") - .routeId("onlyTxtInRoot") - .to("mock:onlyTxtInRoot"); - - from("file-watch://" + testPath() + "?events=CREATE&antInclude=*/*.txt") - .routeId("onlyTxtInSubdirectory") - .to("mock:onlyTxtInSubdirectory"); - - from("file-watch://" + testPath() + "?events=CREATE&antInclude=**/*.txt") - .routeId("onlyTxtAnywhere") - .to("mock:onlyTxtAnywhere"); + .routeId("onlyTxt") + .to("mock:onlyTxt"); from("file-watch://" + testPath() + "?events=CREATE") .to("mock:watchCreate"); diff --git a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentTestBase.java b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentTestBase.java index 1aca5d3..f282e55 100644 --- 
a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentTestBase.java +++ b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/FileWatchComponentTestBase.java @@ -58,7 +58,7 @@ public class FileWatchComponentTestBase extends CamelTestSupport { new File(testPath()).mkdirs(); for (int i = 0; i < 10; i++) { File newFile = new File(testPath(), getTestName().getMethodName() + "-" + i); - Assume.assumeTrue(newFile.createNewFile()); + newFile.createNewFile(); testFiles.add(newFile.toPath()); } } @@ -78,13 +78,13 @@ public class FileWatchComponentTestBase extends CamelTestSupport { cleanTestDir(childFile); } else { if (!childFile.delete()) { - throw new IOException(); + log.warn("Cannot delete " + childFile); } } } if (!file.delete()) { - throw new IOException(); + log.warn("Cannot delete " + file); } } diff --git a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/SpringFileWatcherTest.java b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/SpringFileWatcherTest.java index 55d8b5f..33f7045 100644 --- a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/SpringFileWatcherTest.java +++ b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/SpringFileWatcherTest.java @@ -49,23 +49,12 @@ public class SpringFileWatcherTest extends CamelSpringTestSupport { Files.write(springTestFile.toPath(), "modification".getBytes(), StandardOpenOption.SYNC); Files.write(springTestFile.toPath(), "modification 2".getBytes(), StandardOpenOption.SYNC); MockEndpoint mock = getMockEndpoint("mock:springTest"); - mock.setExpectedCount(2); // two MODIFY events + mock.setMinimumExpectedMessageCount(2); // two MODIFY events (On Windows sometimes more) mock.setResultWaitTime(1000); mock.assertIsSatisfied(); } - @Test - public void testCustomHasher() throws Exception { - Files.write(springTestCustomHasherFile.toPath(), 
"first modification".getBytes(), StandardOpenOption.SYNC); - Files.write(springTestCustomHasherFile.toPath(), "second modification".getBytes(), StandardOpenOption.SYNC); - - MockEndpoint mock = getMockEndpoint("mock:springTestCustomHasher"); - mock.setExpectedCount(1); // We passed dummy TestHasher which returns constant hashcode. This should cause, that second MODIFY event is discarded - mock.setResultWaitTime(1000); - mock.assertIsSatisfied(); - } - @Override protected AbstractXmlApplicationContext createApplicationContext() { return new ClassPathXmlApplicationContext("org/apache/camel/component/file/watch/SpringFileWatchComponentTest.xml"); diff --git a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/TestHasher.java b/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/TestHasher.java deleted file mode 100644 index 695f2b7..0000000 --- a/components/camel-file-watch/src/test/java/org/apache/camel/component/file/watch/TestHasher.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.component.file.watch; - -import java.io.IOException; -import java.nio.file.Path; - -import io.methvin.watcher.hashing.FileHasher; -import io.methvin.watcher.hashing.HashCode; - -/** - * For unit test only! - */ -public class TestHasher implements FileHasher { - @Override - public HashCode hash(Path path) throws IOException { - // Always return constant - // This should cause every event is triggered only once (hashcode remains the same), so we can test this. - // Never use this in production code - return HashCode.fromLong(1L); - } -} diff --git a/components/camel-file-watch/src/test/resources/org/apache/camel/component/file/watch/SpringFileWatchComponentTest.xml b/components/camel-file-watch/src/test/resources/org/apache/camel/component/file/watch/SpringFileWatchComponentTest.xml index 1243043..d7520ec 100644 --- a/components/camel-file-watch/src/test/resources/org/apache/camel/component/file/watch/SpringFileWatchComponentTest.xml +++ b/components/camel-file-watch/src/test/resources/org/apache/camel/component/file/watch/SpringFileWatchComponentTest.xml @@ -24,18 +24,11 @@ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> - <bean id="testHasher" class="org.apache.camel.component.file.watch.TestHasher"/> - <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="file-watch:target/fileWatchSpringTest"/> <to uri="mock:springTest"/> </route> - - <route> - <from uri="file-watch:target/fileWatchSpringTestCustomHasher?fileHasher=#testHasher"/> - <to uri="mock:springTestCustomHasher"/> - </route> </camelContext> </beans> diff --git a/docs/components/modules/ROOT/nav.adoc b/docs/components/modules/ROOT/nav.adoc index 9510ef0..c14a30c 100644 --- a/docs/components/modules/ROOT/nav.adoc +++ b/docs/components/modules/ROOT/nav.adoc @@ -105,6 +105,7 @@ * xref:fhir-component.adoc[FHIR Component] * xref:fhirJson-dataformat.adoc[FHIR JSon DataFormat] * 
xref:fhirXml-dataformat.adoc[FHIR XML DataFormat] +* xref:file-watch-component.adoc[file-watch Component] * xref:file-component.adoc[File Component] * xref:flatpack-component.adoc[Flatpack Component] * xref:flatpack-dataformat.adoc[Flatpack DataFormat] diff --git a/docs/components/modules/ROOT/pages/file-watch-component.adoc b/docs/components/modules/ROOT/pages/file-watch-component.adoc index ca17bee..bff1bf1 100644 --- a/docs/components/modules/ROOT/pages/file-watch-component.adoc +++ b/docs/components/modules/ROOT/pages/file-watch-component.adoc @@ -3,7 +3,9 @@ *Available as of Camel version 3.0* -This component can be used to watch file modification events in folder. It is based on https://github.com/gmethvin/directory-watcher[gmethvin/directory-watcher]. +This component can be used to watch modification events in a folder. It is based on java.nio.file.WatchService. +Be aware that WatchService can behave differently on different platforms. +The produced message can contain either a file or a directory. === URI Options @@ -40,7 +42,7 @@ with the following path and query parameters: |=== -==== Query Parameters (14 parameters): +==== Query Parameters (10 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -51,11 +53,7 @@ with the following path and query parameters: | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean | *concurrentConsumers* (consumer) | The number of concurrent consumers. Increase this value, if your route is slow to prevent buffering in queue. | 1 | int | *events* (consumer) | Comma separated list of events to watch. 
Allowed values are: CREATE, MODIFY, DELETE. | CREATE,MODIFY,DELETE | Set -| *fileHasher* (consumer) | Reference to io.methvin.watcher.hashing.FileHasher. This prevents emitting duplicate events on some platforms. For working with large files and if you dont need detect multiple modifications per second per file, use #lastModifiedTimeFileHasher. You can also provide custom implementation in registry. | #murmur3FFileHasher | FileHasher -| *pollThreads* (consumer) | The number of threads polling WatchService. Increase this value, if you see OVERFLOW messages in log. | 1 | int | *queueSize* (consumer) | Maximum size of queue between WatchService and consumer. Unbounded by default. | 2147483647 | int -| *recursive* (consumer) | Watch recursive in current and child directories (including newly created directories). | true | boolean -| *useFileHashing* (consumer) | Enables or disables file hashing to detect duplicate events. If you disable this, you can get some events multiple times on some platforms and JDKs. Check java.nio.file.WatchService limitations for your target platform. | true | boolean | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. | | ExceptionHandler | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean @@ -94,26 +92,26 @@ The component supports 3 options, which are listed below. 
=== Examples: -==== Recursive watch all events (file creation, file deletion, file modification): +==== Watch all events (file creation, file deletion, file modification): [source,java] ---- from("file-watch://some-directory") .log("File event: ${header.CamelFileEventType} occurred on file ${header.CamelFileName} at ${header.CamelFileLastModified}"); ---- -==== Recursive watch for creation and deletion of txt files: +==== Watch for creation and deletion of txt files: [source,java] ---- -from("file-watch://some-directory?events=DELETE,CREATE&antInclude=**/*.txt") +from("file-watch://some-directory?events=DELETE,CREATE&antInclude=*.txt") .log("File event: ${header.CamelFileEventType} occurred on file ${header.CamelFileName} at ${header.CamelFileLastModified}"); ---- ==== Create snapshot of file when modified: [source,java] ---- -from("file-watch://some-directory?events=MODIFY&recursive=false") +from("file-watch://some-directory?events=MODIFY") .setHeader(Exchange.FILE_NAME, simple("${header.CamelFileName}.${header.CamelFileLastModified}")) - .to("file:some-directory/snapshots"); + .to("file:snapshots"); ---- === Message Headers diff --git a/parent/pom.xml b/parent/pom.xml index 448e952..25dd534 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -160,7 +160,6 @@ <derby-version>10.14.2.0</derby-version> <digitalocean-api-client-version>2.17</digitalocean-api-client-version> <digitalocean-api-client-bundle-version>2.17_1</digitalocean-api-client-bundle-version> - <directory-watcher-version>0.9.5</directory-watcher-version> <disruptor-version>3.4.2</disruptor-version> <dnsjava-version>2.1.9</dnsjava-version> <dnsjava-bundle-version>2.1.9_1</dnsjava-bundle-version> @@ -5128,13 +5127,6 @@ <artifactId>java-grok</artifactId> <version>${java-grok-version}</version> </dependency> - - <!-- directory-watcher for camel-file-watch component --> - <dependency> - <groupId>io.methvin</groupId> - <artifactId>directory-watcher</artifactId> - 
<version>${directory-watcher-version}</version> - </dependency> </dependencies> </dependencyManagement> diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml index bcd39e1..f1841b9 100644 --- a/platforms/karaf/features/src/main/resources/features.xml +++ b/platforms/karaf/features/src/main/resources/features.xml @@ -967,7 +967,6 @@ </feature> <feature name='camel-file-watch' version='${project.version}' start-level='50'> <feature version='${project.version}'>camel-core</feature> - <bundle dependency='true'>wrap:mvn:io.methvin/directory-watcher/${directory-watcher-version}</bundle> <bundle>mvn:org.apache.camel/camel-file-watch/${project.version}</bundle> </feature> <feature name='camel-flatpack' version='${project.version}' start-level='50'>