This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit b1ecca580091da7fc52b9e68458f29944c778326 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon May 19 15:56:51 2025 +0200 CAMEL-22074: camel-jbang - Send command should be able to send without any existing running Camel. --- .../apache/camel/impl/console/SendDevConsole.java | 2 +- .../ROOT/pages/camel-4x-upgrade-guide-4_12.adoc | 7 +- .../modules/ROOT/pages/camel-jbang.adoc | 36 +++- .../camel/cli/connector/LocalCliConnector.java | 16 +- .../apache/camel/dsl/jbang/core/commands/Run.java | 4 +- .../core/commands/action/ActionBaseCommand.java | 4 +- .../jbang/core/commands/action/CamelBeanDump.java | 2 +- .../core/commands/action/CamelSendAction.java | 215 ++++++++++++++------- .../commands/action/TransformMessageAction.java | 2 +- 9 files changed, 205 insertions(+), 83 deletions(-) diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/SendDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/SendDevConsole.java index 2619258ec30..cc8cb9b9f8e 100644 --- a/core/camel-console/src/main/java/org/apache/camel/impl/console/SendDevConsole.java +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/SendDevConsole.java @@ -206,7 +206,7 @@ public class SendDevConsole extends AbstractDevConsole { } catch (Exception e) { cause = e; } - if (endpoint != null && target == null) { + if (cause == null && endpoint != null && target == null) { cause = new NoSuchEndpointException(endpoint); } if (out != null && out.getException() != null) { diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_12.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_12.adoc index b2baf4d9228..b1da84595ce 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_12.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_12.adoc @@ -167,6 +167,11 @@ Removed deprecated `camel-k:` specific modelines. The default unmarshalType has been changed from `HashMap` to `LinkedHashMap` that keeps ordering of the elements so they have similar order as the input document. +=== camel-jbang + +The `camel cmd send` command now sends the message without the need for any existing running Camel integration. +To use any existing integration then specify the integration name or PID. + === camel-micrometer The `tags` parameter has been _fixed_ to be multivalued and tooling friendly. So @@ -215,7 +220,7 @@ TIP: The `path` can still be specified in the URI as backwards compatible. === Component Verifier Extensions and Metadata Extension -All of the component verifier extensions and metadata extensions have been removed from components. +All the component verifier extensions and metadata extensions have been removed from components. There is one single exception for ServiceNow Metadata Extension since it is used in a MOJO. diff --git a/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc b/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc index 3060aa9174b..ba40d5a4225 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc @@ -1340,15 +1340,15 @@ Suppose you have a Camel route that consumes messages from an external MQTT brok In the example above the MQTT broker is running on hostname `mybroker` port 1883. -The idea with the `camel cmd send` command is to _tap into_ an existing running Camel integration, +The idea with the `camel cmd send` command is to _tap into_ an existing running Camel integration (by name or PID), and reuse an existing endpoint (if possible). In this example, we want to use the existing configuration to avoid having to configure this again. -By executing the following from a shell +By executing the following from a shell, we send the message to the existing running Camel integration named mqtt: [source,bash] ---- -$ camel cmd send --body=file:payload.json mqtt +$ camel cmd send mqtt --body=file:payload.json mqtt ---- We can send a message where the payload is loaded from a file (`_payload.json_`). You can also specify the payload in the CLI @@ -1372,14 +1372,14 @@ For example, to pick the first route by _route id_: [source,bash] ---- -$ camel cmd send --body=file:payload.json --endpoint=route1 mqtt +$ camel cmd send mqtt --body=file:payload.json --endpoint=route1 ---- Or to pick the first route that uses mqtt component: [source,bash] ---- -$ camel cmd send --body=file:payload.json --endpoint=mqtt mqtt +$ camel cmd send mqtt --body=file:payload.json --endpoint=mqtt ---- We are fortunate in this situation as the endpoint can be used as both a _consumer_ and _producer_ in Camel, @@ -1387,7 +1387,31 @@ and therefore we are able to send the message to the MQTT broker via `tcp://mybr TIP: See more options with `camel cmd send --help`. -The source for this example is provided on GitHub at https://github.com/apache/camel-kamelets-examples/tree/main/jbang/mqtt)[camel-jbang MQTT example]. +The source for this example is provided on GitHub at https://github.com/apache/camel-jbang-examples/tree/main/mqtt[https://github.com/apache/camel-kamelets-examples/tree/main/jbang/mqtt])[camel-jbang MQTT example]. + +==== Sending messages without any running Camel + +*Available since Camel 4.12* + +In Camel 4.12 you can use `camel cmd send` without any existing running Camel integration, which then +will automatic startup a internal Camel and use that for sending the message. This allows to send a message +using all the Camel components or Kamelets. However, this requires that all the necessary configuration can +be provided from the CLI (which can be cumbersome for some components). + +This is done by **NOT** specifying any integration name or PID in the command such as: + +[source,bash] +---- +$ camel cmd send --body=file:payload.json --uri='paho-mqtt5:temperature?brokerUrl=tcp://mybroker:1883' +---- + +As you can see this is more cumbersome as we need to provide all the configurations, which can be more complex +for components like Kafka or AWS. So for these you may want to use a kamelet instead: + +[source,bash] +---- +$ camel cmd send --body=file:payload.json --uri='kamelet:mqtt-sink?brokerUrl=tcp://mybroker:1883&topic=temperature' +---- ==== Poll messages via Camel diff --git a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java index 30f7d604617..eb2434e0c1a 100644 --- a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java +++ b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java @@ -506,6 +506,13 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C DevConsole dc = camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class) .resolveById("send"); if (dc != null) { + + // wait for camel to be ready + StopWatch watch = new StopWatch(); + while (!camelContext.isStarted() && watch.taken() < 5000) { + Thread.sleep(100); + } + JsonObject json; String endpoint = root.getString("endpoint"); String body = root.getString("body"); @@ -692,10 +699,17 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C } } - private void doActionReceiveTask(JsonObject root) throws IOException { + private void doActionReceiveTask(JsonObject root) throws Exception { DevConsole dc = camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class) .resolveById("receive"); if (dc != null) { + + // wait for camel to be ready + StopWatch watch = new StopWatch(); + while (!camelContext.isStarted() && watch.taken() < 5000) { + Thread.sleep(100); + } + JsonObject json; String endpoint = root.getString("endpoint"); if (endpoint != null) { diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Run.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Run.java index 27e14f08fd7..607c9acfd8d 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Run.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Run.java @@ -247,14 +247,14 @@ public class Run extends CamelCommand { List<String> loggingCategory = new ArrayList<>(); @Option(names = { "--max-messages" }, defaultValue = "0", description = "Max number of messages to process before stopping") - int maxMessages; + public int maxMessages; @Option(names = { "--max-seconds" }, defaultValue = "0", description = "Max seconds to run before stopping") int maxSeconds; @Option(names = { "--max-idle-seconds" }, defaultValue = "0", description = "For how long time in seconds Camel can be idle before stopping") - int maxIdleSeconds; + public int maxIdleSeconds; @Option(names = { "--reload", "--dev" }, description = "Enables dev mode (live reload when source files are updated and saved)") diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/ActionBaseCommand.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/ActionBaseCommand.java index b55f2adfd84..00bc6d1c0fe 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/ActionBaseCommand.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/ActionBaseCommand.java @@ -16,6 +16,7 @@ */ package org.apache.camel.dsl.jbang.core.commands.action; +import java.io.File; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -40,11 +41,12 @@ abstract class ActionBaseCommand extends CamelCommand { protected static JsonObject getJsonObject(Path outputFile) { StopWatch watch = new StopWatch(); while (watch.taken() < 5000) { + File f = outputFile.toFile(); try { // give time for response to be ready Thread.sleep(100); - if (Files.exists(outputFile)) { + if (Files.exists(outputFile) && f.length() > 0) { String text = Files.readString(outputFile); return (JsonObject) Jsoner.deserialize(text); } diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBeanDump.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBeanDump.java index 684de4fce8c..0d6653b7ed0 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBeanDump.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBeanDump.java @@ -218,7 +218,7 @@ public class CamelBeanDump extends ActionBaseCommand { } protected JsonObject waitForOutputFile(Path outputFile) { - return getJsonObject((Path) outputFile); + return getJsonObject(outputFile); } private static class Row { diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelSendAction.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelSendAction.java index a6659a21064..e31111ce30c 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelSendAction.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelSendAction.java @@ -16,15 +16,21 @@ */ package org.apache.camel.dsl.jbang.core.commands.action; +import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain; +import org.apache.camel.dsl.jbang.core.commands.Run; import org.apache.camel.dsl.jbang.core.common.PathUtils; +import org.apache.camel.main.KameletMain; import org.apache.camel.util.StopWatch; import org.apache.camel.util.StringHelper; import org.apache.camel.util.TimeUtils; @@ -36,14 +42,15 @@ import org.fusesource.jansi.AnsiConsole; import picocli.CommandLine; @CommandLine.Command(name = "send", - description = "Send messages to endpoints via running Camel", sortOptions = false, + description = "Send messages to endpoints", sortOptions = false, showDefaultValues = true) public class CamelSendAction extends ActionBaseCommand { - @CommandLine.Parameters(description = "Name or pid of running Camel integration", arity = "0..1") - String name = "*"; + @CommandLine.Parameters(description = "To use an existing running Camel integration for sending the message (name or pid)", + arity = "0..1") + String name; - @CommandLine.Option(names = { "--endpoint" }, + @CommandLine.Option(names = { "--endpoint", "--uri" }, description = "Endpoint where to send the message (can be uri, pattern, or refer to a route id)") String endpoint; @@ -108,17 +115,23 @@ public class CamelSendAction extends ActionBaseCommand { @Override public Integer doCall() throws Exception { - List<Long> pids = findPids(name); - if (pids.isEmpty()) { - return 0; - } else if (pids.size() > 1) { - printer().println("Name or pid " + name + " matches " + pids.size() - + " running Camel integrations. Specify a name or PID that matches exactly one."); - return 0; + if (headers != null) { + for (String h : headers) { + if (!h.contains("=")) { + printer().println("Header must be in key=value format, was: " + h); + return 0; + } + } } - this.pid = pids.get(0); + if (name != null) { + return doCall(name); + } else { + return doCallLocal(); + } + } + protected Path writeSendData() { // ensure output file is deleted before executing action Path outputFile = getOutputFile(Long.toString(pid)); PathUtils.deleteFile(outputFile); @@ -141,10 +154,6 @@ public class CamelSendAction extends ActionBaseCommand { JsonArray arr = new JsonArray(); for (String h : headers) { JsonObject jo = new JsonObject(); - if (!h.contains("=")) { - printer().println("Header must be in key=value format, was: " + h); - return 0; - } jo.put("key", StringHelper.before(h, "=")); jo.put("value", StringHelper.after(h, "=")); arr.add(jo); @@ -159,56 +168,121 @@ public class CamelSendAction extends ActionBaseCommand { // ignore } - JsonObject jo = waitForOutputFile(outputFile); - if (jo != null) { - printStatusLine(jo); - String exchangeId = jo.getString("exchangeId"); - JsonObject message = jo.getMap("message"); - JsonObject cause = jo.getMap("exception"); - if (message != null || cause != null) { - if (replyFile != null) { - Path target = Path.of(replyFile); - String json = jo.toJson(); - if (pretty) { - json = Jsoner.prettyPrint(json, 2); + return outputFile; + } + + public Integer doCall(String name) throws Exception { + List<Long> pids = findPids(name); + if (pids.size() != 1) { + printer().println("Name or pid " + name + " matches " + pids.size() + + " running Camel integrations. Specify a name or PID that matches exactly one."); + return 0; + } + + this.pid = pids.get(0); + + Path outputFile = writeSendData(); + showStatus(outputFile); + + return 0; + } + + protected void showStatus(Path outputFile) throws Exception { + try { + JsonObject jo = waitForOutputFile(outputFile); + if (jo != null) { + printStatusLine(jo); + String exchangeId = jo.getString("exchangeId"); + JsonObject message = jo.getMap("message"); + JsonObject cause = jo.getMap("exception"); + if (message != null || cause != null) { + if (replyFile != null) { + Path target = Path.of(replyFile); + String json = jo.toJson(); + if (pretty) { + json = Jsoner.prettyPrint(json, 2); + } + try { + Files.writeString(target, json); + } catch (IOException e) { + // ignore + } } - try { - Files.writeString(target, json); - } catch (IOException e) { - // ignore + if (!showExchangeProperties && message != null) { + message.remove("exchangeProperties"); + } + if (!showExchangeVariables && message != null) { + message.remove("exchangeVariables"); + } + if (!showHeaders && message != null) { + message.remove("headers"); + } + if (!showBody && message != null) { + message.remove("body"); + } + if (!showException && cause != null) { + cause = null; + } + if (replyFile == null) { + tableHelper = new MessageTableHelper(); + tableHelper.setPretty(pretty); + tableHelper.setLoggingColor(loggingColor); + tableHelper.setShowExchangeProperties(showExchangeProperties); + tableHelper.setShowExchangeVariables(showExchangeVariables); + String mep = (reply || replyFile != null) ? "InOut" : "InOnly"; + String table = tableHelper.getDataAsTable(exchangeId, mep, jo, null, message, cause); + printer().println(table); } } - if (!showExchangeProperties && message != null) { - message.remove("exchangeProperties"); - } - if (!showExchangeVariables && message != null) { - message.remove("exchangeVariables"); - } - if (!showHeaders && message != null) { - message.remove("headers"); - } - if (!showBody && message != null) { - message.remove("body"); - } - if (!showException && cause != null) { - cause = null; - } - if (replyFile == null) { - tableHelper = new MessageTableHelper(); - tableHelper.setPretty(pretty); - tableHelper.setLoggingColor(loggingColor); - tableHelper.setShowExchangeProperties(showExchangeProperties); - tableHelper.setShowExchangeVariables(showExchangeVariables); - String table = tableHelper.getDataAsTable(exchangeId, mep, jo, null, message, cause); - printer().println(table); - } + } else { + printer().println("Send timeout"); } + } finally { + // delete output file after use + PathUtils.deleteFile(outputFile); } + } - // delete output file after use - PathUtils.deleteFile(outputFile); + private Integer doCallLocal() throws Exception { + AtomicReference<KameletMain> ref = new AtomicReference<>(); + Run run = new Run(this.getMain()) { + @Override + protected int runKameletMain(KameletMain main) throws Exception { + ref.set(main); + return super.runKameletMain(main); + } + }; + run.empty = true; + + // spawn thread that waits for response file + final CountDownLatch latch = new CountDownLatch(1); + this.pid = ProcessHandle.current().pid(); + Path outputFile = writeSendData(); + Thread t = new Thread("CamelJBangSendStatus") { + @Override + public void run() { + try { + showStatus(outputFile); + } catch (Exception e) { + e.printStackTrace(); + // ignore + } finally { + latch.countDown(); + // signal to main we are complete + KameletMain main = ref.get(); + if (main != null) { + main.completed(); + } + } + } + }; + // keep thread running as we need it to show the status before terminating + t.start(); - return 0; + Integer exit = run.call(); + latch.await(timeout + 10000, TimeUnit.MILLISECONDS); + + return exit; } private void printStatusLine(JsonObject jo) { @@ -232,14 +306,16 @@ public class CamelSendAction extends ActionBaseCommand { } // endpoint String ids = jo.getString("endpoint"); - if (ids.length() > 40) { - ids = ids.substring(0, 40); - } - ids = String.format("%40.40s", ids); - if (loggingColor) { - AnsiConsole.out().print(Ansi.ansi().fgCyan().a(ids).reset()); - } else { - printer().print(ids); + if (ids != null) { + if (ids.length() > 40) { + ids = ids.substring(0, 40); + } + ids = String.format("%40.40s", ids); + if (loggingColor) { + AnsiConsole.out().print(Ansi.ansi().fgCyan().a(ids).reset()); + } else { + printer().print(ids); + } } printer().print(" : "); // status @@ -255,7 +331,7 @@ public class CamelSendAction extends ActionBaseCommand { } private String getStatus(JsonObject r) { - boolean failed = "failed".equals(r.getString("status")); + boolean failed = "failed".equals(r.getString("status")) || "error".equals(r.getString("status")); boolean timeout = "timeout".equals(r.getString("status")); boolean reply = r.containsKey("message"); String status; @@ -294,11 +370,12 @@ public class CamelSendAction extends ActionBaseCommand { StopWatch watch = new StopWatch(); long wait = timeout + 10000; // wait longer than timeout while (watch.taken() < wait) { + File f = outputFile.toFile(); try { // give time for response to be ready Thread.sleep(20); - if (Files.exists(outputFile)) { + if (Files.exists(outputFile) && f.length() > 0) { String text = Files.readString(outputFile); return (JsonObject) Jsoner.deserialize(text); } diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/TransformMessageAction.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/TransformMessageAction.java index 32f81f55eb0..6a7cc61472d 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/TransformMessageAction.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/TransformMessageAction.java @@ -328,7 +328,7 @@ public class TransformMessageAction extends ActionWatchCommand { } private String getStatus(JsonObject r) { - boolean failed = "failed".equals(r.getString("status")); + boolean failed = "failed".equals(r.getString("status")) || "error".equals(r.getString("status")); String status; if (failed) { status = "Failed (exception)";