Add new internal static library libthread.a that provides infrastructure
for eu-* tools to run functions concurrently using pthreads.

threadlib.c manages per-job threads as well as per-job buffers for stdout
output.  Output for each job is printed to stdout in the order that the
jobs were added to the job queue.  This helps preserve the order of
output when parallelization is added to an eu-* tool.

threadlib.h declares functions add_job and run_jobs.  Jobs are added to
a job queue internal to threadlib.c using add_job.  run_jobs executes
the queued jobs in parallel.

eu-readelf now links against libthread.a when elfutils is configured
with --enable-thread-safety.

        * src/Makefile.am: libthread.a is compiled and linked with
        readelf when USE_LOCKS is defined.
        * src/threadlib.c: New file. Manages job creation, concurrent
        execution and output handling.
        * src/threadlib.h: New file. Declares functions add_job and
        run_jobs.

Signed-off-by: Aaron Merey <ame...@redhat.com>

---
v3:
Document that add_job cannot be called during run_jobs.

Add noinst_HEADERS and EXTRA_libthread_a_DEPENDENCIES to src/Makefile.am

Replace a printf call with fwrite.

Add a comment clarifying the use of NULL in the job queue.

The main thread now waits on a condition variable after creating worker
threads.  A worker thread will signal this condition variable when a job
completes.

> On Thu, May 22, 2025 at 06:28:52PM -0400, Aaron Merey wrote:
> > +typedef enum {
> > +  /* pthread_create has not been called.  */
> > +  NOT_STARTED,
> > +
> > +  /* pthread_create has been called.  */
> > +  STARTED,
> > +
> > +  /* The thread has finished running the job but has not been joined.  */
> > +  DONE,
> > +
> > +  /* pthread_join has been called.  */
> > +  JOINED
> > +} thread_state_t;
>
> Do we have to think about the state of other possibly failed jobs?
> Or does a failed job just call exit and that terminates everything?

Yes, exit will terminate the whole program in this case.

 src/Makefile.am |  16 +++
 src/threadlib.c | 275 ++++++++++++++++++++++++++++++++++++++++++++++++
 src/threadlib.h |  36 +++++++
 3 files changed, 327 insertions(+)
 create mode 100644 src/threadlib.c
 create mode 100644 src/threadlib.h

diff --git a/src/Makefile.am b/src/Makefile.am
index 4a3fb957..f041d458 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -52,6 +52,19 @@ libar.manifest: $(libar_a_OBJECTS)
 MOSTLYCLEANFILES = *.gconv
 CLEANFILES = $(bin_SCRIPTS) $(EXTRA_libar_a_DEPENDENCIES)
 
+if USE_LOCKS
+noinst_LIBRARIES += libthread.a
+noinst_HEADERS = threadlib.h
+libthread_a_SOURCES = threadlib.c
+EXTRA_DIST += threadlib.h
+EXTRA_libthread_a_DEPENDENCIES = libthread.manifest
+
+libthread.manifest: $(libthread_a_OBJECTS)
+       $(AM_V_GEN)echo $^ > $@
+
+CLEANFILES += $(EXTRA_libthread_a_DEPENDENCIES)
+endif
+
 if BUILD_STATIC
 libasm = ../libasm/libasm.a
 libdw = ../libdw/libdw.a -lz $(zip_LIBS) $(libelf) -ldl -lpthread
@@ -91,6 +104,9 @@ ar_no_Wstack_usage = yes
 unstrip_no_Wstack_usage = yes
 
 readelf_LDADD = $(libdw) $(libebl) $(libelf) $(libeu) $(argp_LDADD)
+if USE_LOCKS
+readelf_LDADD += libthread.a
+endif
 nm_LDADD = $(libdw) $(libebl) $(libelf) $(libeu) $(argp_LDADD) $(obstack_LIBS) \
           $(demanglelib)
 size_LDADD = $(libelf) $(libeu) $(argp_LDADD)
