Module: Mesa
Branch: main
Commit: 969ede45210ad9134236681a2965e28f51b83825
URL:    http://cgit.freedesktop.org/mesa/mesa/commit/?id=969ede45210ad9134236681a2965e28f51b83825

Author: Guilherme Gallo <[email protected]>
Date:   Tue Oct 24 00:33:31 2023 -0300

ci/bin: Refactor create_job_needs_dag

The function is getting too big. Add comments and a docstring to the
most important function, introduce new type hints, and extract methods
from it to make it easier to read.

Signed-off-by: Guilherme Gallo <[email protected]>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/25858>

---

 bin/ci/gitlab_gql.py | 108 ++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 72 insertions(+), 36 deletions(-)

diff --git a/bin/ci/gitlab_gql.py b/bin/ci/gitlab_gql.py
index b03309da652..a5fd27efa8d 100755
--- a/bin/ci/gitlab_gql.py
+++ b/bin/ci/gitlab_gql.py
@@ -18,6 +18,7 @@ from gql.transport.aiohttp import AIOHTTPTransport
 from graphql import DocumentNode
 
 Dag = dict[str, set[str]]
+StageSeq = OrderedDict[str, set[str]]
 TOKEN_DIR = Path(getenv("XDG_CONFIG_HOME") or Path.home() / ".config")
 
 
@@ -83,57 +84,92 @@ class GitlabGQL:
         self.query._db.clear()
 
 
-def create_job_needs_dag(
-    gl_gql: GitlabGQL, params
-) -> tuple[Dag, dict[str, dict[str, Any]]]:
-    result = gl_gql.query("pipeline_details.gql", params)
-    incomplete_dag = defaultdict(set)
-    jobs = {}
-    pipeline = result["project"]["pipeline"]
-    if not pipeline:
-        raise RuntimeError(f"Could not find any pipelines for {params}")
-
-    # Record the stage sequence to post process deps that are not based on needs
-    # field, for example: sanity job
-    stage_sequence: OrderedDict[str, set[str]] = OrderedDict()
-    for stage in pipeline["stages"]["nodes"]:
-        stage_jobs: set[str] = set()
-        for stage_job in stage["groups"]["nodes"]:
-            for job in stage_job["jobs"]["nodes"]:
-                stage_jobs.add(job["name"])
-                needs = job.pop("needs")["nodes"]
-                jobs[job["name"]] = job
-                incomplete_dag[job["name"]] = {node["name"] for node in needs}
-                # ensure that all needed nodes its in the graph
-                [incomplete_dag[node["name"]] for node in needs]
-        stage_sequence[stage["name"]] = stage_jobs
-
+def insert_early_stage_jobs(dag: Dag, stage_sequence: StageSeq, jobs_metadata: dict) -> Dag:
     pre_processed_dag: Dag = {}
-    final_dag: Dag = {}
-    for job, needs in incomplete_dag.items():
+    jobs_from_early_stages = list(accumulate(stage_sequence.values(), set.union))
+    for job_name, needs in dag.items():
         final_needs: set = deepcopy(needs)
         # Pre-process jobs that are not based on needs field
         # e.g. sanity job in mesa MR pipelines
         if not final_needs:
-            for stage_index, stage_jobs in enumerate(stage_sequence.values()):
-                if job in stage_jobs:
-                    break
+            job_stage = jobs_metadata[job_name]["stage"]["name"]
+            stage_index = list(stage_sequence.keys()).index(job_stage)
+            if stage_index > 0:
+                final_needs |= jobs_from_early_stages[stage_index - 1]
+        pre_processed_dag[job_name] = final_needs
 
-            for prev_stage, prev_stage_jobs in list(stage_sequence.items())[:stage_index]:
-                final_needs |= prev_stage_jobs
-        pre_processed_dag[job] = final_needs
+    return pre_processed_dag
 
-    for job, needs in pre_processed_dag.items():
+
+def traverse_dag_needs(dag: Dag) -> None:
+    for job, needs in dag.items():
         final_needs: set = deepcopy(needs)
         # Post process jobs that are based on needs field
         partial = True
 
         while partial:
-            next_depth = {n for dn in final_needs for n in pre_processed_dag[dn]}
+            next_depth = {n for dn in final_needs for n in dag[dn]}
             partial = not final_needs.issuperset(next_depth)
             final_needs = final_needs.union(next_depth)
 
-        final_dag[job] = final_needs
+        dag[job] = final_needs
+
+
+def extract_stages_and_job_needs(pipeline_result: dict[str, Any]) -> tuple[Dag, StageSeq, dict]:
+    incomplete_dag = defaultdict(set)
+    jobs_metadata = {}
+    # Record the stage sequence to post process deps that are not based on needs
+    # field, for example: sanity job
+    stage_sequence: OrderedDict[str, set[str]] = OrderedDict()
+    for stage in pipeline_result["stages"]["nodes"]:
+        stage_jobs: set[str] = set()
+        for stage_job in stage["groups"]["nodes"]:
+            for job in stage_job["jobs"]["nodes"]:
+                stage_jobs.add(job["name"])
+                needs = job.pop("needs")["nodes"]
+                jobs_metadata[job["name"]] = job
+                incomplete_dag[job["name"]] = {node["name"] for node in needs}
+                # ensure that all needed nodes are in the graph
+                [incomplete_dag[node["name"]] for node in needs]
+        stage_sequence[stage["name"]] = stage_jobs
+
+    return incomplete_dag, stage_sequence, jobs_metadata
+
+
+def create_job_needs_dag(gl_gql: GitlabGQL, params) -> tuple[Dag, dict[str, dict[str, Any]]]:
+    """
+    Retrieve pipeline details from GitLab, extract stages and job needs,
+    insert early stage jobs, and return the final DAG and the job dictionary.
+
+    Args:
+        gl_gql (GitlabGQL): An instance of the `GitlabGQL` class, used to make
+            GraphQL queries to the GitLab API.
+        params: A dictionary with the parameters for the GraphQL query. It
+            specifies the pipeline for which the job needs DAG is being
+            created; the exact keys and values depend on the GraphQL query
+            being executed.
+
+    Returns:
+        A tuple containing two elements.
+        The first element is the final DAG (Directed Acyclic Graph)
+        representing the stages and job dependencies.
+        The second element is a dictionary with information about the jobs in
+        the DAG, where the keys are job names and the values are dictionaries
+        containing additional job information.
+    """
+    result = gl_gql.query("pipeline_details.gql", params)
+    pipeline = result["project"]["pipeline"]
+    if not pipeline:
+        raise RuntimeError(f"Could not find any pipelines for {params}")
+
+    incomplete_dag, stage_sequence, jobs_metadata = extract_stages_and_job_needs(pipeline)
+    # Fill the DAG with the job needs from stages that don't have any needs but still need to wait
+    # for previous stages
+    final_dag = insert_early_stage_jobs(incomplete_dag, stage_sequence, jobs_metadata)
+    # Now that each job has its direct needs filled correctly, update the "needs" field for each job
+    # in the DAG by expanding it into its transitive closure
+    traverse_dag_needs(final_dag)
 
     return final_dag, jobs_metadata
 
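A note on the list comprehension kept in extract_stages_and_job_needs: it
looks like a no-op, but it relies on defaultdict inserting a default value on
lookup, so every needed job gets a node in the graph even if it has no entry
of its own yet. A minimal standalone sketch, with made-up job names:

    from collections import defaultdict

    # Hypothetical two-job graph; "build-x86" has no entry of its own yet.
    incomplete_dag: dict[str, set[str]] = defaultdict(set)
    incomplete_dag["test-gl"] = {"build-x86"}

    # Merely indexing the defaultdict creates the missing node, which is what
    # `[incomplete_dag[node["name"]] for node in needs]` does for every need.
    incomplete_dag["build-x86"]

    assert incomplete_dag["build-x86"] == set()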
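insert_early_stage_jobs uses itertools.accumulate with set.union to build,
for each stage, the union of all job sets up to and including that stage. A
sketch of that call under assumed stage and job names:

    from collections import OrderedDict
    from itertools import accumulate

    stage_sequence = OrderedDict([
        ("sanity", {"sanity"}),
        ("build", {"build-x86", "build-arm"}),
        ("test", {"test-gl"}),
    ])

    jobs_from_early_stages = list(accumulate(stage_sequence.values(), set.union))
    # [{"sanity"},
    #  {"sanity", "build-x86", "build-arm"},
    #  {"sanity", "build-x86", "build-arm", "test-gl"}]

    # A job with an empty "needs" list sitting in stage index i (i > 0) is
    # given jobs_from_early_stages[i - 1]: every job from all earlier stages.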
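traverse_dag_needs expands each job's direct needs into its transitive needs
by iterating until a pass discovers nothing new, i.e. a fixpoint. The same
loop run standalone on a hypothetical three-job chain:

    from copy import deepcopy

    dag = {
        "test-gl": {"build-x86"},
        "build-x86": {"sanity"},
        "sanity": set(),
    }

    # Mirrors the body of traverse_dag_needs(); mutating values in place is
    # safe because transitive closure only ever grows the sets.
    for job, needs in dag.items():
        final_needs = deepcopy(needs)
        partial = True
        while partial:
            # One more level of depth: the needs of the current needs.
            next_depth = {n for dn in final_needs for n in dag[dn]}
            # Stop once a pass adds nothing new (the fixpoint is reached).
            partial = not final_needs.issuperset(next_depth)
            final_needs |= next_depth
        dag[job] = final_needs

    assert dag["test-gl"] == {"build-x86", "sanity"}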
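For completeness, a hypothetical way to drive the refactored entry point. The
zero-argument constructor call and the parameter keys below are assumptions
for illustration only; the real keys must match what pipeline_details.gql
expects:

    # Assumption: GitlabGQL is constructible with defaults once a token exists
    # under TOKEN_DIR; "projectPath" and "iid" are made-up parameter names.
    gl_gql = GitlabGQL()
    dag, jobs_metadata = create_job_needs_dag(
        gl_gql, {"projectPath": "mesa/mesa", "iid": "12345"}
    )
    # dag maps each job name to the full (transitive) set of jobs it waits on.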
