orpiske commented on code in PR #9191:
URL: https://github.com/apache/camel/pull/9191#discussion_r1084983075


##########
components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeConsumer.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.zeebe;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.camunda.zeebe.client.api.response.ActivatedJob;
+import io.camunda.zeebe.client.api.worker.JobClient;
+import io.camunda.zeebe.client.api.worker.JobHandler;
+import io.camunda.zeebe.client.api.worker.JobWorker;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.zeebe.internal.OperationName;
+import org.apache.camel.component.zeebe.model.JobWorkerMessage;
+import org.apache.camel.support.DefaultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZeebeConsumer extends DefaultConsumer {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZeebeConsumer.class);
+
+    private final ZeebeEndpoint endpoint;
+
+    private JobWorker jobWorker;
+
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    public ZeebeConsumer(ZeebeEndpoint endpoint, Processor processor) throws 
CamelException {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        final OperationName operationName = getEndpoint().getOperationName();
+        switch (operationName) {
+            case REGISTER_JOB_WORKER:
+                if (getEndpoint().getJobKey() == null) {
+                    LOG.error("Missing JobKey");
+                    throw new CamelException("Missing JobKey");
+                }
+                jobWorker = 
getEndpoint().getZeebeService().registerJobHandler(new ConsumerJobHandler(),
+                        getEndpoint().getJobKey(), getEndpoint().getTimeout());
+                break;
+            default:
+                LOG.error("Invalid Operation %s", operationName.value());

Review Comment:
   I think this should be `LOG.error("Invalid Operation {}", 
operationName.value());`



##########
components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeConsumer.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.zeebe;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.camunda.zeebe.client.api.response.ActivatedJob;
+import io.camunda.zeebe.client.api.worker.JobClient;
+import io.camunda.zeebe.client.api.worker.JobHandler;
+import io.camunda.zeebe.client.api.worker.JobWorker;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.zeebe.internal.OperationName;
+import org.apache.camel.component.zeebe.model.JobWorkerMessage;
+import org.apache.camel.support.DefaultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZeebeConsumer extends DefaultConsumer {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZeebeConsumer.class);
+
+    private final ZeebeEndpoint endpoint;
+
+    private JobWorker jobWorker;
+
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    public ZeebeConsumer(ZeebeEndpoint endpoint, Processor processor) throws 
CamelException {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        final OperationName operationName = getEndpoint().getOperationName();
+        switch (operationName) {
+            case REGISTER_JOB_WORKER:
+                if (getEndpoint().getJobKey() == null) {
+                    LOG.error("Missing JobKey");
+                    throw new CamelException("Missing JobKey");
+                }
+                jobWorker = 
getEndpoint().getZeebeService().registerJobHandler(new ConsumerJobHandler(),
+                        getEndpoint().getJobKey(), getEndpoint().getTimeout());
+                break;
+            default:
+                LOG.error("Invalid Operation %s", operationName.value());
+                throw new CamelException(String.format("Invalid Operation for 
Consumer %s", operationName.value()));
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        if (jobWorker != null && jobWorker.isOpen()) {
+            jobWorker.close();
+        }
+    }
+
+    private class ConsumerJobHandler implements JobHandler {
+        @Override
+        public void handle(JobClient client, ActivatedJob job) throws 
Exception {
+            final Exchange exchange = createExchange(true);
+            final Message in = exchange.getIn();
+
+            JobWorkerMessage message = new JobWorkerMessage();
+            message.setKey(job.getKey());
+            message.setType(job.getType());
+            message.setCustomHeaders(job.getCustomHeaders());
+            message.setProcessInstanceKey(job.getProcessInstanceKey());
+            message.setBpmnProcessId(job.getBpmnProcessId());
+            
message.setProcessDefinitionVersion(job.getProcessDefinitionVersion());
+            message.setProcessDefinitionKey(job.getProcessDefinitionKey());
+            message.setElementId(job.getElementId());
+            message.setElementInstanceKey(job.getElementInstanceKey());
+            message.setWorker(job.getWorker());
+            message.setRetries(job.getRetries());
+            message.setDeadline(job.getDeadline());
+            message.setVariables(job.getVariablesAsMap());
+
+            LOG.info(job.toJson());

Review Comment:
   A few things:
   
   1. Maybe add a message explaining what is being logged.
   2. Also, seems like it would be better to use a lower debug level (`debug`, 
maybe?).



##########
components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeEndpoint.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.zeebe;
+
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.zeebe.internal.OperationName;
+import org.apache.camel.component.zeebe.internal.ZeebeService;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Zeebe component which does bla bla.
+ *
+ * TODO: Update one line description above what the component does.
+ */
+@UriEndpoint(firstVersion = "1.0-SNAPSHOT", scheme = "zeebe", title = "Zeebe", 
syntax = "zeebe:operationName",
+             category = { Category.JAVA },
+             headersClass = ZeebeConstants.class)
+public class ZeebeEndpoint extends DefaultEndpoint {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZeebeEndpoint.class);
+
+    @UriPath(label = "common", description = "The operation to use", enums = 
"startProcess," +
+                                                                             
"cancelProcess,publishMessage,completeJob,failJob,updateJobRetries,worker,throwError,deployResource")
+    @Metadata(required = true)
+    private OperationName operationName;
+
+    @UriParam(defaultValue = "false")
+    @Metadata(description = "Format the result in the body as JSON.")
+    private boolean formatJSON;
+
+    @UriParam
+    @Metadata(label = "consumer", description = "JobKey for the job worker.", 
javaType = "String")
+    private String jobKey;
+
+    @UriParam(defaultValue = "10")
+    @Metadata(label = "consumer", description = "Timeout for job worker.", 
javaType = "int")
+    private int timeout = 10;
+
+    public ZeebeEndpoint() {
+    }
+
+    public ZeebeEndpoint(String uri, ZeebeComponent component, OperationName 
operationName) {
+        super(uri, component);
+
+        this.operationName = operationName;
+    }
+
+    public Producer createProducer() throws Exception {
+        if (operationName == null) {
+            throw new IllegalArgumentException(String.format("Invalid 
Operation"));
+        }
+
+        return new ZeebeProducer(this);
+    }
+
+    public Consumer createConsumer(Processor processor) throws Exception {
+        if (operationName == null) {
+            throw new IllegalArgumentException(String.format("Invalid 
Operation"));
+        }

Review Comment:
   Same as above.



##########
components/camel-zeebe/src/test/java/org/apache/camel/component/zeebe/ResourceDeploymentIntegrationTest.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.zeebe;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.zeebe.model.DeploymentRequest;
+import org.apache.camel.component.zeebe.model.DeploymentResponse;
+import org.apache.camel.component.zeebe.model.ProcessDeploymentResponse;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("standalone")
+public class ResourceDeploymentIntegrationTest extends CamelTestSupport {
+
+    public static final String RESOURCE_PATH = "data/";
+    public static final String RESOURCE_NAME = "test1_definition.bpmn";
+
+    public static final String INVALID_RESOURCE_NAME = "test1_definition.txt";
+
+    protected ZeebeComponent component;
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    @Test
+    void testUploadProcessDefinition() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:deployResource");
+        mock.expectedMinimumMessageCount(1);
+        mock.expectedHeaderReceived(ZeebeConstants.IS_SUCCESS, true);
+        mock.expectedHeaderReceived(ZeebeConstants.RESOURCE_NAME, 
RESOURCE_NAME);
+
+        DeploymentRequest deploymentRequest = new DeploymentRequest();
+        deploymentRequest.setName(RESOURCE_NAME);
+        deploymentRequest
+                
.setContent(this.getClass().getClassLoader().getResourceAsStream(RESOURCE_PATH 
+ RESOURCE_NAME).readAllBytes());
+
+        template.sendBody("direct:deployResource", deploymentRequest);
+        MockEndpoint.assertIsSatisfied(context);
+        if (!mock.getExchanges().isEmpty()) {
+            Exchange exchange = mock.getExchanges().get(0);
+            Object body = exchange.getIn().getBody();
+            assertTrue(body instanceof ProcessDeploymentResponse);
+            assertTrue(((ProcessDeploymentResponse) body).isSuccess());
+            
assertTrue(exchange.getIn().getHeaders().containsKey(ZeebeConstants.PROCESS_DEFINITION_KEY));
+            
assertTrue(exchange.getIn().getHeaders().containsKey(ZeebeConstants.BPMN_PROCESS_ID));
+            
assertTrue(exchange.getIn().getHeaders().containsKey(ZeebeConstants.VERSION));
+        }
+
+        // Deploy with resource name in header and resource as byte[] in body
+        template.sendBodyAndHeader("direct:deployResource",
+                
this.getClass().getClassLoader().getResourceAsStream(RESOURCE_PATH + 
RESOURCE_NAME).readAllBytes(),
+                ZeebeConstants.RESOURCE_NAME, RESOURCE_NAME);
+        MockEndpoint.assertIsSatisfied(context);
+        if (!mock.getExchanges().isEmpty()) {
+            Exchange exchange = mock.getExchanges().get(0);
+            Object body = exchange.getIn().getBody();
+            assertTrue(body instanceof ProcessDeploymentResponse);
+            assertTrue(((ProcessDeploymentResponse) body).isSuccess());
+        }
+
+        // Deploy with resource name in header and resource as String in body
+        template.sendBodyAndHeader("direct:deployResource",
+                new 
String(this.getClass().getClassLoader().getResourceAsStream(RESOURCE_PATH + 
RESOURCE_NAME).readAllBytes()),
+                ZeebeConstants.RESOURCE_NAME, RESOURCE_NAME);
+        MockEndpoint.assertIsSatisfied(context);
+        if (!mock.getExchanges().isEmpty()) {
+            Exchange exchange = mock.getExchanges().get(0);
+            Object body = exchange.getIn().getBody();
+            assertTrue(body instanceof ProcessDeploymentResponse);
+            assertTrue(((ProcessDeploymentResponse) body).isSuccess());
+        }
+    }
+
+    @Test
+    void testUploadProcessDefinitionJSON() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:deployResource_JSON");
+        mock.expectedMinimumMessageCount(1);
+
+        DeploymentRequest deploymentRequest = new DeploymentRequest();
+        deploymentRequest.setName(RESOURCE_NAME);
+        deploymentRequest
+                
.setContent(this.getClass().getClassLoader().getResourceAsStream("data/test1_definition.bpmn").readAllBytes());
+
+        template.sendBody("direct:deployResource_JSON", 
objectMapper.writeValueAsString(deploymentRequest));
+        MockEndpoint.assertIsSatisfied(context);
+        if (!mock.getExchanges().isEmpty()) {
+            Exchange exchange = mock.getExchanges().get(0);
+            String body = exchange.getIn().getBody(String.class);
+            ProcessDeploymentResponse response = objectMapper.readValue(body, 
ProcessDeploymentResponse.class);
+            assertTrue(response.isSuccess());
+        }
+    }
+
+    @Test
+    void testInvalidUploadProcessDefinition() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:deployResource");
+        mock.expectedMinimumMessageCount(1);
+        mock.expectedHeaderReceived(ZeebeConstants.IS_SUCCESS, false);
+
+        DeploymentRequest deploymentRequest = new DeploymentRequest();
+        deploymentRequest.setName(INVALID_RESOURCE_NAME);
+        deploymentRequest
+                
.setContent(this.getClass().getClassLoader().getResourceAsStream(RESOURCE_PATH 
+ RESOURCE_NAME).readAllBytes());
+
+        template.sendBody("direct:deployResource", deploymentRequest);
+        MockEndpoint.assertIsSatisfied(context);
+        if (!mock.getExchanges().isEmpty()) {
+            Exchange exchange = mock.getExchanges().get(0);
+            Object body = exchange.getIn().getBody();
+            assertTrue(body instanceof DeploymentResponse);

Review Comment:
   IIRC, there's an assert specific to check for the type. 



##########
components/camel-zeebe/src/test/java/org/apache/camel/component/zeebe/ZeebeConsumerIntegrationTest.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.zeebe;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.zeebe.model.DeploymentRequest;
+import org.apache.camel.component.zeebe.model.DeploymentResponse;
+import org.apache.camel.component.zeebe.model.JobRequest;
+import org.apache.camel.component.zeebe.model.JobWorkerMessage;
+import org.apache.camel.component.zeebe.model.ProcessDeploymentResponse;
+import org.apache.camel.component.zeebe.model.ProcessRequest;
+import org.apache.camel.component.zeebe.model.ProcessResponse;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("standalone")
+@MockitoSettings(strictness = Strictness.LENIENT)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class ZeebeConsumerIntegrationTest extends CamelTestSupport {
+
+    public static final String TEST_1_DEFINITION_BPMN = 
"test1_definition.bpmn";
+    private ZeebeComponent component;
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    @BeforeAll
+    void initAll() throws Exception {
+        createComponent();
+        component.doStart();
+
+        DeploymentRequest deployProcessMessage = new DeploymentRequest();
+        deployProcessMessage.setName(TEST_1_DEFINITION_BPMN);
+        deployProcessMessage
+                
.setContent(this.getClass().getClassLoader().getResourceAsStream("data/test1_definition.bpmn").readAllBytes());
+
+        DeploymentResponse deploymentResponse = 
component.getZeebeService().deployResource(deployProcessMessage);
+
+        ProcessRequest processRequest = new ProcessRequest();
+        processRequest.setProcessId(((ProcessDeploymentResponse) 
deploymentResponse).getBpmnProcessId());
+        ProcessResponse processResponse = 
component.getZeebeService().startProcess(processRequest);
+    }
+
+    @BeforeEach
+    void init() throws Exception {
+
+    }
+
+    @Test
+    public void shouldProcessJobWorkerMessage() throws Exception {
+        MockEndpoint workerMock = getMockEndpoint("mock:jobWorker");
+        workerMock.expectedMinimumMessageCount(1);
+
+        for (int i = 0; i < 10; i++) {
+            if (!workerMock.getExchanges().isEmpty()) {
+                break;
+            }
+            TimeUnit.SECONDS.sleep(2);

Review Comment:
   Please, avoid `sleeps` on the test code. Our tests already consume way too 
much time and we have been trying to reduce this. If some kind of 
synchronization is needed, I'd recommend looking at using Awaitility to do so.



##########
components/camel-zeebe/src/test/java/org/apache/camel/component/zeebe/ZeebeConsumerIntegrationTest.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.zeebe;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.zeebe.model.DeploymentRequest;
+import org.apache.camel.component.zeebe.model.DeploymentResponse;
+import org.apache.camel.component.zeebe.model.JobRequest;
+import org.apache.camel.component.zeebe.model.JobWorkerMessage;
+import org.apache.camel.component.zeebe.model.ProcessDeploymentResponse;
+import org.apache.camel.component.zeebe.model.ProcessRequest;
+import org.apache.camel.component.zeebe.model.ProcessResponse;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("standalone")
+@MockitoSettings(strictness = Strictness.LENIENT)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class ZeebeConsumerIntegrationTest extends CamelTestSupport {
+
+    public static final String TEST_1_DEFINITION_BPMN = 
"test1_definition.bpmn";
+    private ZeebeComponent component;
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    @BeforeAll
+    void initAll() throws Exception {
+        createComponent();
+        component.doStart();
+
+        DeploymentRequest deployProcessMessage = new DeploymentRequest();
+        deployProcessMessage.setName(TEST_1_DEFINITION_BPMN);
+        deployProcessMessage
+                
.setContent(this.getClass().getClassLoader().getResourceAsStream("data/test1_definition.bpmn").readAllBytes());
+
+        DeploymentResponse deploymentResponse = 
component.getZeebeService().deployResource(deployProcessMessage);
+
+        ProcessRequest processRequest = new ProcessRequest();
+        processRequest.setProcessId(((ProcessDeploymentResponse) 
deploymentResponse).getBpmnProcessId());
+        ProcessResponse processResponse = 
component.getZeebeService().startProcess(processRequest);
+    }
+
+    @BeforeEach
+    void init() throws Exception {
+
+    }
+
+    @Test
+    public void shouldProcessJobWorkerMessage() throws Exception {
+        MockEndpoint workerMock = getMockEndpoint("mock:jobWorker");
+        workerMock.expectedMinimumMessageCount(1);
+
+        for (int i = 0; i < 10; i++) {
+            if (!workerMock.getExchanges().isEmpty()) {
+                break;
+            }
+            TimeUnit.SECONDS.sleep(2);
+        }
+        MockEndpoint.assertIsSatisfied(context);
+
+        List<Exchange> exchanges = workerMock.getExchanges();
+        for (Exchange exchange : exchanges) {
+            JobWorkerMessage jobWorkerMessage = 
exchange.getIn().getBody(JobWorkerMessage.class);
+            if (jobWorkerMessage != null) {
+                assertTrue(jobWorkerMessage.getKey() > 0);
+                assertTrue(jobWorkerMessage.getProcessInstanceKey() > 0);
+                assertNotNull(jobWorkerMessage.getBpmnProcessId());
+                assertTrue(jobWorkerMessage.getProcessDefinitionVersion() > 0);
+                assertTrue(jobWorkerMessage.getProcessDefinitionKey() > 0);
+                assertNotNull(jobWorkerMessage.getElementId());
+                assertTrue(jobWorkerMessage.getProcessInstanceKey() > 0);
+                assertNotNull(jobWorkerMessage.getWorker());
+                assertTrue(jobWorkerMessage.getRetries() > 0);
+                assertTrue(jobWorkerMessage.getDeadline() > 0);
+
+                JobRequest jobRequest = new JobRequest();
+                jobRequest.setJobKey(jobWorkerMessage.getKey());
+                component.getZeebeService().completeJob(jobRequest);
+            } else {
+                Assertions.fail();

Review Comment:
   It would be good to have a fail message here.



##########
components/camel-zeebe/src/test/java/org/apache/camel/component/zeebe/model/DeploymentRequestTest.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.zeebe.model;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class DeploymentRequestTest {
+
+    private static final String MARSHAL_TEST_RESULT_1 = 
"{\"name\":\"test.bpmn\",\"content\":\"dGVzdCBjb250ZW50\"}";
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    @Test
+    public void marshalTest() {
+        DeploymentRequest message = new DeploymentRequest();
+        message.setName("test.bpmn");
+        message.setContent("test content".getBytes());
+
+        try {
+            String messageString = objectMapper.writeValueAsString(message);
+            assertEquals(MARSHAL_TEST_RESULT_1, messageString);
+        } catch (JsonProcessingException e) {
+            fail("Error in JSON processing");
+        }
+    }
+
+    @Test
+    public void unmarshalTest() {
+        try {
+            DeploymentRequest unmarshalledMessage1 = 
objectMapper.readValue(MARSHAL_TEST_RESULT_1, DeploymentRequest.class);
+
+            DeploymentRequest message = new DeploymentRequest();
+            message.setName("test.bpmn");
+            message.setContent("test content".getBytes());
+
+            assertEquals(message, unmarshalledMessage1);
+        } catch (JsonProcessingException e) {

Review Comment:
   Same as above.



##########
components/camel-zeebe/src/test/java/org/apache/camel/component/zeebe/ZeebeConsumerIntegrationTest.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.zeebe;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.zeebe.model.DeploymentRequest;
+import org.apache.camel.component.zeebe.model.DeploymentResponse;
+import org.apache.camel.component.zeebe.model.JobRequest;
+import org.apache.camel.component.zeebe.model.JobWorkerMessage;
+import org.apache.camel.component.zeebe.model.ProcessDeploymentResponse;
+import org.apache.camel.component.zeebe.model.ProcessRequest;
+import org.apache.camel.component.zeebe.model.ProcessResponse;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("standalone")
+@MockitoSettings(strictness = Strictness.LENIENT)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class ZeebeConsumerIntegrationTest extends CamelTestSupport {
+
+    public static final String TEST_1_DEFINITION_BPMN = 
"test1_definition.bpmn";
+    private ZeebeComponent component;
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    @BeforeAll
+    void initAll() throws Exception {
+        createComponent();
+        component.doStart();
+
+        DeploymentRequest deployProcessMessage = new DeploymentRequest();
+        deployProcessMessage.setName(TEST_1_DEFINITION_BPMN);
+        deployProcessMessage
+                
.setContent(this.getClass().getClassLoader().getResourceAsStream("data/test1_definition.bpmn").readAllBytes());
+
+        DeploymentResponse deploymentResponse = 
component.getZeebeService().deployResource(deployProcessMessage);
+
+        ProcessRequest processRequest = new ProcessRequest();
+        processRequest.setProcessId(((ProcessDeploymentResponse) 
deploymentResponse).getBpmnProcessId());
+        ProcessResponse processResponse = 
component.getZeebeService().startProcess(processRequest);
+    }
+
+    @BeforeEach
+    void init() throws Exception {
+
+    }
+
+    @Test
+    public void shouldProcessJobWorkerMessage() throws Exception {
+        MockEndpoint workerMock = getMockEndpoint("mock:jobWorker");
+        workerMock.expectedMinimumMessageCount(1);
+
+        for (int i = 0; i < 10; i++) {
+            if (!workerMock.getExchanges().isEmpty()) {
+                break;
+            }
+            TimeUnit.SECONDS.sleep(2);
+        }
+        MockEndpoint.assertIsSatisfied(context);
+
+        List<Exchange> exchanges = workerMock.getExchanges();
+        for (Exchange exchange : exchanges) {
+            JobWorkerMessage jobWorkerMessage = 
exchange.getIn().getBody(JobWorkerMessage.class);
+            if (jobWorkerMessage != null) {
+                assertTrue(jobWorkerMessage.getKey() > 0);
+                assertTrue(jobWorkerMessage.getProcessInstanceKey() > 0);
+                assertNotNull(jobWorkerMessage.getBpmnProcessId());
+                assertTrue(jobWorkerMessage.getProcessDefinitionVersion() > 0);
+                assertTrue(jobWorkerMessage.getProcessDefinitionKey() > 0);
+                assertNotNull(jobWorkerMessage.getElementId());
+                assertTrue(jobWorkerMessage.getProcessInstanceKey() > 0);
+                assertNotNull(jobWorkerMessage.getWorker());
+                assertTrue(jobWorkerMessage.getRetries() > 0);
+                assertTrue(jobWorkerMessage.getDeadline() > 0);
+
+                JobRequest jobRequest = new JobRequest();
+                jobRequest.setJobKey(jobWorkerMessage.getKey());
+                component.getZeebeService().completeJob(jobRequest);
+            } else {
+                Assertions.fail();
+            }
+        }
+    }
+
+    @Test
+    public void shouldProcessJobWorkerMessageJSON() throws Exception {
+        MockEndpoint workerMock = getMockEndpoint("mock:jobWorker_JSON");
+        workerMock.expectedMinimumMessageCount(1);
+
+        for (int i = 0; i < 10; i++) {
+            if (!workerMock.getExchanges().isEmpty()) {
+                break;
+            }
+            TimeUnit.SECONDS.sleep(2);
+        }
+        MockEndpoint.assertIsSatisfied(context);
+
+        List<Exchange> exchanges = workerMock.getExchanges();
+        for (Exchange exchange : exchanges) {
+            String jobWorkerMessageString = 
exchange.getIn().getBody(String.class);
+            if (jobWorkerMessageString != null) {
+                JobWorkerMessage jobWorkerMessage = 
objectMapper.readValue(jobWorkerMessageString, JobWorkerMessage.class);
+
+                assertTrue(jobWorkerMessage.getKey() > 0);
+                assertTrue(jobWorkerMessage.getProcessInstanceKey() > 0);
+                assertNotNull(jobWorkerMessage.getBpmnProcessId());
+                assertTrue(jobWorkerMessage.getProcessDefinitionVersion() > 0);
+                assertTrue(jobWorkerMessage.getProcessDefinitionKey() > 0);
+                assertNotNull(jobWorkerMessage.getElementId());
+                assertTrue(jobWorkerMessage.getProcessInstanceKey() > 0);
+                assertNotNull(jobWorkerMessage.getWorker());
+                assertTrue(jobWorkerMessage.getRetries() > 0);
+                assertTrue(jobWorkerMessage.getDeadline() > 0);
+
+                JobRequest jobRequest = new JobRequest();
+                jobRequest.setJobKey(jobWorkerMessage.getKey());
+                component.getZeebeService().completeJob(jobRequest);
+            } else {
+                Assertions.fail();

Review Comment:
   Same note about `fail` messages.



##########
components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeEndpoint.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.zeebe;
+
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.zeebe.internal.OperationName;
+import org.apache.camel.component.zeebe.internal.ZeebeService;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Zeebe component which does bla bla.
+ *
+ * TODO: Update one line description above what the component does.
+ */
+@UriEndpoint(firstVersion = "1.0-SNAPSHOT", scheme = "zeebe", title = "Zeebe", 
syntax = "zeebe:operationName",
+             category = { Category.JAVA },
+             headersClass = ZeebeConstants.class)
+public class ZeebeEndpoint extends DefaultEndpoint {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZeebeEndpoint.class);
+
+    @UriPath(label = "common", description = "The operation to use", enums = 
"startProcess," +
+                                                                             
"cancelProcess,publishMessage,completeJob,failJob,updateJobRetries,worker,throwError,deployResource")
+    @Metadata(required = true)
+    private OperationName operationName;
+
+    @UriParam(defaultValue = "false")
+    @Metadata(description = "Format the result in the body as JSON.")
+    private boolean formatJSON;
+
+    @UriParam
+    @Metadata(label = "consumer", description = "JobKey for the job worker.", 
javaType = "String")
+    private String jobKey;
+
+    @UriParam(defaultValue = "10")
+    @Metadata(label = "consumer", description = "Timeout for job worker.", 
javaType = "int")
+    private int timeout = 10;
+
+    public ZeebeEndpoint() {
+    }
+
+    public ZeebeEndpoint(String uri, ZeebeComponent component, OperationName 
operationName) {
+        super(uri, component);
+
+        this.operationName = operationName;
+    }
+
+    public Producer createProducer() throws Exception {
+        if (operationName == null) {
+            throw new IllegalArgumentException(String.format("Invalid 
Operation"));

Review Comment:
   The `String.format` seems unnecessary here. Also, a bit more details about 
the state of `operationName` would be helpful (i.e.; "Invalid operation: null 
operation name"); 
   
   Lastly, probably a better check w/ the `ObjectHelper.notNull`.



##########
components/camel-zeebe/src/test/java/org/apache/camel/component/zeebe/model/DeploymentRequestTest.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.zeebe.model;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class DeploymentRequestTest {
+
+    private static final String MARSHAL_TEST_RESULT_1 = 
"{\"name\":\"test.bpmn\",\"content\":\"dGVzdCBjb250ZW50\"}";
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    @Test
+    public void marshalTest() {
+        DeploymentRequest message = new DeploymentRequest();
+        message.setName("test.bpmn");
+        message.setContent("test content".getBytes());
+
+        try {
+            String messageString = objectMapper.writeValueAsString(message);
+            assertEquals(MARSHAL_TEST_RESULT_1, messageString);
+        } catch (JsonProcessingException e) {
+            fail("Error in JSON processing");
+        }

Review Comment:
   Please use `Assert.assertDoesNotThrow`  whenever you need to assert some 
piece of test code does not throw an exception.



##########
components/camel-zeebe/src/main/java/org/apache/camel/component/zeebe/ZeebeConsumer.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.zeebe;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.camunda.zeebe.client.api.response.ActivatedJob;
+import io.camunda.zeebe.client.api.worker.JobClient;
+import io.camunda.zeebe.client.api.worker.JobHandler;
+import io.camunda.zeebe.client.api.worker.JobWorker;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.zeebe.internal.OperationName;
+import org.apache.camel.component.zeebe.model.JobWorkerMessage;
+import org.apache.camel.support.DefaultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZeebeConsumer extends DefaultConsumer {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZeebeConsumer.class);
+
+    private final ZeebeEndpoint endpoint;
+
+    private JobWorker jobWorker;
+
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    public ZeebeConsumer(ZeebeEndpoint endpoint, Processor processor) throws 
CamelException {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        final OperationName operationName = getEndpoint().getOperationName();
+        switch (operationName) {
+            case REGISTER_JOB_WORKER:
+                if (getEndpoint().getJobKey() == null) {
+                    LOG.error("Missing JobKey");
+                    throw new CamelException("Missing JobKey");
+                }

Review Comment:
   I think `ObjectHelper.notNull` would do the trick here. That's what we 
normally use.



##########
components/camel-zeebe/src/test/java/org/apache/camel/component/zeebe/ZeebeConsumerIntegrationTest.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.zeebe;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.zeebe.model.DeploymentRequest;
+import org.apache.camel.component.zeebe.model.DeploymentResponse;
+import org.apache.camel.component.zeebe.model.JobRequest;
+import org.apache.camel.component.zeebe.model.JobWorkerMessage;
+import org.apache.camel.component.zeebe.model.ProcessDeploymentResponse;
+import org.apache.camel.component.zeebe.model.ProcessRequest;
+import org.apache.camel.component.zeebe.model.ProcessResponse;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("standalone")
+@MockitoSettings(strictness = Strictness.LENIENT)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class ZeebeConsumerIntegrationTest extends CamelTestSupport {
+
+    public static final String TEST_1_DEFINITION_BPMN = 
"test1_definition.bpmn";
+    private ZeebeComponent component;
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    @BeforeAll
+    void initAll() throws Exception {
+        createComponent();
+        component.doStart();
+
+        DeploymentRequest deployProcessMessage = new DeploymentRequest();
+        deployProcessMessage.setName(TEST_1_DEFINITION_BPMN);
+        deployProcessMessage
+                
.setContent(this.getClass().getClassLoader().getResourceAsStream("data/test1_definition.bpmn").readAllBytes());
+
+        DeploymentResponse deploymentResponse = 
component.getZeebeService().deployResource(deployProcessMessage);
+
+        ProcessRequest processRequest = new ProcessRequest();
+        processRequest.setProcessId(((ProcessDeploymentResponse) 
deploymentResponse).getBpmnProcessId());
+        ProcessResponse processResponse = 
component.getZeebeService().startProcess(processRequest);
+    }
+
+    @BeforeEach
+    void init() throws Exception {
+
+    }
+
+    @Test
+    public void shouldProcessJobWorkerMessage() throws Exception {
+        MockEndpoint workerMock = getMockEndpoint("mock:jobWorker");
+        workerMock.expectedMinimumMessageCount(1);
+
+        for (int i = 0; i < 10; i++) {
+            if (!workerMock.getExchanges().isEmpty()) {
+                break;
+            }
+            TimeUnit.SECONDS.sleep(2);
+        }
+        MockEndpoint.assertIsSatisfied(context);
+
+        List<Exchange> exchanges = workerMock.getExchanges();
+        for (Exchange exchange : exchanges) {
+            JobWorkerMessage jobWorkerMessage = 
exchange.getIn().getBody(JobWorkerMessage.class);
+            if (jobWorkerMessage != null) {
+                assertTrue(jobWorkerMessage.getKey() > 0);
+                assertTrue(jobWorkerMessage.getProcessInstanceKey() > 0);
+                assertNotNull(jobWorkerMessage.getBpmnProcessId());
+                assertTrue(jobWorkerMessage.getProcessDefinitionVersion() > 0);
+                assertTrue(jobWorkerMessage.getProcessDefinitionKey() > 0);
+                assertNotNull(jobWorkerMessage.getElementId());
+                assertTrue(jobWorkerMessage.getProcessInstanceKey() > 0);
+                assertNotNull(jobWorkerMessage.getWorker());
+                assertTrue(jobWorkerMessage.getRetries() > 0);
+                assertTrue(jobWorkerMessage.getDeadline() > 0);
+
+                JobRequest jobRequest = new JobRequest();
+                jobRequest.setJobKey(jobWorkerMessage.getKey());
+                component.getZeebeService().completeJob(jobRequest);
+            } else {
+                Assertions.fail();
+            }
+        }
+    }
+
+    @Test
+    public void shouldProcessJobWorkerMessageJSON() throws Exception {
+        MockEndpoint workerMock = getMockEndpoint("mock:jobWorker_JSON");
+        workerMock.expectedMinimumMessageCount(1);
+
+        for (int i = 0; i < 10; i++) {
+            if (!workerMock.getExchanges().isEmpty()) {
+                break;
+            }
+            TimeUnit.SECONDS.sleep(2);

Review Comment:
   Same note about `sleeps` as I raised before.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to