On Mon, Oct 1, 2018 at 3:46 PM Ben Peart <[email protected]> wrote:
> +/*
> + * A helper function that will load the specified range of cache entries
> + * from the memory mapped file and add them to the given index.
> + */
> +static unsigned long load_cache_entry_block(struct index_state *istate,
> + struct mem_pool *ce_mem_pool, int offset, int nr,
> const char *mmap,
Please use unsigned long for offset (here and in the thread_data
struct). We should use off_t instead, but that's out of scope. At
least keep offset type consistent in here.
> + unsigned long start_offset, const struct cache_entry
> *previous_ce)
I don't think you want to pass previous_ce in. You always pass NULL
anyway. And if this function is about loading a block (i.e. at block
boundary) then initial previous_ce _must_ be NULL or things break
horribly.
> @@ -1959,20 +2007,125 @@ static void *load_index_extensions(void *_data)
>
> #define THREAD_COST (10000)
>
> +struct load_cache_entries_thread_data
> +{
> + pthread_t pthread;
> + struct index_state *istate;
> + struct mem_pool *ce_mem_pool;
> + int offset;
> + const char *mmap;
> + struct index_entry_offset_table *ieot;
> + int ieot_offset; /* starting index into the ieot array */
If it's an index, maybe just name it ieot_index and we can get rid of
the comment.
> + int ieot_work; /* count of ieot entries to process */
Maybe instead of saving the whole "ieot" table here, add
struct index_entry_offset *blocks;
which points to the starting block for this thread and rename that
mysterious (to me) ieot_work to nr_blocks. The thread will then have
access to blocks[0] through blocks[nr_blocks - 1].
> + unsigned long consumed; /* return # of bytes in index file processed
> */
> +};
> +
> +/*
> + * A thread proc to run the load_cache_entries() computation
> + * across multiple background threads.
> + */
> +static void *load_cache_entries_thread(void *_data)
> +{
> + struct load_cache_entries_thread_data *p = _data;
> + int i;
> +
> + /* iterate across all ieot blocks assigned to this thread */
> + for (i = p->ieot_offset; i < p->ieot_offset + p->ieot_work; i++) {
> + p->consumed += load_cache_entry_block(p->istate,
> p->ce_mem_pool, p->offset, p->ieot->entries[i].nr, p->mmap,
> p->ieot->entries[i].offset, NULL);
Please wrap this long line.
> + p->offset += p->ieot->entries[i].nr;
> + }
> + return NULL;
> +}
> +
> +static unsigned long load_cache_entries_threaded(struct index_state *istate,
> const char *mmap, size_t mmap_size,
> + unsigned long src_offset, int nr_threads, struct
> index_entry_offset_table *ieot)
> +{
> + int i, offset, ieot_work, ieot_offset, err;
> + struct load_cache_entries_thread_data *data;
> + unsigned long consumed = 0;
> + int nr;
> +
> + /* a little sanity checking */
> + if (istate->name_hash_initialized)
> + BUG("the name hash isn't thread safe");
> +
> + mem_pool_init(&istate->ce_mem_pool, 0);
> + data = xcalloc(nr_threads, sizeof(struct
> load_cache_entries_thread_data));
We normally use sizeof(*data) instead of sizeof(struct ...).
> +
> + /* ensure we have no more threads than we have blocks to process */
> + if (nr_threads > ieot->nr)
> + nr_threads = ieot->nr;
> + data = xcalloc(nr_threads, sizeof(struct
> load_cache_entries_thread_data));
eh.. reallocate the same "data"? The first allocation above is leaked.
> +
> + offset = ieot_offset = 0;
> + ieot_work = DIV_ROUND_UP(ieot->nr, nr_threads);
> + for (i = 0; i < nr_threads; i++) {
> + struct load_cache_entries_thread_data *p = &data[i];
> + int j;
> +
> + if (ieot_offset + ieot_work > ieot->nr)
> + ieot_work = ieot->nr - ieot_offset;
> +
> + p->istate = istate;
> + p->offset = offset;
> + p->mmap = mmap;
> + p->ieot = ieot;
> + p->ieot_offset = ieot_offset;
> + p->ieot_work = ieot_work;
> +
> + /* create a mem_pool for each thread */
> + nr = 0;
Since nr is only used in this for loop, declare it in this scope
instead of declaring it for the whole function.
> + for (j = p->ieot_offset; j < p->ieot_offset + p->ieot_work;
> j++)
> + nr += p->ieot->entries[j].nr;
> + if (istate->version == 4) {
> + mem_pool_init(&p->ce_mem_pool,
> + estimate_cache_size_from_compressed(nr));
> + }
> + else {
> + mem_pool_init(&p->ce_mem_pool,
> + estimate_cache_size(mmap_size, nr));
> + }
Maybe keep this mem_pool_init code inside load_cache_entries_thread(),
similar to how you do it for load_all_cache_entries(). It's mostly
to keep this loop shorter to see (and understand); of course,
parallelizing this mem_pool_init() is just noise.
> +
> + err = pthread_create(&p->pthread, NULL,
> load_cache_entries_thread, p);
> + if (err)
> + die(_("unable to create load_cache_entries thread:
> %s"), strerror(err));
> +
> + /* increment by the number of cache entries in the ieot block
> being processed */
> + for (j = 0; j < ieot_work; j++)
> + offset += ieot->entries[ieot_offset + j].nr;
I wonder if it makes things simpler if you store the cache_entry _index_
in the entries[] array instead of storing the number of entries. You can
then easily calculate nr by doing entries[i].index -
entries[i-1].index. And you can count multiple blocks the same way,
without looping like this.
> + ieot_offset += ieot_work;
> + }
> +
> + for (i = 0; i < nr_threads; i++) {
> + struct load_cache_entries_thread_data *p = &data[i];
> +
> + err = pthread_join(p->pthread, NULL);
> + if (err)
> + die(_("unable to join load_cache_entries thread:
> %s"), strerror(err));
> + mem_pool_combine(istate->ce_mem_pool, p->ce_mem_pool);
> + consumed += p->consumed;
> + }
> +
> + free(data);
> +
> + return consumed;
> +}
> +#endif
> +
> /* remember to discard_cache() before reading a different cache! */
> int do_read_index(struct index_state *istate, const char *path, int
> must_exist)
> {
> - int fd, i;
> + int fd;
> struct stat st;
> unsigned long src_offset;
> const struct cache_header *hdr;
> const char *mmap;
> size_t mmap_size;
> - const struct cache_entry *previous_ce = NULL;
> struct load_index_extensions p;
> size_t extension_offset = 0;
> #ifndef NO_PTHREADS
> - int nr_threads;
> + int nr_threads, cpus;
> + struct index_entry_offset_table *ieot = NULL;
> #endif
>
> if (istate->initialized)
> @@ -2014,10 +2167,18 @@ int do_read_index(struct index_state *istate, const
> char *path, int must_exist)
> p.mmap = mmap;
> p.mmap_size = mmap_size;
>
> + src_offset = sizeof(*hdr);
OK, we've been doing this since forever; sizeof(struct cache_header)
probably does not have extra padding on any supported platform.
> @@ -2032,29 +2193,22 @@ int do_read_index(struct index_state *istate, const
> char *path, int must_exist)
> nr_threads--;
> }
> }
> -#endif
> -
> - if (istate->version == 4) {
> - mem_pool_init(&istate->ce_mem_pool,
> -
> estimate_cache_size_from_compressed(istate->cache_nr));
> - } else {
> - mem_pool_init(&istate->ce_mem_pool,
> - estimate_cache_size(mmap_size,
> istate->cache_nr));
> - }
>
> - src_offset = sizeof(*hdr);
> - for (i = 0; i < istate->cache_nr; i++) {
> - struct ondisk_cache_entry *disk_ce;
> - struct cache_entry *ce;
> - unsigned long consumed;
> + /*
> + * Locate and read the index entry offset table so that we can use it
> + * to multi-thread the reading of the cache entries.
> + */
> + if (extension_offset && nr_threads > 1)
> + ieot = read_ieot_extension(mmap, mmap_size, extension_offset);
You need to free ieot at some point.
>
> - disk_ce = (struct ondisk_cache_entry *)(mmap + src_offset);
> - ce = create_from_disk(istate, disk_ce, &consumed,
> previous_ce);
> - set_index_entry(istate, i, ce);
> + if (ieot)
> + src_offset += load_cache_entries_threaded(istate, mmap,
> mmap_size, src_offset, nr_threads, ieot);
> + else
> + src_offset += load_all_cache_entries(istate, mmap, mmap_size,
> src_offset);
> +#else
> + src_offset += load_all_cache_entries(istate, mmap, mmap_size,
> src_offset);
> +#endif
>
> - src_offset += consumed;
> - previous_ce = ce;
> - }
> istate->timestamp.sec = st.st_mtime;
> istate->timestamp.nsec = ST_MTIME_NSEC(st);
>
> --
> 2.18.0.windows.1
>
--
Duy