Repository: camel
Updated Branches:
  refs/heads/master 16b924b95 -> 115cb3aff


CAMEL-9517: Validator Endpoint- clearCachedSchema added



Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/115cb3af
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/115cb3af
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/115cb3af

Branch: refs/heads/master
Commit: 115cb3affbe2b0a866f7b686035b5b73626b0dca
Parents: 16b924b
Author: Franz Forsthofer <franz.forstho...@sap.com>
Authored: Thu Jan 14 15:23:21 2016 +0100
Committer: Franz Forsthofer <franz.forstho...@sap.com>
Committed: Fri Jan 15 07:51:56 2016 +0100

----------------------------------------------------------------------
 .../component/validator/ValidatorEndpoint.java  |  74 ++++--
 .../processor/validation/SchemaReader.java      | 181 +++++++++++++++
 .../validation/ValidatingProcessor.java         | 112 +++------
 .../ValidatorEndpointClearCachedSchemaTest.java | 225 +++++++++++++++++++
 4 files changed, 493 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/115cb3af/camel-core/src/main/java/org/apache/camel/component/validator/ValidatorEndpoint.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/validator/ValidatorEndpoint.java
 
b/camel-core/src/main/java/org/apache/camel/component/validator/ValidatorEndpoint.java
index 1b0c28c..6b04a8c 100644
--- 
a/camel-core/src/main/java/org/apache/camel/component/validator/ValidatorEndpoint.java
+++ 
b/camel-core/src/main/java/org/apache/camel/component/validator/ValidatorEndpoint.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.validator;
 
+import java.io.IOException;
 import java.io.InputStream;
 import javax.xml.XMLConstants;
 import javax.xml.validation.SchemaFactory;
@@ -26,9 +27,12 @@ import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.converter.IOConverter;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.processor.validation.DefaultValidationErrorHandler;
+import org.apache.camel.processor.validation.SchemaReader;
 import org.apache.camel.processor.validation.ValidatingProcessor;
 import org.apache.camel.processor.validation.ValidatorErrorHandler;
 import org.apache.camel.spi.Metadata;
@@ -43,6 +47,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Validates the payload of a message using XML Schema and JAXP Validation.
  */
