This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 5f37eca0c9b CAMEL-20421: camel-knative - Only create consumer/producer http facto… (#13148) 5f37eca0c9b is described below commit 5f37eca0c9b74078bfc48c1344fe2a76ec02a851 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Feb 17 20:04:56 2024 +0100 CAMEL-20421: camel-knative - Only create consumer/producer http facto… (#13148) CAMEL-20421: camel-knative - Only create consumer/producer http factory when needed. Make camel-jbang able to detect only when knative consumer is in use and startup embedded HTTP server to make it work out of the box. --- .../camel/component/knative/KnativeComponent.java | 88 +++++-------- .../camel/component/knative/KnativeEndpoint.java | 18 +-- .../services/org/apache/camel/dev-console/knative | 2 + .../component/knative/http/KnativeHttpConsole.java | 141 +++++++++++++++++++++ .../knative/http/KnativeHttpConsumer.java | 14 +- .../knative/http/KnativeHttpConsumerFactory.java | 9 +- .../knative/http/KnativeHttpProducer.java | 16 ++- .../knative/http/KnativeHttpProducerFactory.java | 7 + .../component/knative/http/KnativeHttpTest.java | 11 -- .../knative/http/KnativeHttpTestSupport.java | 18 +-- .../platform/http/main/MainHttpServer.java | 17 ++- .../main/java/org/apache/camel/CamelContext.java | 8 ++ .../camel/impl/engine/AbstractCamelContext.java | 6 + .../camel/cli/connector/LocalCliConnector.java | 8 ++ .../jbang/core/commands/process/ListService.java | 12 +- .../java/org/apache/camel/main/KameletMain.java | 2 +- .../DependencyDownloaderClassResolver.java | 25 +++- .../DependencyDownloaderComponentResolver.java | 39 +----- .../camel/main/download/MainHttpServerFactory.java | 64 ++++++++++ .../main/download/ResourceResolverListener.java | 29 +++++ 20 files changed, 392 insertions(+), 142 deletions(-) diff --git a/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeComponent.java 
b/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeComponent.java index cc81602bbf6..2b68cd98efb 100644 --- a/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeComponent.java +++ b/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeComponent.java @@ -55,9 +55,6 @@ public class KnativeComponent extends HealthCheckComponent { @Metadata private KnativeConsumerFactory consumerFactory; - private boolean managedProducer; - private boolean managedConsumer; - public KnativeComponent() { this(null); } @@ -135,6 +132,13 @@ public class KnativeComponent extends HealthCheckComponent { return producerFactory; } + public synchronized KnativeProducerFactory getOrCreateProducerFactory() throws Exception { + if (producerFactory == null) { + producerFactory = setUpProducerFactory(); + } + return producerFactory; + } + /** * The protocol producer factory. */ @@ -146,6 +150,13 @@ public class KnativeComponent extends HealthCheckComponent { return consumerFactory; } + public synchronized KnativeConsumerFactory getOrCreateConsumerFactory() throws Exception { + if (consumerFactory == null) { + consumerFactory = setUpConsumerFactory(); + } + return consumerFactory; + } + /** * The protocol consumer factory. 
*/ @@ -173,44 +184,19 @@ public class KnativeComponent extends HealthCheckComponent { @Override protected void doInit() throws Exception { super.doInit(); - - setUpProducerFactory(); - setUpConsumerFactory(); - - if (this.producerFactory != null && managedProducer) { - ServiceHelper.initService(this.producerFactory); - } - if (this.consumerFactory != null && managedConsumer) { - ServiceHelper.initService(this.consumerFactory); - } + ServiceHelper.initService(consumerFactory, producerFactory); } @Override protected void doStart() throws Exception { super.doStart(); - - if (this.producerFactory != null && managedProducer) { - ServiceHelper.startService(this.producerFactory); - } - if (this.consumerFactory != null && managedConsumer) { - ServiceHelper.startService(this.consumerFactory); - } - - if (this.producerFactory == null && this.consumerFactory == null) { - throw new IllegalStateException("No producer or consumer factory has been configured"); - } + ServiceHelper.startService(consumerFactory, producerFactory); } @Override protected void doStop() throws Exception { super.doStop(); - - if (this.producerFactory != null && managedProducer) { - ServiceHelper.stopService(this.producerFactory); - } - if (this.consumerFactory != null && managedConsumer) { - ServiceHelper.stopService(this.consumerFactory); - } + ServiceHelper.stopService(consumerFactory, producerFactory); } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -290,58 +276,44 @@ public class KnativeComponent extends HealthCheckComponent { return conf; } - private void setUpProducerFactory() throws Exception { + private KnativeProducerFactory setUpProducerFactory() throws Exception { if (producerFactory == null) { this.producerFactory = CamelContextHelper.lookup(getCamelContext(), protocol.name(), KnativeProducerFactory.class); - if (this.producerFactory == null) { this.producerFactory = getCamelContext() .getCamelContextExtension() .getBootstrapFactoryFinder(Knative.KNATIVE_TRANSPORT_RESOURCE_PATH) 
.newInstance(protocol.name() + "-producer", KnativeProducerFactory.class) - .orElse(null); - - if (this.producerFactory == null) { - return; - } - + .orElseThrow(() -> new IllegalArgumentException( + "Cannot create KnativeProducerFactory. Make sure camel-knative-http JAR is on classpath.")); if (configuration.getTransportOptions() != null) { setProperties(producerFactory, new HashMap<>(configuration.getTransportOptions())); } - - this.managedProducer = true; - - getCamelContext().addService(this.producerFactory); + getCamelContext().addService(this.producerFactory, true, true); } - - LOGGER.info("Using Knative producer factory: {} for protocol: {}", producerFactory, protocol.name()); + LOGGER.debug("Using Knative producer factory: {} for protocol: {}", producerFactory, protocol.name()); } + + return producerFactory; } - private void setUpConsumerFactory() throws Exception { + private KnativeConsumerFactory setUpConsumerFactory() throws Exception { if (consumerFactory == null) { this.consumerFactory = CamelContextHelper.lookup(getCamelContext(), protocol.name(), KnativeConsumerFactory.class); - if (this.consumerFactory == null) { this.consumerFactory = getCamelContext() .getCamelContextExtension() .getBootstrapFactoryFinder(Knative.KNATIVE_TRANSPORT_RESOURCE_PATH) .newInstance(protocol.name() + "-consumer", KnativeConsumerFactory.class) - .orElse(null); - - if (this.consumerFactory == null) { - return; - } + .orElseThrow(() -> new IllegalArgumentException( + "Cannot create KnativeConsumerFactory. 
Make sure camel-knative-http JAR is on classpath.")); if (configuration.getTransportOptions() != null) { setProperties(consumerFactory, new HashMap<>(configuration.getTransportOptions())); } - - this.managedConsumer = true; - - getCamelContext().addService(this.consumerFactory); + getCamelContext().addService(this.consumerFactory, true, true); } - - LOGGER.info("Using Knative consumer factory: {} for protocol: {}", consumerFactory, protocol.name()); + LOGGER.debug("Using Knative consumer factory: {} for protocol: {}", consumerFactory, protocol.name()); } + return consumerFactory; } } diff --git a/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java index 32bbb81aead..afa99dbb03d 100644 --- a/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java +++ b/components/camel-knative/camel-knative-component/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java @@ -87,10 +87,12 @@ public class KnativeEndpoint extends DefaultEndpoint { @Override public Producer createProducer() throws Exception { - final KnativeResource service = lookupServiceDefinition(Knative.EndpointKind.sink); + KnativeResource service = lookupServiceDefinition(Knative.EndpointKind.sink); + final Processor ceProcessor = cloudEventProcessor.producer(this, service); final Producer producer - = getComponent().getProducerFactory().createProducer(this, createTransportConfiguration(service), service); + = getComponent().getOrCreateProducerFactory().createProducer(this, createTransportConfiguration(service), + service); PropertyBindingSupport.build() .withCamelContext(getCamelContext()) @@ -106,6 +108,7 @@ public class KnativeEndpoint extends DefaultEndpoint { @Override public Consumer createConsumer(Processor processor) throws Exception { 
KnativeResource service = lookupServiceDefinition(Knative.EndpointKind.source); + Processor ceProcessor = cloudEventProcessor.consumer(this, service); Processor replyProcessor = configuration.isReplyWithCloudEvent() ? cloudEventProcessor.producer(this, service) : null; @@ -121,9 +124,12 @@ public class KnativeEndpoint extends DefaultEndpoint { = PluginHelper.getProcessorFactory(camelContext).createProcessor(camelContext, "Pipeline", new Object[] { list }); - Consumer consumer = getComponent().getConsumerFactory().createConsumer(this, + Consumer consumer = getComponent().getOrCreateConsumerFactory().createConsumer(this, createTransportConfiguration(service), service, pipeline); + // signal that this path is exposed for knative + String path = service.getPath(); + PropertyBindingSupport.build() .withCamelContext(camelContext) .withProperties(configuration.getTransportOptions()) @@ -133,15 +139,9 @@ public class KnativeEndpoint extends DefaultEndpoint { .bind(); configureConsumer(consumer); - return consumer; } - @Override - public boolean isSingleton() { - return true; - } - public Knative.Type getType() { return type; } diff --git a/components/camel-knative/camel-knative-http/src/generated/resources/META-INF/services/org/apache/camel/dev-console/knative b/components/camel-knative/camel-knative-http/src/generated/resources/META-INF/services/org/apache/camel/dev-console/knative new file mode 100644 index 00000000000..2444b43c411 --- /dev/null +++ b/components/camel-knative/camel-knative-http/src/generated/resources/META-INF/services/org/apache/camel/dev-console/knative @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! 
+class=org.apache.camel.component.knative.http.KnativeHttpConsole diff --git a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsole.java b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsole.java new file mode 100644 index 00000000000..324d6745586 --- /dev/null +++ b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsole.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.knative.http; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.camel.Consumer; +import org.apache.camel.Route; +import org.apache.camel.Service; +import org.apache.camel.api.management.ManagedCamelContext; +import org.apache.camel.spi.annotations.DevConsole; +import org.apache.camel.support.console.AbstractDevConsole; +import org.apache.camel.util.StringHelper; +import org.apache.camel.util.json.JsonObject; + +@DevConsole("knative") +public class KnativeHttpConsole extends AbstractDevConsole { + + public KnativeHttpConsole() { + super("camel", "knative", "Knative", "Knative HTTP Service"); + } + + @Override + protected String doCallText(Map<String, Object> options) { + StringBuilder sb = new StringBuilder(); + + // find if we use MainHttpServer and get its configuration for url + ManagedCamelContext mcc = getCamelContext().getCamelContextExtension().getContextPlugin(ManagedCamelContext.class); + if (mcc != null) { + String host = null; + int port = -1; + String path = null; + try { + Service service = getCamelContext().hasService(s -> s.getClass().getSimpleName().equals("MainHttpServer")); + if (service != null) { + MBeanServer mBeanServer = getCamelContext().getManagementStrategy().getManagementAgent().getMBeanServer(); + ObjectName on = getCamelContext().getManagementStrategy().getManagementObjectNameStrategy() + .getObjectNameForService(getCamelContext(), service); + host = (String) mBeanServer.getAttribute(on, "Host"); + port = (int) mBeanServer.getAttribute(on, "Port"); + path = (String) mBeanServer.getAttribute(on, "Path"); + } + } catch (Exception e) { + // ignore + } + + List<Consumer> list = getCamelContext().getRoutes() + .stream().map(Route::getConsumer) + .filter(c -> KnativeHttpConsumer.class.getName().equals(c.getClass().getName())) + .toList(); + + for (Consumer c : list) { + 
KnativeHttpConsumer knc = (KnativeHttpConsumer) c; + if (host != null) { + String p = path != null ? path + "/" + knc.getPath() : knc.getPath(); + // remove leading slashes + p = StringHelper.removeStartingCharacters(p, '/'); + sb.append(String.format(" %s://%s:%d/%s\n", "http", host, port, p)); + } else { + sb.append(String.format(" %s://%s\n", "http", knc.getPath())); + } + + } + } + + return sb.toString(); + } + + @Override + protected JsonObject doCallJson(Map<String, Object> options) { + JsonObject root = new JsonObject(); + + ManagedCamelContext mcc = getCamelContext().getCamelContextExtension().getContextPlugin(ManagedCamelContext.class); + if (mcc != null) { + String host = null; + int port = -1; + String path = null; + try { + Service service = getCamelContext().hasService(s -> s.getClass().getSimpleName().equals("MainHttpServer")); + if (service != null) { + MBeanServer mBeanServer = getCamelContext().getManagementStrategy().getManagementAgent().getMBeanServer(); + ObjectName on = getCamelContext().getManagementStrategy().getManagementObjectNameStrategy() + .getObjectNameForService(getCamelContext(), service); + host = (String) mBeanServer.getAttribute(on, "Host"); + port = (int) mBeanServer.getAttribute(on, "Port"); + path = (String) mBeanServer.getAttribute(on, "Path"); + } + } catch (Exception e) { + // ignore + } + List<Consumer> list = getCamelContext().getRoutes() + .stream().map(Route::getConsumer) + .filter(c -> KnativeHttpConsumer.class.getName().equals(c.getClass().getName())) + .toList(); + + List<JsonObject> arr = new ArrayList<>(); + for (Consumer c : list) { + KnativeHttpConsumer knc = (KnativeHttpConsumer) c; + + JsonObject jo = new JsonObject(); + jo.put("protocol", "http"); + if (host != null) { + jo.put("host", host); + } + if (port != -1) { + jo.put("port", port); + } + String p = path != null ? 
path + "/" + knc.getPath() : knc.getPath(); + // remove leading slashes + p = StringHelper.removeStartingCharacters(p, '/'); + jo.put("path", p); + arr.add(jo); + } + if (!arr.isEmpty()) { + root.put("consumers", arr); + } + } + + return root; + } + +} diff --git a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java index f7525322b16..19862f53002 100644 --- a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java +++ b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumer.java @@ -39,6 +39,8 @@ import org.apache.camel.Message; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Processor; import org.apache.camel.TypeConverter; +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedResource; import org.apache.camel.component.knative.spi.KnativeResource; import org.apache.camel.component.knative.spi.KnativeTransportConfiguration; import org.apache.camel.spi.HeaderFilterStrategy; @@ -52,6 +54,7 @@ import org.slf4j.LoggerFactory; import static org.apache.camel.util.CollectionHelper.appendEntry; +@ManagedResource(description = "Managed KnativeHttpConsumer") public class KnativeHttpConsumer extends DefaultConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpConsumer.class); @@ -60,6 +63,7 @@ public class KnativeHttpConsumer extends DefaultConsumer { private final KnativeResource resource; private final Supplier<Router> router; private final HeaderFilterStrategy headerFilterStrategy; + private volatile String path; private String basePath; private Route route; @@ -80,6 +84,12 @@ public class KnativeHttpConsumer extends DefaultConsumer { 
this.preallocateBodyBuffer = true; } + @ManagedAttribute(description = "Path for accessing the Knative service") + public String getPath() { + return path; + } + + @ManagedAttribute(description = "Base path") public String getBasePath() { return basePath; } @@ -88,6 +98,7 @@ public class KnativeHttpConsumer extends DefaultConsumer { this.basePath = basePath; } + @ManagedAttribute(description = "Maximum body size") public BigInteger getMaxBodySize() { return maxBodySize; } @@ -96,6 +107,7 @@ public class KnativeHttpConsumer extends DefaultConsumer { this.maxBodySize = maxBodySize; } + @ManagedAttribute(description = "Preallocate body buffer") public boolean isPreallocateBodyBuffer() { return preallocateBodyBuffer; } @@ -107,7 +119,7 @@ public class KnativeHttpConsumer extends DefaultConsumer { @Override protected void doStart() throws Exception { if (route == null) { - String path = resource.getPath(); + path = resource.getPath(); if (ObjectHelper.isEmpty(path)) { path = "/"; } diff --git a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerFactory.java b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerFactory.java index e751ab3f06a..b05719bb22e 100644 --- a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerFactory.java +++ b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpConsumerFactory.java @@ -34,6 +34,13 @@ public class KnativeHttpConsumerFactory extends ServiceSupport implements CamelC private Router router; private CamelContext camelContext; + public KnativeHttpConsumerFactory() { + } + + public KnativeHttpConsumerFactory(CamelContext camelContext) { + this.camelContext = camelContext; + } + public Router getRouter() { return router; } @@ -71,8 +78,6 @@ public class KnativeHttpConsumerFactory extends 
ServiceSupport implements CamelC /** * Resolve router from given Camel context if not explicitly set. KnativeHttpConsumer implementation usually calls * this method to retrieve the router during service startup phase. - * - * @return */ private Router lookupRouter() { if (router == null) { diff --git a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java index abdeb3e9e0b..68e6bf7504b 100644 --- a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java +++ b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducer.java @@ -34,6 +34,8 @@ import org.apache.camel.Exchange; import org.apache.camel.InvalidPayloadException; import org.apache.camel.Message; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedResource; import org.apache.camel.component.knative.spi.KnativeResource; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.DefaultAsyncProducer; @@ -43,6 +45,7 @@ import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@ManagedResource(description = "Managed KnativeHttpProducer") public class KnativeHttpProducer extends DefaultAsyncProducer { private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpProducer.class); @@ -68,23 +71,25 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { this.headerFilterStrategy = new KnativeHttpHeaderFilterStrategy(); } + @ManagedAttribute(description = "Url for calling the Knative HTTP service") + public String getUrl() { + return uri; + } + @Override public boolean process(Exchange exchange, AsyncCallback callback) { 
if (exchange.getMessage().getBody() == null) { exchange.setException(new IllegalArgumentException("body must not be null")); callback.done(true); - return true; } final byte[] payload; - try { payload = exchange.getMessage().getMandatoryBody(byte[].class); } catch (InvalidPayloadException e) { exchange.setException(e); callback.done(true); - return true; } @@ -153,17 +158,15 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { @Override protected void doInit() throws Exception { + super.doInit(); this.uri = getUrl(serviceDefinition); this.host = getHost(serviceDefinition); this.client = WebClient.create(vertx, clientOptions); - - super.doInit(); } @Override protected void doStop() throws Exception { super.doStop(); - if (this.client != null) { LOGGER.debug("Shutting down client: {}", client); this.client.close(); @@ -194,7 +197,6 @@ public class KnativeHttpProducer extends DefaultAsyncProducer { private String getHost(KnativeResource definition) { String url = getUrl(definition); - try { return new URL(url).getHost(); } catch (MalformedURLException e) { diff --git a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducerFactory.java b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducerFactory.java index 7baae79d835..d5cbe2393ef 100644 --- a/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducerFactory.java +++ b/components/camel-knative/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpProducerFactory.java @@ -35,6 +35,13 @@ public class KnativeHttpProducerFactory extends ServiceSupport implements CamelC private WebClientOptions vertxHttpClientOptions; private CamelContext camelContext; + public KnativeHttpProducerFactory() { + } + + public KnativeHttpProducerFactory(CamelContext camelContext) { + this.camelContext = camelContext; 
+ } + public Vertx getVertx() { return vertx; } diff --git a/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java b/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java index 68667d417ae..f13821d2832 100644 --- a/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java +++ b/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTest.java @@ -46,7 +46,6 @@ import org.apache.camel.test.AvailablePortFinder; import org.apache.camel.util.ObjectHelper; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -108,16 +107,6 @@ public class KnativeHttpTest { // // ************************** - @Test - void testCreateComponent() { - context.start(); - - assertThat(context.getComponent("knative")).isInstanceOfSatisfying(KnativeComponent.class, c -> { - assertThat(c.getProducerFactory()).isInstanceOf(KnativeHttpProducerFactory.class); - assertThat(c.getConsumerFactory()).isInstanceOf(KnativeHttpConsumerFactory.class); - }); - } - void doTestKnativeSource(CloudEvent ce, String basePath, String path) throws Exception { KnativeComponent component = configureKnativeComponent( context, diff --git a/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java b/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java index b79c05ddb90..c5c8a33c1a0 100644 --- a/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java +++ 
b/components/camel-knative/camel-knative-http/src/test/java/org/apache/camel/component/knative/http/KnativeHttpTestSupport.java @@ -21,15 +21,10 @@ import java.util.List; import java.util.Map; import org.apache.camel.CamelContext; -import org.apache.camel.Consumer; -import org.apache.camel.Endpoint; -import org.apache.camel.Processor; -import org.apache.camel.Producer; import org.apache.camel.component.cloudevents.CloudEvent; import org.apache.camel.component.knative.KnativeComponent; import org.apache.camel.component.knative.spi.KnativeEnvironment; import org.apache.camel.component.knative.spi.KnativeResource; -import org.apache.camel.component.knative.spi.KnativeTransportConfiguration; import org.apache.camel.component.platform.http.PlatformHttpComponent; import org.apache.camel.component.platform.http.PlatformHttpConstants; import org.apache.camel.component.platform.http.vertx.VertxPlatformHttpEngine; @@ -61,19 +56,18 @@ public final class KnativeHttpTestSupport { KnativeComponent component = context.getComponent("knative", KnativeComponent.class); component.setCloudEventsSpecVersion(ce.version()); component.setEnvironment(environment); - component.setConsumerFactory(new KnativeHttpConsumerFactory() { + component.setConsumerFactory(new KnativeHttpConsumerFactory(context) { @Override - public Consumer createConsumer( - Endpoint endpoint, KnativeTransportConfiguration config, KnativeResource service, Processor processor) { + protected void doBuild() throws Exception { + super.doBuild(); this.setRouter(VertxPlatformHttpRouter.lookup(context)); - return super.createConsumer(endpoint, config, service, processor); } }); - component.setProducerFactory(new KnativeHttpProducerFactory() { + component.setProducerFactory(new KnativeHttpProducerFactory(context) { @Override - public Producer createProducer(Endpoint endpoint, KnativeTransportConfiguration config, KnativeResource service) { + protected void doBuild() throws Exception { + super.doBuild(); 
this.setVertx(VertxPlatformHttpRouter.lookup(context).vertx()); - return super.createProducer(endpoint, config, service); } }); diff --git a/components/camel-platform-http-main/src/main/java/org/apache/camel/component/platform/http/main/MainHttpServer.java b/components/camel-platform-http-main/src/main/java/org/apache/camel/component/platform/http/main/MainHttpServer.java index 31c4295d1f1..b93e12aff0d 100644 --- a/components/camel-platform-http-main/src/main/java/org/apache/camel/component/platform/http/main/MainHttpServer.java +++ b/components/camel-platform-http-main/src/main/java/org/apache/camel/component/platform/http/main/MainHttpServer.java @@ -52,9 +52,10 @@ import io.vertx.ext.web.impl.Utils; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; -import org.apache.camel.NonManagedService; import org.apache.camel.StartupListener; import org.apache.camel.StaticService; +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedResource; import org.apache.camel.component.platform.http.HttpEndpointModel; import org.apache.camel.component.platform.http.PlatformHttpComponent; import org.apache.camel.component.platform.http.main.jolokia.JolokiaHttpRequestHandlerSupport; @@ -83,7 +84,8 @@ import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MainHttpServer extends ServiceSupport implements CamelContextAware, StaticService, NonManagedService { +@ManagedResource(description = "Camel Main Embedded HTTP server") +public class MainHttpServer extends ServiceSupport implements CamelContextAware, StaticService { private static final Logger LOG = LoggerFactory.getLogger(MainHttpServer.class); @@ -119,6 +121,7 @@ public class MainHttpServer extends ServiceSupport implements CamelContextAware, this.configuration = configuration; } + @ManagedAttribute(description = "Whether dev console is enabled (/q/dev)") public 
boolean isDevConsoleEnabled() { return devConsoleEnabled; } @@ -130,10 +133,12 @@ public class MainHttpServer extends ServiceSupport implements CamelContextAware, this.devConsoleEnabled = devConsoleEnabled; } + @ManagedAttribute(description = "Whether health check is enabled (q/health)") public boolean isHealthCheckEnabled() { return healthCheckEnabled; } + @ManagedAttribute(description = "Whether Jolokia is enabled (q/jolokia)") public boolean isJolokiaEnabled() { return jolokiaEnabled; } @@ -152,6 +157,7 @@ public class MainHttpServer extends ServiceSupport implements CamelContextAware, this.jolokiaEnabled = jolokiaEnabledEnabled; } + @ManagedAttribute(description = "Whether metrics is enabled (q/metric)") public boolean isMetricsEnabled() { return metricsEnabled; } @@ -163,6 +169,7 @@ public class MainHttpServer extends ServiceSupport implements CamelContextAware, this.metricsEnabled = metricsEnabled; } + @ManagedAttribute(description = "Whether file upload is enabled (only for development) (q/upload)") public boolean isUploadEnabled() { return uploadEnabled; } @@ -174,6 +181,7 @@ public class MainHttpServer extends ServiceSupport implements CamelContextAware, this.uploadEnabled = uploadEnabled; } + @ManagedAttribute(description = "Directory for upload.") public String getUploadSourceDir() { return uploadSourceDir; } @@ -185,6 +193,7 @@ public class MainHttpServer extends ServiceSupport implements CamelContextAware, this.uploadSourceDir = uploadSourceDir; } + @ManagedAttribute(description = "HTTP server port number") public int getPort() { return configuration.getBindPort(); } @@ -193,6 +202,7 @@ public class MainHttpServer extends ServiceSupport implements CamelContextAware, configuration.setBindPort(port); } + @ManagedAttribute(description = "HTTP server hostname") public String getHost() { return configuration.getBindHost(); } @@ -201,6 +211,7 @@ public class MainHttpServer extends ServiceSupport implements CamelContextAware, configuration.setBindHost(host); 
} + @ManagedAttribute(description = "HTTP server base path") public String getPath() { return configuration.getPath(); } @@ -209,6 +220,7 @@ public class MainHttpServer extends ServiceSupport implements CamelContextAware, configuration.setPath(path); } + @ManagedAttribute(description = "HTTP server maximum body size") public Long getMaxBodySize() { return configuration.getMaxBodySize(); } @@ -225,6 +237,7 @@ public class MainHttpServer extends ServiceSupport implements CamelContextAware, configuration.setSslContextParameters(sslContextParameters); } + @ManagedAttribute(description = "HTTP server using global SSL context parameters") public boolean isUseGlobalSslContextParameters() { return configuration.isUseGlobalSslContextParameters(); } diff --git a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java index a3cb4bcf99b..1ade841251b 100644 --- a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java @@ -270,6 +270,14 @@ public interface CamelContext extends CamelContextLifecycle, RuntimeConfiguratio */ boolean hasService(Object object); + /** + * Finds the first service matching the filter + * + * @param filter the filter + * @return the service if found or null if none found + */ + Service hasService(java.util.function.Predicate<Service> filter); + /** * Has the given service type already been added to this CamelContext? 
* diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index 118af831990..24cb8e2af08 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -38,6 +38,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; @@ -1354,6 +1355,11 @@ public abstract class AbstractCamelContext extends BaseService return internalServiceManager.hasServices(type); } + @Override + public Service hasService(Predicate<Service> filter) { + return internalServiceManager.getServices().stream().filter(filter).findFirst().orElse(null); + } + @Override public void deferStartService(Object object, boolean stopOnShutdown) { internalServiceManager.deferStartService(this, object, stopOnShutdown, false); diff --git a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java index df051eb3318..593f8369f71 100644 --- a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java +++ b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java @@ -1112,6 +1112,14 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C root.put("mllp", json); } } + // knative is optional + dc = PluginHelper.getDevConsoleResolver(camelContext).lookupDevConsole("knative"); + if (dc.isPresent()) { + JsonObject json = (JsonObject) 
dc.get().call(DevConsole.MediaType.JSON); + if (json != null) { + root.put("knative", json); + } + } return root; } diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListService.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListService.java index 7cdd1889e03..1e9879f9d31 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListService.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListService.java @@ -94,6 +94,7 @@ public class ListService extends ProcessWatchCommand { fetchServices(root, row, "netty", rows); fetchServices(root, row, "mina", rows); fetchServices(root, row, "mllp", rows); + fetchServices(root, row, "knative", rows); } }); @@ -126,7 +127,16 @@ public class ListService extends ProcessWatchCommand { jo = (JsonObject) o; row.component = component; row.protocol = jo.getString("protocol"); - row.service = row.protocol + ":" + jo.getString("host") + ":" + jo.getInteger("port"); + String p = row.protocol + ":"; + if (p.startsWith("http")) { + // we want double slashes for http protocols + p = p + "//"; + } + row.service = p + jo.getString("host") + ":" + jo.getInteger("port"); + String path = jo.getString("path"); + if (path != null) { + row.service += "/" + path; + } rows.add(row); } } diff --git a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java index 53f73531fa2..ffa1d3ffacb 100644 --- a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java +++ b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/KameletMain.java @@ -548,7 +548,7 @@ public class KameletMain extends MainCommandLineSupport { new DependencyDownloaderStrategy(answer)); // download class-resolver - ClassResolver classResolver = new 
DependencyDownloaderClassResolver(answer, knownDeps); + ClassResolver classResolver = new DependencyDownloaderClassResolver(answer, knownDeps, silent); answer.setClassResolver(classResolver); // re-create factory finder with download class-resolver FactoryFinderResolver ffr = PluginHelper.getFactoryFinderResolver(answer); diff --git a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/DependencyDownloaderClassResolver.java b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/DependencyDownloaderClassResolver.java index fa700f58e60..330728cf27c 100644 --- a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/DependencyDownloaderClassResolver.java +++ b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/DependencyDownloaderClassResolver.java @@ -17,6 +17,8 @@ package org.apache.camel.main.download; import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; import org.apache.camel.CamelContext; import org.apache.camel.impl.engine.DefaultClassResolver; @@ -25,18 +27,25 @@ import org.apache.camel.util.ObjectHelper; public final class DependencyDownloaderClassResolver extends DefaultClassResolver { + private final List<ResourceResolverListener> resourceResolverListeners = new ArrayList<>(); private final KnownDependenciesResolver knownDependenciesResolver; private final DependencyDownloader downloader; + private final boolean silent; public DependencyDownloaderClassResolver(CamelContext camelContext, - KnownDependenciesResolver knownDependenciesResolver) { + KnownDependenciesResolver knownDependenciesResolver, + boolean silent) { super(camelContext); this.downloader = camelContext.hasService(DependencyDownloader.class); this.knownDependenciesResolver = knownDependenciesResolver; + this.silent = silent; + this.resourceResolverListeners.add(new KNativeHttpServerFactory()); } @Override public InputStream loadResourceAsStream(String uri) { + resourceResolverListeners.forEach(l -> 
l.onLoadResourceAsStream(uri)); + InputStream answer = null; try { answer = super.loadResourceAsStream(uri); @@ -93,4 +102,18 @@ public final class DependencyDownloaderClassResolver extends DefaultClassResolve return answer; } + private class KNativeHttpServerFactory implements ResourceResolverListener { + + @Override + public void onLoadResourceAsStream(String uri) { + try { + if ("META-INF/services/org/apache/camel/knative/transport/http-consumer".equals(uri)) { + MainHttpServerFactory.setupHttpServer(getCamelContext(), silent); + } + } catch (Exception e) { + // ignore + } + } + } + } diff --git a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/DependencyDownloaderComponentResolver.java b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/DependencyDownloaderComponentResolver.java index 8fd9e2536fb..06bc65678ac 100644 --- a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/DependencyDownloaderComponentResolver.java +++ b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/DependencyDownloaderComponentResolver.java @@ -20,17 +20,11 @@ import java.util.List; import org.apache.camel.CamelContext; import org.apache.camel.Component; -import org.apache.camel.Service; import org.apache.camel.catalog.CamelCatalog; import org.apache.camel.catalog.DefaultCamelCatalog; import org.apache.camel.component.platform.http.PlatformHttpComponent; -import org.apache.camel.component.platform.http.main.MainHttpServer; import org.apache.camel.component.stub.StubComponent; import org.apache.camel.impl.engine.DefaultComponentResolver; -import org.apache.camel.main.HttpServerConfigurationProperties; -import org.apache.camel.main.MainConstants; -import org.apache.camel.main.MainHttpServerFactory; -import org.apache.camel.main.util.CamelJBangSettingsHelper; import org.apache.camel.main.util.SuggestSimilarHelper; import org.apache.camel.tooling.model.ComponentModel; @@ -75,25 +69,8 @@ public final class 
DependencyDownloaderComponentResolver extends DefaultComponen sc.setShadow(true); sc.setShadowPattern(stubPattern); } - if (answer instanceof PlatformHttpComponent || name.equals("knative")) { - // set up a default http server on configured port if not already done - MainHttpServer server = camelContext.hasService(MainHttpServer.class); - if (server == null) { - // need to capture that we use a http-server - HttpServerConfigurationProperties config = new HttpServerConfigurationProperties(null); - CamelJBangSettingsHelper.writeSettingsIfNotExists("camel.jbang.platform-http.port", - String.valueOf(config.getPort())); - if (!silent) { - try { - // enable http server if not silent - MainHttpServerFactory factory = resolveMainHttpServerFactory(camelContext); - Service httpServer = factory.newHttpServer(config); - camelContext.addService(httpServer, true, true); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } + if (answer instanceof PlatformHttpComponent) { + MainHttpServerFactory.setupHttpServer(camelContext, silent); } if (answer == null) { List<String> suggestion = SuggestSimilarHelper.didYouMean(catalog.findComponentNames(), name); @@ -120,16 +97,4 @@ public final class DependencyDownloaderComponentResolver extends DefaultComponen return ACCEPTED_STUB_NAMES.contains(name); } - private static MainHttpServerFactory resolveMainHttpServerFactory(CamelContext camelContext) throws Exception { - // lookup in service registry first - MainHttpServerFactory answer = camelContext.getRegistry().findSingleByType(MainHttpServerFactory.class); - if (answer == null) { - answer = camelContext.getCamelContextExtension().getBootstrapFactoryFinder() - .newInstance(MainConstants.PLATFORM_HTTP_SERVER, MainHttpServerFactory.class) - .orElseThrow(() -> new IllegalArgumentException( - "Cannot find MainHttpServerFactory on classpath. 
Add camel-platform-http-main to classpath.")); - } - return answer; - } - } diff --git a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/MainHttpServerFactory.java b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/MainHttpServerFactory.java new file mode 100644 index 00000000000..8cbba6d1f00 --- /dev/null +++ b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/MainHttpServerFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.camel.main.download;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Service;
+import org.apache.camel.component.platform.http.main.MainHttpServer;
+import org.apache.camel.main.HttpServerConfigurationProperties;
+import org.apache.camel.main.MainConstants;
+import org.apache.camel.main.util.CamelJBangSettingsHelper;
+
+/**
+ * Helper for making sure a {@link MainHttpServer} is registered on a {@link CamelContext}.
+ */
+public class MainHttpServerFactory {
+
+    /**
+     * Looks up the {@link MainHttpServer} service on the given context and, when there is none, records the
+     * configured port under <tt>camel.jbang.platform-http.port</tt> and (unless silent) creates a server through
+     * {@link org.apache.camel.main.MainHttpServerFactory} and adds it to the context as a service.
+     *
+     * @param  camelContext     the camel context to look up and register the server on
+     * @param  silent           when <tt>true</tt> no server is created, only the port setting is written
+     * @return                  the server found on the context after setup, or <tt>null</tt> if none is registered
+     *                          (for example when silent)
+     * @throws RuntimeException wrapping any failure while resolving the factory or adding the service
+     */
+    public static MainHttpServer setupHttpServer(CamelContext camelContext, boolean silent) {
+        // set up a default http server on configured port if not already done
+        MainHttpServer server = camelContext.hasService(MainHttpServer.class);
+        if (server == null) {
+            // need to capture that we use a http-server
+            HttpServerConfigurationProperties config = new HttpServerConfigurationProperties(null);
+            CamelJBangSettingsHelper.writeSettingsIfNotExists("camel.jbang.platform-http.port",
+                    String.valueOf(config.getPort()));
+            if (!silent) {
+                try {
+                    // enable http server if not silent
+                    org.apache.camel.main.MainHttpServerFactory factory = resolveMainHttpServerFactory(camelContext);
+                    Service httpServer = factory.newHttpServer(config);
+                    camelContext.addService(httpServer, true, true);
+                    // look up again so the caller gets the server that was just added,
+                    // instead of the null from the first lookup
+                    server = camelContext.hasService(MainHttpServer.class);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        return server;
+    }
+
+    /**
+     * Resolves the factory used to create the http server: the registry is asked first, and only when it has no
+     * single instance is the bootstrap factory finder used.
+     *
+     * @throws IllegalArgumentException if the bootstrap factory finder cannot provide a factory either
+     */
+    private static org.apache.camel.main.MainHttpServerFactory resolveMainHttpServerFactory(CamelContext camelContext)
+            throws Exception {
+        // lookup in service registry first
+        org.apache.camel.main.MainHttpServerFactory answer
+                = camelContext.getRegistry().findSingleByType(org.apache.camel.main.MainHttpServerFactory.class);
+        if (answer == null) {
+            answer = camelContext.getCamelContextExtension().getBootstrapFactoryFinder()
+                    .newInstance(MainConstants.PLATFORM_HTTP_SERVER, org.apache.camel.main.MainHttpServerFactory.class)
+                    .orElseThrow(() -> new IllegalArgumentException(
+                            "Cannot find MainHttpServerFactory on classpath. Add camel-platform-http-main to classpath."));
+        }
+        return answer;
+    }
+
+}
diff --git a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/ResourceResolverListener.java b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/ResourceResolverListener.java
new file mode 100644
index 00000000000..c75b8fb97ee
--- /dev/null
+++ b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/ResourceResolverListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.main.download;
+
+/**
+ * Listener when resources are loaded (eg from src/main/resources)
+ */
+@FunctionalInterface
+public interface ResourceResolverListener {
+
+    /**
+     * Invoked when a resource is attempted to be loaded.
+     */
+    void onLoadResourceAsStream(String uri);
+}