This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch camel-2.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.x by this push: new e45d49c [CAMEL-13263] Lenient IPFS connection check on startup e45d49c is described below commit e45d49c76d22f22ff945f04fe41d37d164fe962e Author: Thomas Diesler <tdies...@redhat.com> AuthorDate: Tue Feb 26 14:56:25 2019 +0100 [CAMEL-13263] Lenient IPFS connection check on startup --- .../apache/camel/component/ipfs/IPFSComponent.java | 16 ------- .../apache/camel/component/ipfs/IPFSEndpoint.java | 53 ++++++++++++++++---- .../camel/component/ipfs/SimpleIPFSTest.java | 56 ++++++++-------------- 3 files changed, 64 insertions(+), 61 deletions(-) diff --git a/components/camel-ipfs/src/main/java/org/apache/camel/component/ipfs/IPFSComponent.java b/components/camel-ipfs/src/main/java/org/apache/camel/component/ipfs/IPFSComponent.java index e8a260b..37a087b 100644 --- a/components/camel-ipfs/src/main/java/org/apache/camel/component/ipfs/IPFSComponent.java +++ b/components/camel-ipfs/src/main/java/org/apache/camel/component/ipfs/IPFSComponent.java @@ -19,16 +19,11 @@ package org.apache.camel.component.ipfs; import java.net.URI; import java.util.Map; -import io.nessus.ipfs.client.DefaultIPFSClient; -import io.nessus.ipfs.client.IPFSClient; - import org.apache.camel.Endpoint; import org.apache.camel.impl.DefaultComponent; public class IPFSComponent extends DefaultComponent { - private IPFSClient client; - @Override protected Endpoint createEndpoint(String urispec, String remaining, Map<String, Object> params) throws Exception { @@ -53,17 +48,6 @@ public class IPFSComponent extends DefaultComponent { } config.setIpfsCmd(cmd); - client = createClient(config); - return new IPFSEndpoint(urispec, this, config); } - - public IPFSClient getIPFSClient() { - return client; - } - - private synchronized IPFSClient createClient(IPFSConfiguration config) { - IPFSClient ipfsClient = new DefaultIPFSClient(config.getIpfsHost(), config.getIpfsPort()); - return ipfsClient.connect(); - } } diff --git a/components/camel-ipfs/src/main/java/org/apache/camel/component/ipfs/IPFSEndpoint.java b/components/camel-ipfs/src/main/java/org/apache/camel/component/ipfs/IPFSEndpoint.java index 5f14518..3abbf50 100644 --- a/components/camel-ipfs/src/main/java/org/apache/camel/component/ipfs/IPFSEndpoint.java +++ b/components/camel-ipfs/src/main/java/org/apache/camel/component/ipfs/IPFSEndpoint.java @@ -27,7 +27,9 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import io.ipfs.multihash.Multihash; +import io.nessus.ipfs.client.DefaultIPFSClient; import io.nessus.ipfs.client.IPFSClient; +import io.nessus.ipfs.client.IPFSException; import org.apache.camel.Consumer; import org.apache.camel.Processor; @@ -36,6 +38,8 @@ import org.apache.camel.component.ipfs.IPFSConfiguration.IPFSCommand; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The camel-ipfs component provides access to the Interplanetary File System @@ -44,16 +48,39 @@ import org.apache.camel.spi.UriParam; @UriEndpoint(firstVersion = "2.23.0", scheme = "ipfs", title = "IPFS", syntax = "ipfs:host:port/cmd", producerOnly = true, label = "file,ipfs") public class IPFSEndpoint extends DefaultEndpoint { - static long defaultTimeout = 10000L; + public static final long DEFAULT_TIMEOUT = 10000L; + + private static final Logger LOG = LoggerFactory.getLogger(IPFSComponent.class); @UriParam - private final IPFSConfiguration configuration; + private final IPFSConfiguration config; + + private IPFSClient client; - public IPFSEndpoint(String uri, IPFSComponent component, IPFSConfiguration configuration) { + public IPFSEndpoint(String uri, IPFSComponent component, IPFSConfiguration config) { super(uri, component); - this.configuration = configuration; + this.config = config; + this.client = createClient(config); + } + + public IPFSClient getIPFSClient() { + return client; } + public void setClient(IPFSClient client) { + this.client = client; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + try { + client.connect(); + } catch (IPFSException ex) { + LOG.warn(ex.getMessage()); + } + } + @Override public IPFSComponent getComponent() { return (IPFSComponent)super.getComponent(); @@ -75,11 +102,11 @@ public class IPFSEndpoint extends DefaultEndpoint { } IPFSConfiguration getConfiguration() { - return configuration; + return config; } IPFSCommand getCommand() { - String cmd = configuration.getIpfsCmd(); + String cmd = config.getIpfsCmd(); try { return IPFSCommand.valueOf(cmd); } catch (IllegalArgumentException ex) { @@ -100,7 +127,7 @@ public class IPFSEndpoint extends DefaultEndpoint { Multihash mhash = Multihash.fromBase58(cid); Future<InputStream> future = ipfs().cat(mhash); try { - return future.get(defaultTimeout, TimeUnit.MILLISECONDS); + return future.get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException ex) { throw new IOException("Cannot obtain: " + cid, ex); } @@ -110,13 +137,21 @@ public class IPFSEndpoint extends DefaultEndpoint { Multihash mhash = Multihash.fromBase58(cid); Future<Path> future = ipfs().get(mhash, outdir); try { - return future.get(defaultTimeout, TimeUnit.MILLISECONDS); + return future.get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException ex) { throw new IOException("Cannot obtain: " + cid, ex); } } private IPFSClient ipfs() { - return getComponent().getIPFSClient(); + if (!client.hasConnection()) { + client.connect(); + } + return client; + } + + private IPFSClient createClient(IPFSConfiguration config) { + IPFSClient ipfsClient = new DefaultIPFSClient(config.getIpfsHost(), config.getIpfsPort()); + return ipfsClient; } } diff --git a/components/camel-ipfs/src/test/java/org/apache/camel/component/ipfs/SimpleIPFSTest.java b/components/camel-ipfs/src/test/java/org/apache/camel/component/ipfs/SimpleIPFSTest.java index 24dc252..82c3fa0 100644 --- a/components/camel-ipfs/src/test/java/org/apache/camel/component/ipfs/SimpleIPFSTest.java +++ b/components/camel-ipfs/src/test/java/org/apache/camel/component/ipfs/SimpleIPFSTest.java @@ -25,9 +25,6 @@ import java.nio.file.Paths; import java.util.Arrays; import java.util.List; -import io.nessus.ipfs.client.DefaultIPFSClient; -import io.nessus.ipfs.client.IPFSClient; -import io.nessus.ipfs.client.IPFSException; import io.nessus.utils.StreamUtils; import org.apache.camel.CamelContext; @@ -36,24 +33,10 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; import org.junit.Assert; import org.junit.Assume; -import org.junit.Before; import org.junit.Test; public class SimpleIPFSTest { - IPFSClient ipfs; - - @Before - public void before() { - ipfs = new DefaultIPFSClient("127.0.0.1", 5001); - try { - ipfs.connect(); - } catch (IPFSException ex) { - // ignore - } - Assume.assumeTrue(ipfs.hasConnection()); - } - @Test public void ipfsVersion() throws Exception { @@ -68,8 +51,8 @@ public class SimpleIPFSTest { }); camelctx.start(); - assumeIPFS(camelctx); - + assumeIPFSAvailable(camelctx); + try { ProducerTemplate producer = camelctx.createProducerTemplate(); String resA = producer.requestBody("direct:startA", null, String.class); @@ -96,12 +79,11 @@ public class SimpleIPFSTest { } }); - Path path = Paths.get("src/test/resources/html/index.html"); - camelctx.start(); - assumeIPFS(camelctx); - + assumeIPFSAvailable(camelctx); + try { + Path path = Paths.get("src/test/resources/html/index.html"); ProducerTemplate producer = camelctx.createProducerTemplate(); String res = producer.requestBody("direct:start", path, String.class); Assert.assertEquals(hash, res); @@ -124,12 +106,11 @@ public class SimpleIPFSTest { } }); - Path path = Paths.get("src/test/resources/html"); - camelctx.start(); - assumeIPFS(camelctx); - + assumeIPFSAvailable(camelctx); + try { + Path path = Paths.get("src/test/resources/html"); ProducerTemplate producer = camelctx.createProducerTemplate(); List<String> res = producer.requestBody("direct:start", path, List.class); Assert.assertEquals(10, res.size()); @@ -153,8 +134,8 @@ public class SimpleIPFSTest { }); camelctx.start(); - assumeIPFS(camelctx); - + assumeIPFSAvailable(camelctx); + try { ProducerTemplate producer = camelctx.createProducerTemplate(); InputStream res = producer.requestBody("direct:start", hash, InputStream.class); @@ -178,8 +159,8 @@ public class SimpleIPFSTest { }); camelctx.start(); - assumeIPFS(camelctx); - + assumeIPFSAvailable(camelctx); + try { ProducerTemplate producer = camelctx.createProducerTemplate(); Path res = producer.requestBody("direct:start", hash, Path.class); @@ -204,8 +185,8 @@ public class SimpleIPFSTest { }); camelctx.start(); - assumeIPFS(camelctx); - + assumeIPFSAvailable(camelctx); + try { ProducerTemplate producer = camelctx.createProducerTemplate(); Path res = producer.requestBody("direct:start", hash, Path.class); @@ -223,8 +204,11 @@ public class SimpleIPFSTest { Assert.assertEquals("The quick brown fox jumps over the lazy dog.", new String(baos.toByteArray())); } - private void assumeIPFS(CamelContext camelctx) { - IPFSComponent comp = camelctx.getComponent("ipfs", IPFSComponent.class); - Assume.assumeTrue(comp.getIPFSClient().hasConnection()); + private void assumeIPFSAvailable(CamelContext camelctx) throws Exception { + IPFSEndpoint ipfsEp = camelctx.getEndpoints().stream() + .filter(ep -> ep instanceof IPFSEndpoint) + .map(ep -> (IPFSEndpoint)ep) + .findFirst().get(); + Assume.assumeTrue(ipfsEp.getIPFSClient().hasConnection()); } }