diff --git a/src/threadlib.c b/src/threadlib.c
new file mode 100644
index 00000000..cf89c2e3
--- /dev/null
+++ b/src/threadlib.c
@@ -0,0 +1,275 @@
+/* Functions for running jobs concurrently.
+   Copyright (C) 2025 Red Hat, Inc.
+   This file is part of elfutils.
+
+   This file is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   elfutils is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */
+
+#ifdef HAVE_CONFIG_H
+# include <config.h>
+#endif
+
+#include <error.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdatomic.h>
+
+#include "threadlib.h"
+
+/* Dynamic buffer for thread output, backed by open_memstream.  */
+typedef struct {
+  size_t sizeloc;  /* Size location passed to open_memstream.  */
+  char *buf;       /* Buffer location passed to open_memstream.  */
+  FILE *file;      /* Stream that the job's start_routine writes to.  */
+} output_stream_t;
+
+/* Open STREAM as an in-memory stream.  Exits if it cannot be opened.  */
+static void
+init_thread_output_stream (output_stream_t *stream)
+{
+  stream->buf = NULL;
+  stream->sizeloc = 0;
+  stream->file = open_memstream (&(stream->buf), &(stream->sizeloc));
+
+  if (stream->file == NULL)
+    error (1, 0, _("cannot open thread output stream"));
+}
+
+/* Close STREAM, write its contents to stdout and free its buffer.  */
+static void
+print_thread_output_stream (output_stream_t *stream)
+{
+  /* fclose may update stream->buf, so close before reading the buffer.  */
+  if (fclose (stream->file) != 0)
+    error (1, 0, _("cannot close thread output stream"));
+
+  fwrite (stream->buf, 1, stream->sizeloc, stdout);  /* Result unchecked.  */
+  free (stream->buf);
+}
+
+typedef enum {
+  /* Set by add_job: pthread_create has not been called.  */
+  NOT_STARTED,
+
+  /* Set by run_jobs just before it calls pthread_create.  */
+  STARTED,
+
+  /* Set by the worker: the job has finished but is not yet joined.  */
+  DONE,
+
+  /* Set by run_jobs once pthread_join has returned.  */
+  JOINED
+} thread_state_t;
+
+struct job_t {
+  /* A job consists of calling this function then printing any output
+     to stdout.  This function is run from thread_start_job, which also
+     initializes the FILE * that is passed as its second argument.  */
+  void *(*start_routine)(void *, FILE *);
+
+  /* Arg passed to start_routine.  */
+  void *arg;
+
+  /* Thread to run start_routine, created by run_jobs.  */
+  pthread_t thread;
+
+  /* See thread_state_t.  Written by add_job, run_jobs and the worker.  */
+  _Atomic thread_state_t state;
+
+  /* Dynamic buffer for output generated during start_routine.
+     Contents get printed to stdout by run_jobs after the join.  */
+  output_stream_t stream;
+
+  /* Next job in the linked list, or NULL for the last job.  */
+  struct job_t *next;
+};
+
+typedef struct {
+  struct job_t *head;  /* First job in the list, or NULL.  */
+  struct job_t *tail;  /* Last job in the list, or NULL.  */
+} job_queue_t;
+
+/* If HEAD is NULL, no jobs are currently in the queue.  While jobs are
+   being added with add_job, HEAD points to the first job that was added
+   and TAIL points to the most recently added job.  run_jobs clears TAIL
+   before starting any job and returns once HEAD is NULL as well.  */
+static job_queue_t jobs = { NULL, NULL };
+
+void
+add_job (void *(*start_routine)(void *, FILE *), void *arg)
+{
+  struct job_t *job = malloc (sizeof (struct job_t));  /* run_jobs frees it.  */
+
+  if (job == NULL)
+    error (1, 0, _("cannot create job"));
+
+  job->start_routine = start_routine;
+  job->arg = arg;
+  job->next = NULL;
+  atomic_store (&job->state, NOT_STARTED);  /* thread, stream: set later.  */
+
+  /* Append job to the end of the job queue.  No lock is taken here.  */
+  if (jobs.head == NULL)
+    {
+      assert (jobs.tail == NULL);
+      jobs.head = job;
+      jobs.tail = job;
+    }
+  else
+    {
+      assert (jobs.tail != NULL);
+      jobs.tail->next = job;
+      jobs.tail = job;
+    }
+}
+
+static pthread_mutex_t job_mutex = PTHREAD_MUTEX_INITIALIZER; /* For job_cv.  */
+static pthread_cond_t job_cv = PTHREAD_COND_INITIALIZER; /* A job is DONE.  */
+
+/* Thread entry point: runs the job ARG (a struct job_t *); returns NULL.  */
+static void *
+thread_start_job (void *arg)
+{
+  struct job_t *job = (struct job_t *) arg;
+
+  init_thread_output_stream (&job->stream);
+  job->start_routine (job->arg, job->stream.file);
+
+  /* Mark the job DONE and wake up the main thread waiting in run_jobs.  */
+  pthread_mutex_lock(&job_mutex);
+  atomic_store (&job->state, DONE);
+  pthread_cond_signal (&job_cv);
+  pthread_mutex_unlock (&job_mutex);
+
+  return NULL;
+}
+
+/* Run all jobs queued by add_job; MAX_THREADS below 1 is treated as 1.  */
+void
+run_jobs (int max_threads)
+{
+  if (jobs.head == NULL)
+    {
+      assert (jobs.tail == NULL);
+      return;
+    }
+  assert (jobs.tail != NULL);
+  /* With fewer than one thread no job could start and we'd wait forever.  */
+  max_threads = max_threads > 0 ? max_threads : 1;
+  jobs.tail = NULL;  /* Only needed to facilitate adding jobs.  */
+  int num_threads = 0;
+
+  /* Start no more than MAX_THREADS jobs.  */
+  for (struct job_t *job = jobs.head;
+       job != NULL && num_threads < max_threads;
+       job = job->next)
+    {
+      assert (job->start_routine != NULL);
+      atomic_store (&job->state, STARTED);
+
+      if (pthread_create (&job->thread, NULL,
+                         thread_start_job, (void *) job) != 0)
+       error (1, 0, _("cannot create thread"));
+      num_threads++;
+    }
+
+  int available_threads = max_threads - num_threads;
+  assert (available_threads >= 0);
+
+  pthread_mutex_lock (&job_mutex);
+
+  /* Scan the queue, holding job_mutex except while waiting on job_cv,
+     until every job has completed and its output has been printed.  */
+  while (jobs.head != NULL)
+    {
+      /* Job output should be printed in the same order that the jobs
+        were added.  Track whether there is at least one previous job
+         whose output hasn't been printed yet.  If true, then defer
+         printing for the current job.  */
+      bool wait_to_print = false;
+      bool made_progress = false;
+
+      struct job_t *job = jobs.head;
+      struct job_t *prev = NULL;
+      while (job != NULL)
+       {
+          /* Check whether this job should be started.  */
+         if (atomic_load (&job->state) == NOT_STARTED)
+           {
+             /* Start this job if there is an available thread.  */
+             if (available_threads > 0)
+               {
+                 atomic_store (&job->state, STARTED);
+                 if (pthread_create (&job->thread, NULL,
+                                     thread_start_job, (void *) job) != 0)
+                   error (1, 0, _("cannot create thread"));
+
+                 available_threads--;
+                 made_progress = true;
+               }
+           }
+
+         /* Join thread if we haven't done so already.  */
+         if (atomic_load (&job->state) == DONE)
+           {
+             if (pthread_join (job->thread, NULL) != 0)
+               error (1, 0, _("cannot join thread"));
+
+             atomic_store (&job->state, JOINED);
+             available_threads++;
+             made_progress = true;
+           }
+
+         /* Print job output if it hasn't already been printed and
+            there is no unprinted output from a previous job.
+
+            Once a job's output has been printed all resources for
+            the job can be freed and it can be removed from the
+            job queue.  */
+         if (atomic_load (&job->state) == JOINED && !wait_to_print)
+           {
+             print_thread_output_stream (&job->stream);
+
+             /* Remove this job from the queue.  */
+             if (prev == NULL)
+               {
+                 /* This job is at the beginning of the queue.  */
+                 jobs.head = job->next;
+
+                 free (job);
+                 job = jobs.head;
+               }
+             else
+               {
+                 prev->next = job->next;
+
+                 free (job);
+                 job = prev->next;
+               }
+
+             made_progress = true;
+             continue;
+           }
+
+         prev = job;
+         job = job->next;
+         wait_to_print = true;
+       }
+
+      if (!made_progress && jobs.head != NULL)  /* Sleep until a job is DONE.  */
+        pthread_cond_wait (&job_cv, &job_mutex);
+    }
+
+  pthread_mutex_unlock (&job_mutex);
+}
diff --git a/src/threadlib.h b/src/threadlib.h
new file mode 100644
index 00000000..6cbbc35d
--- /dev/null
+++ b/src/threadlib.h
@@ -0,0 +1,36 @@
+/* Copyright (C) 2025 Red Hat, Inc.
+   This file is part of elfutils.
+
+   This file is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   elfutils is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */
+
+#ifndef _THREADLIB_H
+#define _THREADLIB_H   1
+
+/* Add a job to the job queue.  When the job is run using run_jobs, it will
+   consist of start_routine called with ARG as well as a FILE *.  The
+   contents of the FILE will be printed to stdout once start_routine
+   finishes.  */
+extern void add_job (void *(*start_routine)(void *, FILE *), void *arg);
+
+/* Run all jobs that have been added by add_job.  Jobs run in parallel
+   using at most MAX_THREADS threads.
+
+   run_jobs returns when all jobs have finished and any output from the
+   jobs has been printed to stdout.  Output from each job is printed in
+   the order in which jobs were added using add_job.
+
+   While run_jobs executes, new jobs must not be added with add_job.  */
+extern void run_jobs (int max_threads);
+
+#endif  /* threadlib.h */
-- 
2.49.0

Reply via email to