The entity queue is per repository. It is a queue of files that depend on that repository and need to wait until the repository has finished its sync. There is no benefit to a global queue.
In my opinion this is more understandable. -- :wq Claudio Index: main.c =================================================================== RCS file: /cvs/src/usr.sbin/rpki-client/main.c,v retrieving revision 1.115 diff -u -p -r1.115 main.c --- main.c 10 Mar 2021 08:09:41 -0000 1.115 +++ main.c 10 Mar 2021 08:20:00 -0000 @@ -57,6 +57,7 @@ struct repo { char *local; /* local path name */ char *temp; /* temporary file / dir */ char *uris[REPO_MAX_URI]; /* URIs to fetch from */ + struct entityq queue; /* files waiting for this repo */ size_t id; /* identifier (array index) */ int uriidx; /* which URI is fetched */ int loaded; /* whether loaded or not */ @@ -208,15 +209,13 @@ entity_write_req(const struct entity *en * repo, then flush those into the parser process. */ static void -entityq_flush(struct entityq *q, const struct repo *repo) +entityq_flush(struct repo *repo) { struct entity *p, *np; - TAILQ_FOREACH_SAFE(p, q, entries, np) { - if (p->repo < 0 || repo->id != (size_t)p->repo) - continue; + TAILQ_FOREACH_SAFE(p, &repo->queue, entries, np) { entity_write_req(p); - TAILQ_REMOVE(q, p, entries); + TAILQ_REMOVE(&repo->queue, p, entries); entity_free(p); } } @@ -225,9 +224,8 @@ entityq_flush(struct entityq *q, const s * Add the heap-allocated file to the queue for processing. 
*/ static void -entityq_add(struct entityq *q, char *file, enum rtype type, - const struct repo *rp, const unsigned char *pkey, size_t pkeysz, - char *descr) +entityq_add(char *file, enum rtype type, struct repo *rp, + const unsigned char *pkey, size_t pkeysz, char *descr) { struct entity *p; @@ -261,7 +259,7 @@ entityq_add(struct entityq *q, char *fil entity_write_req(p); entity_free(p); } else - TAILQ_INSERT_TAIL(q, p, entries); + TAILQ_INSERT_TAIL(&rp->queue, p, entries); } /* @@ -279,6 +277,7 @@ repo_alloc(void) rp = &rt.repos[rt.reposz++]; rp->id = rt.reposz - 1; + TAILQ_INIT(&rp->queue); return rp; } @@ -390,7 +389,7 @@ repo_fetch(struct repo *rp) /* * Look up a trust anchor, queueing it for download if not found. */ -static const struct repo * +static struct repo * ta_lookup(const struct tal *tal) { struct repo *rp; @@ -424,7 +423,7 @@ ta_lookup(const struct tal *tal) /* * Look up a repository, queueing it for discovery if not found. */ -static const struct repo * +static struct repo * repo_lookup(const char *uri) { char *local, *repo; @@ -494,8 +493,7 @@ repo_filename(const struct repo *repo, c * These are always relative to the directory in which "mft" sits. */ static void -queue_add_from_mft(struct entityq *q, const char *mft, - const struct mftfile *file, enum rtype type) +queue_add_from_mft(const char *mft, const struct mftfile *file, enum rtype type) { char *cp, *nfile; @@ -511,7 +509,7 @@ queue_add_from_mft(struct entityq *q, co * that the repository has already been loaded. */ - entityq_add(q, nfile, type, NULL, NULL, 0, NULL); + entityq_add(nfile, type, NULL, NULL, 0, NULL); } /* @@ -523,7 +521,7 @@ queue_add_from_mft(struct entityq *q, co * check the suffix anyway). 
*/ static void -queue_add_from_mft_set(struct entityq *q, const struct mft *mft) +queue_add_from_mft_set(const struct mft *mft) { size_t i, sz; const struct mftfile *f; @@ -534,7 +532,7 @@ queue_add_from_mft_set(struct entityq *q assert(sz > 4); if (strcasecmp(f->file + sz - 4, ".crl") != 0) continue; - queue_add_from_mft(q, mft->file, f, RTYPE_CRL); + queue_add_from_mft(mft->file, f, RTYPE_CRL); } for (i = 0; i < mft->filesz; i++) { @@ -544,11 +542,11 @@ queue_add_from_mft_set(struct entityq *q if (strcasecmp(f->file + sz - 4, ".crl") == 0) continue; else if (strcasecmp(f->file + sz - 4, ".cer") == 0) - queue_add_from_mft(q, mft->file, f, RTYPE_CER); + queue_add_from_mft(mft->file, f, RTYPE_CER); else if (strcasecmp(f->file + sz - 4, ".roa") == 0) - queue_add_from_mft(q, mft->file, f, RTYPE_ROA); + queue_add_from_mft(mft->file, f, RTYPE_ROA); else if (strcasecmp(f->file + sz - 4, ".gbr") == 0) - queue_add_from_mft(q, mft->file, f, RTYPE_GBR); + queue_add_from_mft(mft->file, f, RTYPE_GBR); else logx("%s: unsupported file type: %s", mft->file, f->file); @@ -559,7 +557,7 @@ queue_add_from_mft_set(struct entityq *q * Add a local TAL file (RFC 7730) to the queue of files to fetch. */ static void -queue_add_tal(struct entityq *q, const char *file) +queue_add_tal(const char *file) { char *nfile, *buf; @@ -580,7 +578,7 @@ queue_add_tal(struct entityq *q, const c } /* Not in a repository, so directly add to queue. */ - entityq_add(q, nfile, RTYPE_TAL, NULL, NULL, 0, buf); + entityq_add(nfile, RTYPE_TAL, NULL, NULL, 0, buf); /* entityq_add makes a copy of buf */ free(buf); } @@ -589,10 +587,10 @@ queue_add_tal(struct entityq *q, const c * Add URIs (CER) from a TAL file, RFC 8630. 
*/ static void -queue_add_from_tal(struct entityq *q, const struct tal *tal) +queue_add_from_tal(const struct tal *tal) { - char *nfile; - const struct repo *repo; + char *nfile; + struct repo *repo; assert(tal->urisz); @@ -600,7 +598,7 @@ queue_add_from_tal(struct entityq *q, co repo = ta_lookup(tal); nfile = ta_filename(repo, 0); - entityq_add(q, nfile, RTYPE_CER, repo, tal->pkey, + entityq_add(nfile, RTYPE_CER, repo, tal->pkey, tal->pkeysz, tal->descr); } @@ -608,10 +606,10 @@ queue_add_from_tal(struct entityq *q, co * Add a manifest (MFT) found in an X509 certificate, RFC 6487. */ static void -queue_add_from_cert(struct entityq *q, const struct cert *cert) +queue_add_from_cert(const struct cert *cert) { - const struct repo *repo; - char *nfile; + struct repo *repo; + char *nfile; repo = repo_lookup(cert->mft); if (repo == NULL) /* bad repository URI */ @@ -619,7 +617,7 @@ queue_add_from_cert(struct entityq *q, c nfile = repo_filename(repo, cert->mft); - entityq_add(q, nfile, RTYPE_MFT, repo, NULL, 0, NULL); + entityq_add(nfile, RTYPE_MFT, repo, NULL, 0, NULL); } /* @@ -629,8 +627,7 @@ queue_add_from_cert(struct entityq *q, c * In all cases, we gather statistics. */ static void -entity_process(int proc, struct stats *st, struct entityq *q, - struct vrp_tree *tree) +entity_process(int proc, struct stats *st, struct vrp_tree *tree) { enum rtype type; struct tal *tal; @@ -651,7 +648,7 @@ entity_process(int proc, struct stats *s case RTYPE_TAL: st->tals++; tal = tal_read(proc); - queue_add_from_tal(q, tal); + queue_add_from_tal(tal); tal_free(tal); break; case RTYPE_CER: @@ -669,7 +666,7 @@ entity_process(int proc, struct stats *s * we're revoked and then we don't want to * process the MFT. 
*/ - queue_add_from_cert(q, cert); + queue_add_from_cert(cert); } else st->certs_invalid++; cert_free(cert); @@ -684,7 +681,7 @@ entity_process(int proc, struct stats *s mft = mft_read(proc); if (mft->stale) st->mfts_stale++; - queue_add_from_mft_set(q, mft); + queue_add_from_mft_set(mft); mft_free(mft); break; case RTYPE_CRL: @@ -838,7 +835,6 @@ main(int argc, char *argv[]) size_t i, outsz = 0, talsz = 0; pid_t procpid, rsyncpid, httppid; int fd[2]; - struct entityq q; struct pollfd pfd[3]; struct roa **out = NULL; char *rsync_prog = "openrsync"; @@ -953,8 +949,6 @@ main(int argc, char *argv[]) if (talsz == 0) err(1, "no TAL files found in %s", "/etc/rpki"); - TAILQ_INIT(&q); - /* change working directory to the cache directory */ if (fchdir(cachefd) == -1) err(1, "fchdir"); @@ -1074,7 +1068,7 @@ main(int argc, char *argv[]) */ for (i = 0; i < talsz; i++) - queue_add_tal(&q, tals[i]); + queue_add_tal(tals[i]); while (entity_queue > 0 && !killme) { pfd[0].events = POLLIN; @@ -1150,7 +1144,7 @@ main(int argc, char *argv[]) "fallback to cache", rt.repos[i].local); rt.repos[i].loaded = 1; stats.repos++; - entityq_flush(&q, &rt.repos[i]); + entityq_flush(&rt.repos[i]); } if ((pfd[2].revents & POLLIN)) { @@ -1163,7 +1157,7 @@ main(int argc, char *argv[]) if (http_done(&rt.repos[i], ok)) { rt.repos[i].loaded = 1; stats.repos++; - entityq_flush(&q, &rt.repos[i]); + entityq_flush(&rt.repos[i]); } } @@ -1173,7 +1167,7 @@ main(int argc, char *argv[]) */ if ((pfd[1].revents & POLLIN)) { - entity_process(proc, &stats, &q, &v); + entity_process(proc, &stats, &v); } } @@ -1183,7 +1177,7 @@ main(int argc, char *argv[]) errx(1, "excessive runtime (%d seconds), giving up", timeout); } - assert(TAILQ_EMPTY(&q)); + assert(entity_queue == 0); logx("all files parsed: generating output"); rc = 0;