+@ManagedResource(description = "Managed ValidatorEndpoint")
 @UriEndpoint(scheme = "validator", title = "Validator", syntax = 
"validator:resourceUri", producerOnly = true, label = "core,validation")
 public class ValidatorEndpoint extends DefaultEndpoint {
 
@@ -73,6 +78,14 @@ public class ValidatorEndpoint extends DefaultEndpoint {
     @UriParam(description = "To validate against a header instead of the 
message body.")
     private String headerName;
 
+    /**
+     * We need a one-to-one relation between endpoint and schema reader in 
order
+     * to be able to clear the cached schema in the schema reader. See method
+     * {@link #clearCachedSchema}.
+     */
+    private final SchemaReader schemaReader = new SchemaReader();
+    private volatile boolean schemaReaderConfigured;
+
     public ValidatorEndpoint() {
     }
 
@@ -81,28 +94,58 @@ public class ValidatorEndpoint extends DefaultEndpoint {
         this.resourceUri = resourceUri;
     }
 
+    @ManagedOperation(description = "Clears the cached schema, forcing to 
re-load the schema on next request")
+    public void clearCachedSchema() throws Exception {
+        LOG.debug("{} rereading schema resource: {}", this, resourceUri);
+        byte[] bytes = readSchemaResource();
+        schemaReader.setSchemaAsByteArray(bytes);
+
+        schemaReader.setSchema(null); // causes the schema to be reloaded from
+                                      // the byte array set above on next request
+    }
+
     @Override
     public Producer createProducer() throws Exception {
-        ValidatingProcessor validator = new ValidatingProcessor();
 
+        if (!schemaReaderConfigured) {
+            if (resourceResolver != null) {
+                schemaReader.setResourceResolver(resourceResolver);
+            } else {
+                schemaReader.setResourceResolver(new 
DefaultLSResourceResolver(getCamelContext(), resourceUri));
+            }
+            schemaReader.setSchemaLanguage(getSchemaLanguage());
+            schemaReader.setSchemaFactory(getSchemaFactory());
+            
+            byte[] bytes = readSchemaResource();
+            schemaReader.setSchemaAsByteArray(bytes);
+            LOG.debug("{} using schema resource: {}", this, resourceUri);
+
+            // force loading of schema at create time otherwise concurrent
+            // processing could cause thread-safety issues for the
+            // javax.xml.validation.SchemaFactory
+            schemaReader.loadSchema();
+
+            // configure only once
+            schemaReaderConfigured = true;
+        }
+
+        ValidatingProcessor validator = new ValidatingProcessor(schemaReader);
+        configureValidator(validator);
+
+        return new ValidatorProducer(this, validator);
+    }
+
+    protected byte[] readSchemaResource() throws IOException {
         InputStream is = 
ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(), 
resourceUri);
         byte[] bytes = null;
         try {
             bytes = IOConverter.toBytes(is);
         } finally {
-            // and make sure to close the input stream after the schema has 
been loaded
+            // and make sure to close the input stream after the schema has 
been
+            // loaded
             IOHelper.close(is);
         }
-
-        validator.setSchemaAsByteArray(bytes);
-        LOG.debug("{} using schema resource: {}", this, resourceUri);
-        configureValidator(validator);
-
-        // force loading of schema at create time otherwise concurrent
-        // processing could cause thread safe issues for the 
javax.xml.validation.SchemaFactory
-        validator.loadSchema();
-
-        return new ValidatorProducer(this, validator);
+        return bytes;
     }
 
     @Override
@@ -116,13 +159,6 @@ public class ValidatorEndpoint extends DefaultEndpoint {
     }
 
     protected void configureValidator(ValidatingProcessor validator) throws 
Exception {
-        if (resourceResolver != null) {
-            validator.setResourceResolver(resourceResolver);
-        } else {
-            validator.setResourceResolver(new 
DefaultLSResourceResolver(getCamelContext(), resourceUri));
-        }
-        validator.setSchemaLanguage(getSchemaLanguage());
-        validator.setSchemaFactory(getSchemaFactory());
         validator.setErrorHandler(getErrorHandler());
         validator.setUseDom(isUseDom());
         validator.setUseSharedSchema(isUseSharedSchema());

http://git-wip-us.apache.org/repos/asf/camel/blob/115cb3af/camel-core/src/main/java/org/apache/camel/processor/validation/SchemaReader.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/validation/SchemaReader.java
 
b/camel-core/src/main/java/org/apache/camel/processor/validation/SchemaReader.java
new file mode 100644
index 0000000..0fdb9e1
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/validation/SchemaReader.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.validation;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+
+import javax.xml.XMLConstants;
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+
+import org.w3c.dom.ls.LSResourceResolver;
+import org.xml.sax.SAXException;
+
+/**
+ * Reads the schema used in the processor {@link ValidatingProcessor}. Calling
+ * {@link #setSchema(Schema) setSchema(null)} forces re-reading the schema.
+ */
+public class SchemaReader {
+
+    private String schemaLanguage = XMLConstants.W3C_XML_SCHEMA_NS_URI;
+    // must be volatile because it is accessed from different threads; see 
ValidatorEndpoint.clearCachedSchema
+    private volatile Schema schema;
+    private Source schemaSource;
+    // must be volatile because it is accessed from different threads; see 
ValidatorEndpoint.clearCachedSchema
+    private volatile SchemaFactory schemaFactory;
+    private URL schemaUrl;
+    private File schemaFile;
+    private volatile byte[] schemaAsByteArray;
+    private LSResourceResolver resourceResolver;
+
+    public void loadSchema() throws Exception {
+        // force loading of schema
+        schema = createSchema();
+    }
+
+    // Properties
+    // -----------------------------------------------------------------------
+
+    public Schema getSchema() throws IOException, SAXException {
+        if (schema == null) {
+            synchronized (this) {
+                if (schema == null) {
+                    schema = createSchema();
+                }
+            }
+        }
+        return schema;
+    }
+
+    public void setSchema(Schema schema) {
+        this.schema = schema;
+    }
+
+    public String getSchemaLanguage() {
+        return schemaLanguage;
+    }
+
+    public void setSchemaLanguage(String schemaLanguage) {
+        this.schemaLanguage = schemaLanguage;
+    }
+
+    public Source getSchemaSource() throws IOException {
+        if (schemaSource == null) {
+            schemaSource = createSchemaSource();
+        }
+        return schemaSource;
+    }
+
+    public void setSchemaSource(Source schemaSource) {
+        this.schemaSource = schemaSource;
+    }
+
+    public URL getSchemaUrl() {
+        return schemaUrl;
+    }
+
+    public void setSchemaUrl(URL schemaUrl) {
+        this.schemaUrl = schemaUrl;
+    }
+
+    public File getSchemaFile() {
+        return schemaFile;
+    }
+
+    public void setSchemaFile(File schemaFile) {
+        this.schemaFile = schemaFile;
+    }
+
+    public byte[] getSchemaAsByteArray() {
+        return schemaAsByteArray;
+    }
+
+    public void setSchemaAsByteArray(byte[] schemaAsByteArray) {
+        this.schemaAsByteArray = schemaAsByteArray;
+    }
+
+    public SchemaFactory getSchemaFactory() {
+        if (schemaFactory == null) {
+            synchronized (this) {
+                if (schemaFactory == null) {
+                    schemaFactory = createSchemaFactory();
+                }
+            }
+        }
+        return schemaFactory;
+    }
+
+    public void setSchemaFactory(SchemaFactory schemaFactory) {
+        this.schemaFactory = schemaFactory;
+    }
+
+    public LSResourceResolver getResourceResolver() {
+        return resourceResolver;
+    }
+
+    public void setResourceResolver(LSResourceResolver resourceResolver) {
+        this.resourceResolver = resourceResolver;
+    }
+
+    protected SchemaFactory createSchemaFactory() {
+        SchemaFactory factory = SchemaFactory.newInstance(schemaLanguage);
+        if (getResourceResolver() != null) {
+            factory.setResourceResolver(getResourceResolver());
+        }
+        return factory;
+    }
+
+    protected Source createSchemaSource() throws IOException {
+        throw new IllegalArgumentException("You must specify either a schema, 
schemaFile, schemaSource or schemaUrl property");
+    }
+
+    protected Schema createSchema() throws SAXException, IOException {
+        SchemaFactory factory = getSchemaFactory();
+
+        URL url = getSchemaUrl();
+        if (url != null) {
+            synchronized (this) {
+                return factory.newSchema(url);
+            }
+        }
+
+        File file = getSchemaFile();
+        if (file != null) {
+            synchronized (this) {
+                return factory.newSchema(file);
+            }
+        }
+
+        byte[] bytes = getSchemaAsByteArray();
+        if (bytes != null) {
+            synchronized (this) {
+                return factory.newSchema(new StreamSource(new 
ByteArrayInputStream(schemaAsByteArray)));
+            }
+        }
+
+        Source source = getSchemaSource();
+        synchronized (this) {
+            return factory.newSchema(source);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/115cb3af/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java
index 359ca0d..96b3db8 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java
@@ -16,14 +16,12 @@
  */
 package org.apache.camel.processor.validation;
 
-import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
 import java.util.Collections;
 
-import javax.xml.XMLConstants;
 import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.transform.Result;
 import javax.xml.transform.Source;
@@ -61,22 +59,24 @@ import org.slf4j.LoggerFactory;
  */
 public class ValidatingProcessor implements AsyncProcessor {
     private static final Logger LOG = 
LoggerFactory.getLogger(ValidatingProcessor.class);
-    private final XmlConverter converter = new XmlConverter();
-    private String schemaLanguage = XMLConstants.W3C_XML_SCHEMA_NS_URI;
-    private volatile Schema schema;
-    private Source schemaSource;
-    private volatile SchemaFactory schemaFactory;
-    private URL schemaUrl;
-    private File schemaFile;
-    private byte[] schemaAsByteArray;
+    private final SchemaReader schemaReader;
     private ValidatorErrorHandler errorHandler = new 
DefaultValidationErrorHandler();
+    private final XmlConverter converter = new XmlConverter();
     private boolean useDom;
     private boolean useSharedSchema = true;
-    private LSResourceResolver resourceResolver;
     private boolean failOnNullBody = true;
     private boolean failOnNullHeader = true;
     private String headerName;
 
+    public ValidatingProcessor() {
+        schemaReader = new SchemaReader();
+    }
+
+    public ValidatingProcessor(SchemaReader schemaReader) {
+        // schema reader can be a singleton per schema and is therefore reused, 
see ValidatorEndpoint and ValidatorProducer
+        this.schemaReader = schemaReader;
+    }
+
     public void process(Exchange exchange) throws Exception {
         AsyncProcessorHelper.process(this, exchange);
     }
@@ -182,84 +182,66 @@ public class ValidatingProcessor implements 
AsyncProcessor {
     }
 
     public void loadSchema() throws Exception {
-        // force loading of schema
-        schema = createSchema();
+        schemaReader.loadSchema();
     }
 
     // Properties
     // -----------------------------------------------------------------------
 
     public Schema getSchema() throws IOException, SAXException {
-        if (schema == null) {
-            synchronized (this) {
-                if (schema == null) {
-                    schema = createSchema();
-                }
-            }
-        }
-        return schema;
+        return schemaReader.getSchema();
     }
 
     public void setSchema(Schema schema) {
-        this.schema = schema;
+        schemaReader.setSchema(schema);
     }
 
     public String getSchemaLanguage() {
-        return schemaLanguage;
+        return schemaReader.getSchemaLanguage();
     }
 
     public void setSchemaLanguage(String schemaLanguage) {
-        this.schemaLanguage = schemaLanguage;
+        schemaReader.setSchemaLanguage(schemaLanguage);
     }
 
     public Source getSchemaSource() throws IOException {
-        if (schemaSource == null) {
-            schemaSource = createSchemaSource();
-        }
-        return schemaSource;
+        return schemaReader.getSchemaSource();
     }
 
     public void setSchemaSource(Source schemaSource) {
-        this.schemaSource = schemaSource;
+        schemaReader.setSchemaSource(schemaSource);
     }
 
     public URL getSchemaUrl() {
-        return schemaUrl;
+        return schemaReader.getSchemaUrl();
     }
 
     public void setSchemaUrl(URL schemaUrl) {
-        this.schemaUrl = schemaUrl;
+        schemaReader.setSchemaUrl(schemaUrl);
     }
 
     public File getSchemaFile() {
-        return schemaFile;
+        return schemaReader.getSchemaFile();
     }
 
     public void setSchemaFile(File schemaFile) {
-        this.schemaFile = schemaFile;
+        schemaReader.setSchemaFile(schemaFile);
     }
 
     public byte[] getSchemaAsByteArray() {
-        return schemaAsByteArray;
+        return schemaReader.getSchemaAsByteArray();
     }
 
     public void setSchemaAsByteArray(byte[] schemaAsByteArray) {
-        this.schemaAsByteArray = schemaAsByteArray;
+        schemaReader.setSchemaAsByteArray(schemaAsByteArray);
     }
 
     public SchemaFactory getSchemaFactory() {
-        if (schemaFactory == null) {
-            synchronized (this) {
-                if (schemaFactory == null) {
-                    schemaFactory = createSchemaFactory();
-                }
-            }
-        }
-        return schemaFactory;
+        return schemaReader.getSchemaFactory();
     }
 
     public void setSchemaFactory(SchemaFactory schemaFactory) {
-        this.schemaFactory = schemaFactory;
+        schemaReader.setSchemaFactory(schemaFactory);
     }
 
     public ValidatorErrorHandler getErrorHandler() {
@@ -294,11 +276,11 @@ public class ValidatingProcessor implements 
AsyncProcessor {
     }
 
     public LSResourceResolver getResourceResolver() {
-        return resourceResolver;
+        return schemaReader.getResourceResolver();
     }
 
     public void setResourceResolver(LSResourceResolver resourceResolver) {
-        this.resourceResolver = resourceResolver;
+        schemaReader.setResourceResolver(resourceResolver);
     }
 
     public boolean isFailOnNullBody() {
@@ -329,45 +311,15 @@ public class ValidatingProcessor implements 
AsyncProcessor {
     // -----------------------------------------------------------------------
 
     protected SchemaFactory createSchemaFactory() {
-        SchemaFactory factory = SchemaFactory.newInstance(schemaLanguage);
-        if (getResourceResolver() != null) {
-            factory.setResourceResolver(getResourceResolver());
-        }
-        return factory;
+        return schemaReader.createSchemaFactory();
     }
 
     protected Source createSchemaSource() throws IOException {
-        throw new IllegalArgumentException("You must specify either a schema, 
schemaFile, schemaSource or schemaUrl property");
+        return schemaReader.createSchemaSource();
     }
 
     protected Schema createSchema() throws SAXException, IOException {
-        SchemaFactory factory = getSchemaFactory();
-
-        URL url = getSchemaUrl();
-        if (url != null) {
-            synchronized (this) {
-                return factory.newSchema(url);
-            }
-        }
-
-        File file = getSchemaFile();
-        if (file != null) {
-            synchronized (this) {
-                return factory.newSchema(file);
-            }
-        }
-
-        byte[] bytes = getSchemaAsByteArray();
-        if (bytes != null) {
-            synchronized (this) {
-                return factory.newSchema(new StreamSource(new 
ByteArrayInputStream(schemaAsByteArray)));
-            }
-        }
-
-        Source source = getSchemaSource();
-        synchronized (this) {
-            return factory.newSchema(source);
-        }
+        return schemaReader.createSchema();
     }
 
     /**
@@ -461,4 +413,4 @@ public class ValidatingProcessor implements AsyncProcessor {
         return source;
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/115cb3af/camel-core/src/test/java/org/apache/camel/component/validator/ValidatorEndpointClearCachedSchemaTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/validator/ValidatorEndpointClearCachedSchemaTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/validator/ValidatorEndpointClearCachedSchemaTest.java
new file mode 100644
index 0000000..e144208
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/component/validator/ValidatorEndpointClearCachedSchemaTest.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.validator;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.DefaultClassResolver;
+import org.apache.camel.impl.SimpleRegistry;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests whether the ValidatorEndpoint.clearCachedSchema() can be executed when
+ * several sender threads are running.
+ */
+public class ValidatorEndpointClearCachedSchemaTest extends ContextTestSupport 
{
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ValidatorEndpointClearCachedSchemaTest.class);
+
+    private SimpleRegistry simpleReg;
+
+    private CamelContext context;
+
+    @Test
+    public void testClearCachedSchema() throws Exception {
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+
+        // send one message for start up to finish.
+        new Sender().run();
+
+        // send with 5 sender threads in parallel and call clear cache in
+        // between
+        ExecutorService senderPool = Executors.newFixedThreadPool(5);
+        ExecutorService executorClearCache = Executors.newFixedThreadPool(1);
+        for (int i = 0; i < 5; i++) {
+            senderPool.execute(new Sender());
+            if (i == 2) {
+                /**
+                 * The clear cache thread calls xsdEndpoint.clearCachedSchema
+                 */
+                executorClearCache.execute(new ClearCache());
+            }
+        }
+
+        senderPool.shutdown();
+        executorClearCache.shutdown();
+
+        senderPool.awaitTermination(2, TimeUnit.SECONDS);
+
+        List<Exchange> exchanges = mock.getExchanges();
+
+        assertNotNull(exchanges);
+
+        // expect more than 5 correctly sent messages; the messages sent before
+        // the clearCachedSchema method is called will fail with a validation
+        // error and will not result in an exchange
+        assertTrue("Less then expected exchanges", exchanges.size() > 5);
+
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        simpleReg = new SimpleRegistry();
+        context = new DefaultCamelContext(simpleReg);
+        context.setClassResolver(new ClassResolverImpl());
+        return context;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("direct:start").to("validator:pd:somefile.xsd").convertBodyTo(String.class).to("log:after").to("mock:result");
+
+            }
+        };
+    }
+
+    private class Sender implements Runnable {
+
+        private final String message = "<?xml version=\"1.0\" 
encoding=\"UTF-8\"?>" + //
+                                       "<p:TestMessage 
xmlns:p=\"http://apache.camel.org/test\">" + //
+                                       
"<MessageContent>MessageContent</MessageContent>" + //
+                                       "</p:TestMessage>";
+
+        private final byte[] messageBytes = 
message.getBytes(StandardCharsets.UTF_8);
+
+        @Override
+        public void run() {
+            // send up to 5 messages
+            for (int j = 0; j < 5; j++) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                sendBody("direct:start", messageBytes);
+            }
+        }
+
+    }
+
+    private class ClearCache implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                // start later after the first sender
+                // threads are running
+                Thread.sleep(200);
+                clearCachedSchema();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+    }
+
+    private void clearCachedSchema() throws Exception {
+        Collection<Endpoint> endpoints = context.getEndpoints();
+        for (Endpoint endpoint : endpoints) {
+            LOG.info("Endpoint URI: " + endpoint.getEndpointUri());
+            if (endpoint.getEndpointUri().startsWith("validator:")) {
+                ValidatorEndpoint xsltEndpoint = (ValidatorEndpoint)endpoint;
+                xsltEndpoint.clearCachedSchema();
+                LOG.info("schema cache cleared");
+            }
+        }
+    }
+
+    /**
+     * Class to simulate a change of the XSD document. On the first load of
+     * the resource an XSD is returned that does not match the XML document;
+     * on every later load an XSD matching the XML document is returned.
+     */
+    static class ClassResolverImpl extends DefaultClassResolver {
+
+        private final String xsdtemplate1 = "<?xml version=\"1.0\" 
encoding=\"UTF-8\"?>\n" + //
+                                            "<xsd:schema 
targetNamespace=\"http://apache.camel.org/test\" 
xmlns=\"http://apache.camel.org/test\" 
xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\">"
+                                            + //
+                                            "    <xsd:complexType 
name=\"TestMessage\">" + //
+                                            "        <xsd:sequence>" + //
+                                            "            <xsd:element 
name=\"Content\" type=\"xsd:string\" />" + // //
+                                                                               
                                  // wrong
+                                                                               
                                  // element
+                                                                               
                                  // name
+                                                                               
                                  // will
+                                                                               
                                  // cause
+                                                                               
                                  // the
+                                                                               
                                  // validation
+                                                                               
                                  // to
+                                                                               
                                  // fail
+        "        </xsd:sequence>" + //
+                                            "        <xsd:attribute 
name=\"attr\" type=\"xsd:string\" default=\"xsd1\"/>" + //
+                                            "    </xsd:complexType>" + //
+                                            "    <xsd:element 
name=\"TestMessage\" type=\"TestMessage\" />" + //
+                                            "</xsd:schema>"; //
+
+        private final String xsdtemplate2 = 
xsdtemplate1.replace("\"Content\"", "\"MessageContent\""); // correct
+                                                                               
                        // element
+                                                                               
                        // name
+                                                                               
                        // -->
+                                                                               
                        // validation
+                                                                               
                        // will
+                                                                               
                        // be
+                                                                               
                        // correct
+
+        private byte[] xsd1 = xsdtemplate1.getBytes(StandardCharsets.UTF_8);
+
+        private byte[] xsd2 = xsdtemplate2.getBytes(StandardCharsets.UTF_8);
+
+        private volatile short counter;
+
+        @Override
+        public InputStream loadResourceAsStream(String uri) {
+            if (uri.startsWith("pd:")) {
+                byte[] xsd;
+                if (counter == 0) {
+                    xsd = xsd1;
+                    LOG.info("resolved XSD1");
+                } else {
+                    xsd = xsd2;
+                    LOG.info("resolved XSD2");
+                }
+                counter++;
+                return new ByteArrayInputStream(xsd);
+            } else {
+                return super.loadResourceAsStream(uri);
+            }
+        }
+
+    }
+
+}

Reply via email to