This is an automated email from the ASF dual-hosted git repository.

wmedvedeo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-tools.git


The following commit(s) were added to refs/heads/main by this push:
     new e198bc4a5d7 kie-tools-2910: Update workflow metadata upon deployment 
events (#3035)
e198bc4a5d7 is described below

commit e198bc4a5d7210d7101201fff240f1d5d181db5c
Author: Walter Medvedeo <[email protected]>
AuthorDate: Fri Mar 28 17:51:19 2025 +0100

    kie-tools-2910: Update workflow metadata upon deployment events (#3035)
---
 packages/sonataflow-operator/test/e2e/helpers.go   | 81 ++++++++++++++++++++++
 .../sonataflow-operator/test/e2e/platform_test.go  | 23 ++++++
 2 files changed, 104 insertions(+)

diff --git a/packages/sonataflow-operator/test/e2e/helpers.go 
b/packages/sonataflow-operator/test/e2e/helpers.go
index 8cebfa87dc4..e44e8af4687 100644
--- a/packages/sonataflow-operator/test/e2e/helpers.go
+++ b/packages/sonataflow-operator/test/e2e/helpers.go
@@ -400,3 +400,84 @@ func getPodNameAfterWorkflowInstCreation(name, ns string) 
(string, error) {
        }
        return "", fmt.Errorf("invalid data received: %s", string(out))
 }
+
// extractJSONResponse returns the JSON portion of the terminal output produced
// when a command is executed inside a pod via "kubectl exec <pod> -- <command>",
// on the basis that the executed command prints a single JSON document.
//
// The result is the substring of terminalOutput that spans from the first "{"
// to the last "}", both included. An empty string is returned when no such span
// exists (there is no "{", there is no "}", or the last "}" precedes the first
// "{"), so that callers can deal with the missing JSON instead of hitting a
// slice-bounds panic.
//
// Example of a full output, where only {"data":{"ProcessDefinitions":[]}} is of
// interest:
//
//	  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
//	                                 Dload  Upload   Total   Spent    Left  Speed
//	  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0{"data":{"ProcessDefinitions":[]}}
//	100    85  100    34  100    51   8500  12750 --:--:-- --:--:-- --:--:-- 21250
func extractJSONResponse(terminalOutput string) string {
	start := strings.Index(terminalOutput, "{")
	end := strings.LastIndex(terminalOutput, "}")
	// Index/LastIndex return -1 when the brace is absent; "end < start" also
	// covers an output whose only "}" comes before the first "{".
	if start < 0 || end < start {
		return ""
	}
	return terminalOutput[start : end+1]
}
+
+// verifyWorkflowDefinitionIsInStatus returns true if the workflow definition 
has the status == expectedStatus in the
+// target data-index, false in any other case.
+func verifyWorkflowDefinitionIsInStatus(podName string, containerName string, 
namespace, dataIndexServiceName string, workflowId string, expectedStatus 
string) bool {
+       status, ok, err := getWorkflowDefinitionStatus(podName, containerName, 
namespace, dataIndexServiceName, workflowId)
+       if err != nil {
+               GinkgoWriter.Println(fmt.Errorf("failed to verify workflow 
definition status for workflow: %s : %v", workflowId, err))
+       }
+       return ok && status == expectedStatus
+}
+
+// getWorkflowDefinitionStatus returns the "status" of a workflow definition, 
true if the workflow definition was found
+// for that workflowId, false if not found. Returned value is read as follows:
+// true, ""            : the "status" was not set in the workflow definition 
metadata. (not yet populated)
+// true, "available"   : workflow definition is available.
+// true, "unavailable" : workflow definition is not available.
+// false, ""           : workflow definition was not found.
+func getWorkflowDefinitionStatus(podName string, containerName string, 
namespace string, dataIndexServiceName string, workflowId string) (string, 
bool, error) {
+
+       // DI query example like executed in bash terminal: {"query" : "{ 
ProcessDefinitions ( where:{id: {equal: \"greet\"}} ) { metadata } }"  }
+       // WF not found query result example: {"data":{"ProcessDefinitions":[]}}
+       // WF found query result example: 
{"data":{"ProcessDefinitions":[{"metadata":{"status":"available","Variable":"workflowdata","Description":"YAML
 based greeting workflow"}}]}}
+
+       query := fmt.Sprintf("{\"query\" : \"{ ProcessDefinitions (where:{id: 
{equal: \\\"%s\\\"}} ) { metadata } }\"  }", workflowId)
+       curlCmd := fmt.Sprintf("curl -H 'Content-Type: application/json' -H 
'Accept: application/json' -X POST --data '%s' http://%s/graphql";, query, 
dataIndexServiceName)
+       fmt.Printf("querying workflow definition metadata for workflowId: %s, 
curl: %s\n", workflowId, curlCmd)
+
+       // execute with bash command to ensure the json response is not clipped 
from the terminal output
+       cmd := exec.Command("kubectl", "exec", podName, "-c", containerName, 
"-n", namespace, "--", "/bin/bash", "-c", curlCmd)
+       output, err := utils.Run(cmd)
+       if err != nil {
+               return "", false, fmt.Errorf("failed to execute query against 
data-index service: %s, from pod: %s, containter: %s, output: %s, error: %v", 
dataIndexServiceName, podName, containerName, output, err)
+       }
+       stringOutput := string(output)
+       jsonOutput := extractJSONResponse(stringOutput)
+       fmt.Printf("query result: %s\n", jsonOutput)
+
+       queryResult := make(map[string]interface{})
+       err = json.Unmarshal([]byte(jsonOutput), &queryResult)
+       if err != nil {
+               return "", false, fmt.Errorf("failed to parse data-index query 
result from query against data-index service: %s, from pod: %s, container: %s, 
queryResult: %s, error: %v", dataIndexServiceName, podName, containerName, 
queryResult, err)
+       }
+       rawData, ok := queryResult["data"]
+       if !ok {
+               // the "data" field must be present, if not, an error was 
produced in the DI, e.g., the query was formulated wrong.
+               return "", false, fmt.Errorf("failed to execute data-index 
query against data-index service: %s, from pod: %s, container: %s. It looks 
like the query was formulated wrong, query: %s, queryResult: %s", 
dataIndexServiceName, podName, containerName, query, queryResult)
+       }
+       data := rawData.(map[string]interface{})
+       definitions := data["ProcessDefinitions"].([]interface{})
+       if len(definitions) == 0 {
+               // workflow definition not found
+               return "", false, nil
+       }
+       definition := definitions[0].(map[string]interface{})
+       rawMetadata, ok := definition["metadata"]
+       if !ok {
+               return "", false, nil
+       }
+       metadata := rawMetadata.(map[string]interface{})
+       available, ok := metadata["status"]
+       if !ok {
+               return "", false, nil
+       }
+       return available.(string), true, nil
+}
diff --git a/packages/sonataflow-operator/test/e2e/platform_test.go 
b/packages/sonataflow-operator/test/e2e/platform_test.go
index 671a0705623..b72dc5116b5 100644
--- a/packages/sonataflow-operator/test/e2e/platform_test.go
+++ b/packages/sonataflow-operator/test/e2e/platform_test.go
@@ -187,6 +187,29 @@ var _ = Describe("Platform Use Cases :: ", 
Label("platform"), Ordered, func() {
                                        return 
verifyWorkflowIsInRunningState(sf, targetNamespace)
                                }, 10*time.Minute, 
5*time.Second).Should(BeTrue())
                        }
+
+                       if profile != metadata.DevProfile {
+                               By("Verify that the workflow definition is 
available")
+                               cmd = exec.Command("kubectl", "get", "pod", 
"-l", "app.kubernetes.io/name in (data-index-service)", "-n", targetNamespace, 
"-ojsonpath={.items[*].metadata.name}")
+                               output, err = utils.Run(cmd)
+                               Expect(err).NotTo(HaveOccurred())
+                               dataIndexPod := string(output)
+
+                               EventuallyWithOffset(1, func() bool {
+                                       return 
verifyWorkflowDefinitionIsInStatus(dataIndexPod, "data-index-service", 
targetNamespace, "sonataflow-platform-data-index-service", 
"callbackstatetimeouts", "available")
+                               }, 10*time.Minute, 
5*time.Second).Should(BeTrue())
+
+                               By("Undeploy the SonataFlow CR")
+                               cmd = exec.Command("kubectl", "delete", "-n", 
targetNamespace, "-f", filepath.Join(projectDir,
+                                       testcaseDir, profile.String(), 
persistenceType, "sonataflow"))
+                               manifests, err = utils.Run(cmd)
+                               Expect(err).NotTo(HaveOccurred())
+
+                               By("Verify that the workflow definition is 
unavailable")
+                               EventuallyWithOffset(1, func() bool {
+                                       return 
verifyWorkflowDefinitionIsInStatus(dataIndexPod, "data-index-service", 
targetNamespace, "sonataflow-platform-data-index-service", 
"callbackstatetimeouts", "unavailable")
+                               }, 10*time.Minute, 
5*time.Second).Should(BeTrue())
+                       }
                },
                        Entry("with both Job Service and Data Index and 
ephemeral persistence and the workflow in a dev profile", 
test.GetPathFromE2EDirectory("platform", "services"), metadata.DevProfile, 
ephemeral),
                        Entry("with both Job Service and Data Index and 
ephemeral persistence and the workflow in a gitops profile", 
test.GetPathFromE2EDirectory("platform", "services"), metadata.GitOpsProfile, 
ephemeral),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to