davsclaus commented on code in PR #9191: URL: https://github.com/apache/camel/pull/9191#discussion_r1089314827
########## components/camel-zeebe/pom.xml: ########## @@ -0,0 +1,122 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Review Comment: Use same license header as other pom.xml files ########## components/camel-zeebe/pom.xml: ########## @@ -0,0 +1,122 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>3.21.0-SNAPSHOT</version> + </parent> + + <artifactId>camel-zeebe</artifactId> + <packaging>jar</packaging> + <name>Camel :: Zeebe</name> + <description>Camel Zeebe support</description> + + <properties> + <firstVersion>3.20</firstVersion> Review Comment: 3.21 ########## components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeConsumer.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.zeebe; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.camunda.zeebe.client.api.response.ActivatedJob; +import io.camunda.zeebe.client.api.worker.JobClient; +import io.camunda.zeebe.client.api.worker.JobHandler; +import io.camunda.zeebe.client.api.worker.JobWorker; +import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelException; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.zeebe.internal.OperationName; +import org.apache.camel.component.zeebe.model.JobWorkerMessage; +import org.apache.camel.support.DefaultConsumer; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZeebeConsumer extends DefaultConsumer { + private static final Logger LOG = LoggerFactory.getLogger(ZeebeConsumer.class); + + private final ZeebeEndpoint endpoint; + + private JobWorker jobWorker; + + private ObjectMapper objectMapper = new ObjectMapper(); + + public ZeebeConsumer(ZeebeEndpoint endpoint, Processor processor) throws CamelException { + super(endpoint, processor); + this.endpoint = endpoint; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + final OperationName operationName = getEndpoint().getOperationName(); + switch (operationName) { + case REGISTER_JOB_WORKER: + ObjectHelper.notNull(getEndpoint().getJobKey(), "jobKey"); + + jobWorker = getEndpoint().getZeebeService().registerJobHandler(new ConsumerJobHandler(), + getEndpoint().getJobKey(), getEndpoint().getTimeout()); + break; + default: + LOG.error("Invalid Operation {}", operationName.value()); + throw new CamelException(String.format("Invalid Operation for Consumer %s", operationName.value())); + } + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + + if (jobWorker != null && jobWorker.isOpen()) { + jobWorker.close(); + } + } + + private class ConsumerJobHandler implements JobHandler { + @Override + public void handle(JobClient client, ActivatedJob job) throws Exception { + final Exchange exchange = createExchange(true); + + JobWorkerMessage message = new JobWorkerMessage(); + message.setKey(job.getKey()); + message.setType(job.getType()); + message.setCustomHeaders(job.getCustomHeaders()); + message.setProcessInstanceKey(job.getProcessInstanceKey()); + message.setBpmnProcessId(job.getBpmnProcessId()); + message.setProcessDefinitionVersion(job.getProcessDefinitionVersion()); + message.setProcessDefinitionKey(job.getProcessDefinitionKey()); + message.setElementId(job.getElementId()); + message.setElementInstanceKey(job.getElementInstanceKey()); + message.setWorker(job.getWorker()); + message.setRetries(job.getRetries()); + message.setDeadline(job.getDeadline()); + message.setVariables(job.getVariablesAsMap()); + + LOG.debug("New Job Message: {}", job.toJson()); Review Comment: Use ifDebugEnabled as toJson is expensive ########## components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeEndpoint.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.zeebe; + +import org.apache.camel.Category; +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.zeebe.internal.OperationName; +import org.apache.camel.component.zeebe.internal.ZeebeService; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.apache.camel.support.DefaultEndpoint; +import org.apache.camel.util.ObjectHelper; + +/** + * Zeebe component which does integrage with Camunda Zeebe to interact with the API. + */ +@UriEndpoint(firstVersion = "1.0-SNAPSHOT", scheme = "zeebe", title = "Zeebe", syntax = "zeebe:operationName", + category = { Category.JAVA }, + headersClass = ZeebeConstants.class) +public class ZeebeEndpoint extends DefaultEndpoint { + + @UriPath(label = "common", description = "The operation to use", enums = "startProcess," + + "cancelProcess,publishMessage,completeJob,failJob,updateJobRetries,worker,throwError,deployResource") + @Metadata(required = true) + private OperationName operationName; + + @UriParam(defaultValue = "false") + @Metadata(description = "Format the result in the body as JSON.") + private boolean formatJSON; + + @UriParam + @Metadata(label = "consumer", description = "JobKey for the job worker.", javaType = "String") + private String jobKey; + + @UriParam(defaultValue = "10") + @Metadata(label = "consumer", description = "Timeout for job worker.", javaType = "int") Review Comment: javaType = should only be set when its a special type, so remove ########## components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeComponent.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.zeebe; + +import java.util.Map; + +import org.apache.camel.Endpoint; +import org.apache.camel.component.zeebe.internal.OperationName; +import org.apache.camel.component.zeebe.internal.ZeebeService; +import org.apache.camel.spi.Metadata; +import org.apache.camel.support.DefaultComponent; + +@org.apache.camel.spi.annotations.Component("zeebe") +public class ZeebeComponent extends DefaultComponent { + + @Metadata(defaultValue = "" + ZeebeConstants.DEFAULT_GATEWAY_HOST) + String gatewayHost; + @Metadata(defaultValue = "" + ZeebeConstants.DEFAULT_GATEWAY_PORT) + int gatewayPort; + @Metadata Review Comment: label = security ########## components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeProducer.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.zeebe; + +import org.apache.camel.CamelException; +import org.apache.camel.Exchange; +import org.apache.camel.component.zeebe.internal.OperationName; +import org.apache.camel.component.zeebe.processor.DeploymentProcessor; +import org.apache.camel.component.zeebe.processor.JobProcessor; +import org.apache.camel.component.zeebe.processor.MessageProcessor; +import org.apache.camel.component.zeebe.processor.ProcessProcessor; +import org.apache.camel.component.zeebe.processor.ZeebeProcessor; +import org.apache.camel.support.DefaultProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZeebeProducer extends DefaultProducer { + private static final Logger LOG = LoggerFactory.getLogger(ZeebeProducer.class); + private ZeebeEndpoint endpoint; + private ZeebeProcessor processor; + + public ZeebeProducer(ZeebeEndpoint endpoint) { + super(endpoint); + this.endpoint = endpoint; + + final OperationName operationName = endpoint.getOperationName(); + if (isProcessOperation(operationName)) { + processor = new ProcessProcessor(endpoint); + } else if (isMessageOperation(operationName)) { + processor = new MessageProcessor(endpoint); + } else if (isJobOperation(operationName)) { + processor = new JobProcessor(endpoint); + } else if (isDeploymentOperation(operationName)) { + processor = new DeploymentProcessor(endpoint); + } + } + + public void process(Exchange exchange) throws Exception { + if (processor != null) { + processor.process(exchange); + } else { + LOG.error("No processor found"); Review Comment: Do not log and throw exception, only trow exception ########## components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeComponent.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.zeebe; + +import java.util.Map; + +import org.apache.camel.Endpoint; +import org.apache.camel.component.zeebe.internal.OperationName; +import org.apache.camel.component.zeebe.internal.ZeebeService; +import org.apache.camel.spi.Metadata; +import org.apache.camel.support.DefaultComponent; + +@org.apache.camel.spi.annotations.Component("zeebe") +public class ZeebeComponent extends DefaultComponent { + + @Metadata(defaultValue = "" + ZeebeConstants.DEFAULT_GATEWAY_HOST) + String gatewayHost; + @Metadata(defaultValue = "" + ZeebeConstants.DEFAULT_GATEWAY_PORT) + int gatewayPort; + @Metadata + String clientId; + @Metadata Review Comment: label = security, secret = true ########## components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeProducer.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.zeebe; + +import org.apache.camel.CamelException; +import org.apache.camel.Exchange; +import org.apache.camel.component.zeebe.internal.OperationName; +import org.apache.camel.component.zeebe.processor.DeploymentProcessor; +import org.apache.camel.component.zeebe.processor.JobProcessor; +import org.apache.camel.component.zeebe.processor.MessageProcessor; +import org.apache.camel.component.zeebe.processor.ProcessProcessor; +import org.apache.camel.component.zeebe.processor.ZeebeProcessor; +import org.apache.camel.support.DefaultProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZeebeProducer extends DefaultProducer { + private static final Logger LOG = LoggerFactory.getLogger(ZeebeProducer.class); + private ZeebeEndpoint endpoint; + private ZeebeProcessor processor; + + public ZeebeProducer(ZeebeEndpoint endpoint) { + super(endpoint); + this.endpoint = endpoint; + + final OperationName operationName = endpoint.getOperationName(); + if (isProcessOperation(operationName)) { + processor = new ProcessProcessor(endpoint); + } else if (isMessageOperation(operationName)) { + processor = new MessageProcessor(endpoint); + } else if (isJobOperation(operationName)) { + processor = new JobProcessor(endpoint); + } else if (isDeploymentOperation(operationName)) { + processor = new DeploymentProcessor(endpoint); Review Comment: Add doInit or doStart and validate that processor != null and fail if so, eg such validation should be in init or start ########## components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeComponent.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.zeebe; + +import java.util.Map; + +import org.apache.camel.Endpoint; +import org.apache.camel.component.zeebe.internal.OperationName; +import org.apache.camel.component.zeebe.internal.ZeebeService; +import org.apache.camel.spi.Metadata; +import org.apache.camel.support.DefaultComponent; + +@org.apache.camel.spi.annotations.Component("zeebe") +public class ZeebeComponent extends DefaultComponent { + + @Metadata(defaultValue = "" + ZeebeConstants.DEFAULT_GATEWAY_HOST) + String gatewayHost; + @Metadata(defaultValue = "" + ZeebeConstants.DEFAULT_GATEWAY_PORT) + int gatewayPort; + @Metadata + String clientId; + @Metadata + String clientSecret; + @Metadata Review Comment: label = security ########## components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/processor/DeploymentProcessor.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.zeebe.processor; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.camel.CamelException; +import org.apache.camel.Exchange; +import org.apache.camel.component.zeebe.ZeebeConstants; +import org.apache.camel.component.zeebe.ZeebeEndpoint; +import org.apache.camel.component.zeebe.internal.ZeebeService; +import org.apache.camel.component.zeebe.model.DeploymentRequest; +import org.apache.camel.component.zeebe.model.DeploymentResponse; +import org.apache.camel.component.zeebe.model.ProcessDeploymentResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DeploymentProcessor extends AbstractBaseProcessor { + private static final Logger LOG = LoggerFactory.getLogger(DeploymentProcessor.class); + + public DeploymentProcessor(ZeebeEndpoint endpoint) { + super(endpoint); + } + + @Override + public void process(Exchange exchange) throws Exception { + DeploymentRequest message = null; + + Object body = exchange.getMessage().getBody(); + String headerResourceName = exchange.getMessage().getHeader(ZeebeConstants.RESOURCE_NAME, String.class); + if (headerResourceName != null && (body instanceof String || body instanceof byte[])) { + message = new DeploymentRequest(); + message.setName(headerResourceName); + if (body instanceof String) { + message.setContent(((String) body).getBytes()); + } else { + message.setContent((byte[]) body); + } + } else if (body instanceof DeploymentRequest) { + message = (DeploymentRequest) body; + } else if (body instanceof String) { + try { + message = objectMapper.readValue((String) body, DeploymentRequest.class); + } catch (JsonProcessingException jsonProcessingException) { + throw new IllegalArgumentException("Cannot convert body to DeploymentRequestMessage", jsonProcessingException); + } + } else { Review Comment: The body can be inputstream and still possible to read into a String so without using camel type converter then this processor is very limited ########## components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeConsumer.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.zeebe; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.camunda.zeebe.client.api.response.ActivatedJob; +import io.camunda.zeebe.client.api.worker.JobClient; +import io.camunda.zeebe.client.api.worker.JobHandler; +import io.camunda.zeebe.client.api.worker.JobWorker; +import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelException; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.zeebe.internal.OperationName; +import org.apache.camel.component.zeebe.model.JobWorkerMessage; +import org.apache.camel.support.DefaultConsumer; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZeebeConsumer extends DefaultConsumer { + private static final Logger LOG = LoggerFactory.getLogger(ZeebeConsumer.class); + + private final ZeebeEndpoint endpoint; + + private JobWorker jobWorker; + + private ObjectMapper objectMapper = new ObjectMapper(); + + public ZeebeConsumer(ZeebeEndpoint endpoint, Processor processor) throws CamelException { + super(endpoint, processor); + this.endpoint = endpoint; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + final OperationName operationName = getEndpoint().getOperationName(); + switch (operationName) { + case REGISTER_JOB_WORKER: + ObjectHelper.notNull(getEndpoint().getJobKey(), "jobKey"); + + jobWorker = getEndpoint().getZeebeService().registerJobHandler(new ConsumerJobHandler(), + getEndpoint().getJobKey(), getEndpoint().getTimeout()); + break; + default: + LOG.error("Invalid Operation {}", operationName.value()); Review Comment: Do not log and throw exception, only throw exception ########## components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeEndpoint.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.zeebe; + +import org.apache.camel.Category; +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.zeebe.internal.OperationName; +import org.apache.camel.component.zeebe.internal.ZeebeService; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.apache.camel.support.DefaultEndpoint; +import org.apache.camel.util.ObjectHelper; + +/** + * Zeebe component which does integrage with Camunda Zeebe to interact with the API. + */ +@UriEndpoint(firstVersion = "1.0-SNAPSHOT", scheme = "zeebe", title = "Zeebe", syntax = "zeebe:operationName", Review Comment: 3.21.0 ########## components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeEndpoint.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.zeebe; + +import org.apache.camel.Category; +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.zeebe.internal.OperationName; +import org.apache.camel.component.zeebe.internal.ZeebeService; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.apache.camel.support.DefaultEndpoint; +import org.apache.camel.util.ObjectHelper; + +/** + * Zeebe component which does integrage with Camunda Zeebe to interact with the API. + */ +@UriEndpoint(firstVersion = "1.0-SNAPSHOT", scheme = "zeebe", title = "Zeebe", syntax = "zeebe:operationName", + category = { Category.JAVA }, Review Comment: Category is JBPM or some sort kind like that ########## components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeConsumer.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.zeebe; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.camunda.zeebe.client.api.response.ActivatedJob; +import io.camunda.zeebe.client.api.worker.JobClient; +import io.camunda.zeebe.client.api.worker.JobHandler; +import io.camunda.zeebe.client.api.worker.JobWorker; +import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelException; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.zeebe.internal.OperationName; +import org.apache.camel.component.zeebe.model.JobWorkerMessage; +import org.apache.camel.support.DefaultConsumer; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZeebeConsumer extends DefaultConsumer { + private static final Logger LOG = LoggerFactory.getLogger(ZeebeConsumer.class); + + private final ZeebeEndpoint endpoint; + + private JobWorker jobWorker; + + private ObjectMapper objectMapper = new ObjectMapper(); + + public ZeebeConsumer(ZeebeEndpoint endpoint, Processor processor) throws CamelException { + super(endpoint, processor); + this.endpoint = endpoint; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + final OperationName operationName = getEndpoint().getOperationName(); + switch (operationName) { + case REGISTER_JOB_WORKER: + ObjectHelper.notNull(getEndpoint().getJobKey(), "jobKey"); + + jobWorker = getEndpoint().getZeebeService().registerJobHandler(new ConsumerJobHandler(), + getEndpoint().getJobKey(), getEndpoint().getTimeout()); + break; + default: + LOG.error("Invalid Operation {}", operationName.value()); + throw new CamelException(String.format("Invalid Operation for Consumer %s", operationName.value())); + } + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + + if (jobWorker != null && jobWorker.isOpen()) { + jobWorker.close(); Review Comment: Maybe close should be in try .. catch and WARN log if closing failed ########## components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeEndpoint.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.zeebe; + +import org.apache.camel.Category; +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.zeebe.internal.OperationName; +import org.apache.camel.component.zeebe.internal.ZeebeService; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.apache.camel.support.DefaultEndpoint; +import org.apache.camel.util.ObjectHelper; + +/** + * Zeebe component which does integrage with Camunda Zeebe to interact with the API. + */ +@UriEndpoint(firstVersion = "1.0-SNAPSHOT", scheme = "zeebe", title = "Zeebe", syntax = "zeebe:operationName", + category = { Category.JAVA }, + headersClass = ZeebeConstants.class) +public class ZeebeEndpoint extends DefaultEndpoint { + + @UriPath(label = "common", description = "The operation to use", enums = "startProcess," + + "cancelProcess,publishMessage,completeJob,failJob,updateJobRetries,worker,throwError,deployResource") + @Metadata(required = true) + private OperationName operationName; + + @UriParam(defaultValue = "false") + @Metadata(description = "Format the result in the body as JSON.") + private boolean formatJSON; + + @UriParam + @Metadata(label = "consumer", description = "JobKey for the job worker.", javaType = "String") Review Comment: javaType = should only be set when its a special type, so remove ########## components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/processor/DeploymentProcessor.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.zeebe.processor; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.camel.CamelException; +import org.apache.camel.Exchange; +import org.apache.camel.component.zeebe.ZeebeConstants; +import org.apache.camel.component.zeebe.ZeebeEndpoint; +import org.apache.camel.component.zeebe.internal.ZeebeService; +import org.apache.camel.component.zeebe.model.DeploymentRequest; +import org.apache.camel.component.zeebe.model.DeploymentResponse; +import org.apache.camel.component.zeebe.model.ProcessDeploymentResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DeploymentProcessor extends AbstractBaseProcessor { + private static final Logger LOG = LoggerFactory.getLogger(DeploymentProcessor.class); + + public DeploymentProcessor(ZeebeEndpoint endpoint) { + super(endpoint); + } + + @Override + public void process(Exchange exchange) throws Exception { + DeploymentRequest message = null; + + Object body = exchange.getMessage().getBody(); + String headerResourceName = exchange.getMessage().getHeader(ZeebeConstants.RESOURCE_NAME, String.class); + if (headerResourceName != null && (body instanceof String || body instanceof byte[])) { + message = new DeploymentRequest(); + message.setName(headerResourceName); + if (body instanceof String) { + message.setContent(((String) body).getBytes()); + } else { + message.setContent((byte[]) body); + } + } else if (body instanceof DeploymentRequest) { + message = (DeploymentRequest) body; + } else if (body instanceof String) { + try { + message = objectMapper.readValue((String) body, DeploymentRequest.class); + } catch (JsonProcessingException jsonProcessingException) { + throw new IllegalArgumentException("Cannot convert body to DeploymentRequestMessage", jsonProcessingException); + } + } else { + LOG.error("Deployment Resource missing"); Review Comment: Log and throw! ########## components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/processor/DeploymentProcessor.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.zeebe.processor; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.camel.CamelException; +import org.apache.camel.Exchange; +import org.apache.camel.component.zeebe.ZeebeConstants; +import org.apache.camel.component.zeebe.ZeebeEndpoint; +import org.apache.camel.component.zeebe.internal.ZeebeService; +import org.apache.camel.component.zeebe.model.DeploymentRequest; +import org.apache.camel.component.zeebe.model.DeploymentResponse; +import org.apache.camel.component.zeebe.model.ProcessDeploymentResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DeploymentProcessor extends AbstractBaseProcessor { + private static final Logger LOG = LoggerFactory.getLogger(DeploymentProcessor.class); + + public DeploymentProcessor(ZeebeEndpoint endpoint) { + super(endpoint); + } + + @Override + public void process(Exchange exchange) throws Exception { + DeploymentRequest message = null; + + Object body = exchange.getMessage().getBody(); + String headerResourceName = exchange.getMessage().getHeader(ZeebeConstants.RESOURCE_NAME, String.class); + if (headerResourceName != null && (body instanceof String || body instanceof byte[])) { + message = new DeploymentRequest(); + message.setName(headerResourceName); + if (body instanceof String) { + message.setContent(((String) body).getBytes()); + } else { + message.setContent((byte[]) body); + } + } else if (body instanceof DeploymentRequest) { + message = (DeploymentRequest) body; + } else if (body instanceof String) { + try { + message = objectMapper.readValue((String) body, DeploymentRequest.class); + } catch (JsonProcessingException jsonProcessingException) { + throw new IllegalArgumentException("Cannot convert body to DeploymentRequestMessage", jsonProcessingException); + } + } else { + LOG.error("Deployment Resource missing"); + throw new CamelException("Deployment Resource missing"); + } + + DeploymentResponse resultMessage = null; + + switch (endpoint.getOperationName()) { + case DEPLOY_RESOURCE: + resultMessage = deployResource(message); + break; + default: + LOG.error("Unknown Operation!"); Review Comment: log and throw! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org