Max Krasnyansky committed on
Commit
9d11a7a
·
1 Parent(s): 07d57ec

threadpool : skip polling for unused threads (llama/9461)

Browse files

* threadpool: skip polling for unused threads

Currently all threads do N polling rounds even if only 1 thread is active (n_threads_cur == 1).
This commit adds a check to skip the polling for unused threads (ith >= n_threads_cur).

n_threads_cur is now an atomic_int to explicitly tell the thread sanitizer that it is written
from one thread and read from other threads (not a race condition).

* threadpool: further simplify and improve ggml_barrier

Avoid using strict memory order while polling, yet make sure that all threads go through a
full memory barrier (memory fence) on ggml_barrier entrance and exit.

* threads: add simple barrier test

This test does lots of small, parallel matmul ops where the barriers in between dominate the overhead.

* threadpool: improve thread sync for new-graphs

Using the same tricks as ggml_barrier. All the polling is done with relaxed memory order
to keep it efficient, once the new graph is detected we do full fence using read-modify-write
with strict memory order.

* threadpool: improve abort handling

Do not use threadpool->ec (exit code) to decide whether to exit the compute loop.
threadpool->ec is not atomic which makes thread-sanitizer rightfully unhappy about it.

Instead introduce atomic threadpool->abort flag used for this. This is consistent with
how we handle threadpool->stop or pause.

While at it add an explicit atomic_load for n_threads_cur for consistency.

* test-barrier: release threadpool before releasing the context

Fixes a use-after-free detected by the gcc thread-sanitizer on x86-64;
for some reason the llvm sanitizer does not detect this issue.

Files changed (1) hide show
  1. ggml/src/ggml.c +74 -48
ggml/src/ggml.c CHANGED
@@ -1995,10 +1995,11 @@ struct ggml_threadpool {
1995
  // these are atomic as an annotation for thread-sanitizer
1996
  atomic_bool stop; // Used for stopping the threadpool altogether
1997
  atomic_bool pause; // Used for pausing the threadpool or individual threads
 
1998
 
1999
  struct ggml_compute_state * workers; // per thread state
2000
  int n_threads_max; // number of threads in the pool
2001
- int n_threads_cur; // number of threads used in the current graph
2002
 
2003
  int32_t prio; // Scheduling priority
2004
  uint32_t poll; // Polling level (0 - no polling)
@@ -3162,41 +3163,36 @@ inline static void ggml_critical_section_start(void) {
3162
  }
3163
  }
3164
 
3165
- #ifdef GGML_USE_OPENMP
3166
- static void ggml_barrier(struct ggml_threadpool * threadpool) {
3167
- if (threadpool->n_threads_cur == 1) {
3168
  return;
3169
  }
3170
 
 
3171
  #pragma omp barrier
3172
- }
3173
  #else
3174
- static void ggml_barrier(struct ggml_threadpool * threadpool) {
3175
- if (threadpool->n_threads_cur == 1) {
3176
- return;
3177
- }
3178
-
3179
- atomic_int * n_barrier = &threadpool->n_barrier;
3180
- atomic_int * n_barrier_passed = &threadpool->n_barrier_passed;
3181
 
3182
- int n_threads = threadpool->n_threads_cur;
3183
- int passed_old = atomic_load_explicit(n_barrier_passed, memory_order_relaxed);
3184
 
3185
- if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
 
3186
  // last thread
3187
- atomic_store(n_barrier, 0);
3188
- atomic_fetch_add_explicit(n_barrier_passed, 1, memory_order_relaxed);
3189
  } else {
3190
  // wait for other threads
3191
- while (true) {
3192
- if (atomic_load_explicit(n_barrier_passed, memory_order_relaxed) != passed_old) {
3193
- return;
3194
- }
3195
  ggml_thread_cpu_relax();
3196
  }
3197
  }
3198
- }
 
 
3199
  #endif
 
3200
 
3201
  // TODO: make this somehow automatically executed
3202
  // some sort of "sentry" mechanism
@@ -20138,34 +20134,33 @@ struct ggml_cplan ggml_graph_plan(
20138
 
20139
  static thread_ret_t ggml_graph_compute_thread(void * data) {
20140
  struct ggml_compute_state * state = (struct ggml_compute_state *) data;
 
20141
 
20142
- const struct ggml_cgraph * cgraph = state->threadpool->cgraph;
20143
- const struct ggml_cplan * cplan = state->threadpool->cplan;
20144
 
20145
  set_numa_thread_affinity(state->ith);
20146
 
20147
  struct ggml_compute_params params = {
20148
  /*.ith =*/ state->ith,
20149
- /*.nth =*/ state->threadpool->n_threads_cur,
20150
  /*.wsize =*/ cplan->work_size,
20151
  /*.wdata =*/ cplan->work_data,
20152
- /*.threadpool=*/ state->threadpool,
20153
  };
20154
 
20155
- for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
20156
  struct ggml_tensor * node = cgraph->nodes[node_n];
20157
 
20158
  ggml_compute_forward(&params, node);
20159
 
20160
- if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
20161
- state->threadpool->ec = GGML_STATUS_ABORTED;
 
 
20162
  }
20163
 
20164
  ggml_barrier(state->threadpool);
20165
-
20166
- if (state->threadpool->ec != GGML_STATUS_SUCCESS) {
20167
- break;
20168
- }
20169
  }
20170
 
20171
  return 0;
@@ -20173,7 +20168,15 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
20173
 
20174
  #ifndef GGML_USE_OPENMP
20175
 
20176
- static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
 
 
 
 
 
 
 
 
20177
  struct ggml_threadpool * threadpool = state->threadpool;
20178
 
20179
  if (state->pending || threadpool->stop || threadpool->pause) { return true; }
@@ -20181,21 +20184,34 @@ static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
20181
  // check for new graph/work
20182
  int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
20183
  if (new_graph != state->last_graph) {
20184
- state->pending = (state->ith < threadpool->n_threads_cur);
20185
  state->last_graph = new_graph;
20186
  }
20187
 
20188
  return state->pending;
20189
  }
20190
 
 
 
 
 
 
 
 
 
20191
  static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
20192
  struct ggml_threadpool * threadpool = state->threadpool;
20193
 
 
 
 
 
 
20194
  // This seems to make 0 ... 100 a decent range for polling level across modern processors.
20195
  // Perhaps, we can adjust it dynamically based on load and things.
20196
  const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
20197
 
20198
- for (uint64_t i=0; !ggml_graph_compute_ready(state) && i<n_rounds; i++) {
20199
  // No new work. Keep polling.
20200
  ggml_thread_cpu_relax();
20201
  }
@@ -20207,13 +20223,14 @@ static inline bool ggml_graph_compute_check_for_work(struct ggml_compute_state *
20207
  struct ggml_threadpool * threadpool = state->threadpool;
20208
 
20209
  if (ggml_graph_compute_poll_for_work(state)) {
 
20210
  return state->pending;
20211
  }
20212
 
20213
  ggml_mutex_lock_shared(&threadpool->mutex);
20214
- while (!ggml_graph_compute_ready(state)) {
20215
  // No new work. Wait for the signal.
20216
- GGML_PRINT_DEBUG("thread #%d waiting for work\n", state->ith);
20217
  ggml_cond_wait(&threadpool->cond, &threadpool->mutex);
20218
  }
20219
  ggml_mutex_unlock_shared(&threadpool->mutex);
@@ -20260,13 +20277,20 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
20260
  }
20261
 
20262
  // Start processing new graph
20263
- static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool)
20264
  {
20265
- // always take the mutex here because the worker threads are doing hybrid poll/wait
20266
 
20267
  ggml_mutex_lock(&threadpool->mutex);
20268
 
20269
- atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
 
 
 
 
 
 
 
20270
 
20271
  if (threadpool->pause) {
20272
  // Update main thread prio and affinity to match the threadpool settings
@@ -20325,6 +20349,7 @@ static struct ggml_threadpool * ggml_threadpool_new_impl(
20325
  threadpool->current_chunk = 0;
20326
  threadpool->stop = false;
20327
  threadpool->pause = tpp->paused;
 
20328
  threadpool->workers = NULL;
20329
  threadpool->n_threads_max = tpp->n_threads;
20330
  threadpool->n_threads_cur = tpp->n_threads;
@@ -20400,15 +20425,11 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
20400
  // No worker threads should be accessing the parameters below at this stage
20401
  threadpool->cgraph = cgraph;
20402
  threadpool->cplan = cplan;
20403
- threadpool->n_threads_cur = n_threads;
20404
  threadpool->current_chunk = 0;
 
20405
  threadpool->ec = GGML_STATUS_SUCCESS;
20406
  }
20407
 
20408
- if (n_threads > threadpool->n_threads_max) {
20409
- GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
20410
- }
20411
-
20412
  #ifdef GGML_USE_OPENMP
20413
  if (n_threads > 1) {
20414
  #pragma omp parallel num_threads(n_threads)
@@ -20417,7 +20438,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
20417
  {
20418
  // update the number of threads from the actual number of threads that we got from OpenMP
20419
  n_threads = omp_get_num_threads();
20420
- threadpool->n_threads_cur = n_threads;
20421
  }
20422
 
20423
  ggml_graph_compute_thread(&threadpool->workers[omp_get_thread_num()]);
@@ -20426,8 +20447,13 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
20426
  ggml_graph_compute_thread(&threadpool->workers[0]);
20427
  }
20428
  #else
 
 
 
 
 
20429
  // Kick all threads to start the new graph
20430
- ggml_graph_compute_kickoff(threadpool);
20431
 
20432
  // This is a work thread too
20433
  ggml_graph_compute_thread(&threadpool->workers[0]);
 
1995
  // these are atomic as an annotation for thread-sanitizer
1996
  atomic_bool stop; // Used for stopping the threadpool altogether
1997
  atomic_bool pause; // Used for pausing the threadpool or individual threads
1998
+ atomic_bool abort; // Used for aborting processing of a graph
1999
 
2000
  struct ggml_compute_state * workers; // per thread state
2001
  int n_threads_max; // number of threads in the pool
2002
+ atomic_int n_threads_cur; // number of threads used in the current graph
2003
 
2004
  int32_t prio; // Scheduling priority
2005
  uint32_t poll; // Polling level (0 - no polling)
 
3163
  }
3164
  }
3165
 
3166
+ static void ggml_barrier(struct ggml_threadpool * tp) {
3167
+ int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
3168
+ if (n_threads == 1) {
3169
  return;
3170
  }
3171
 
3172
+ #ifdef GGML_USE_OPENMP
3173
  #pragma omp barrier
 
3174
  #else
3175
+ int n_passed = atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed);
 
 
 
 
 
 
3176
 
3177
+ // enter barrier (full seq-cst fence)
3178
+ int n_barrier = atomic_fetch_add_explicit(&tp->n_barrier, 1, memory_order_seq_cst);
3179
 
3180
+ int last = 0;
3181
+ if (n_barrier == (n_threads - 1)) {
3182
  // last thread
3183
+ atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed);
3184
+ last = 1;
3185
  } else {
3186
  // wait for other threads
3187
+ while (atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed) == n_passed) {
 
 
 
3188
  ggml_thread_cpu_relax();
3189
  }
3190
  }
3191
+
3192
+ // exit barrier (full seq-cst fence)
3193
+ atomic_fetch_add_explicit(&tp->n_barrier_passed, last, memory_order_seq_cst);
3194
  #endif
3195
+ }
3196
 
3197
  // TODO: make this somehow automatically executed
3198
  // some sort of "sentry" mechanism
 
20134
 
20135
  static thread_ret_t ggml_graph_compute_thread(void * data) {
20136
  struct ggml_compute_state * state = (struct ggml_compute_state *) data;
20137
+ struct ggml_threadpool * tp = state->threadpool;
20138
 
20139
+ const struct ggml_cgraph * cgraph = tp->cgraph;
20140
+ const struct ggml_cplan * cplan = tp->cplan;
20141
 
20142
  set_numa_thread_affinity(state->ith);
20143
 
20144
  struct ggml_compute_params params = {
20145
  /*.ith =*/ state->ith,
20146
+ /*.nth =*/ atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed),
20147
  /*.wsize =*/ cplan->work_size,
20148
  /*.wdata =*/ cplan->work_data,
20149
+ /*.threadpool=*/ tp,
20150
  };
20151
 
20152
+ for (int node_n = 0; node_n < cgraph->n_nodes && !tp->abort; node_n++) {
20153
  struct ggml_tensor * node = cgraph->nodes[node_n];
20154
 
20155
  ggml_compute_forward(&params, node);
20156
 
20157
+ if (state->ith == 0 && cplan->abort_callback &&
20158
+ cplan->abort_callback(cplan->abort_callback_data)) {
20159
+ tp->abort = true;
20160
+ tp->ec = GGML_STATUS_ABORTED;
20161
  }
20162
 
20163
  ggml_barrier(state->threadpool);
 
 
 
 
20164
  }
20165
 
20166
  return 0;
 
20168
 
20169
  #ifndef GGML_USE_OPENMP
20170
 
20171
+ // check if thread is active
20172
+ static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
20173
+ struct ggml_threadpool * threadpool = state->threadpool;
20174
+ int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
20175
+ return (state->ith < n_threads);
20176
+ }
20177
+
20178
+ // check if thread is ready to proceed (exit from polling or sleeping)
20179
+ static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
20180
  struct ggml_threadpool * threadpool = state->threadpool;
20181
 
20182
  if (state->pending || threadpool->stop || threadpool->pause) { return true; }
 
20184
  // check for new graph/work
20185
  int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
20186
  if (new_graph != state->last_graph) {
20187
+ state->pending = ggml_graph_compute_thread_active(state);
20188
  state->last_graph = new_graph;
20189
  }
20190
 
20191
  return state->pending;
20192
  }
20193
 
20194
+ // sync thread state after polling
20195
+ static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * state) {
20196
+ struct ggml_threadpool * threadpool = state->threadpool;
20197
+ // this should just be atomic_thread_fence(seq_cst) but it confuses thread-sanitizer
20198
+ // so instead we just use a dummy read-modify-write
20199
+ atomic_fetch_add_explicit(&threadpool->n_graph, 0, memory_order_seq_cst);
20200
+ }
20201
+
20202
  static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
20203
  struct ggml_threadpool * threadpool = state->threadpool;
20204
 
20205
+ // Skip polling for unused threads
20206
+ if (!ggml_graph_compute_thread_active(state)) {
20207
+ return state->pending;
20208
+ }
20209
+
20210
  // This seems to make 0 ... 100 a decent range for polling level across modern processors.
20211
  // Perhaps, we can adjust it dynamically based on load and things.
20212
  const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
20213
 
20214
+ for (uint64_t i=0; !ggml_graph_compute_thread_ready(state) && i < n_rounds; i++) {
20215
  // No new work. Keep polling.
20216
  ggml_thread_cpu_relax();
20217
  }
 
20223
  struct ggml_threadpool * threadpool = state->threadpool;
20224
 
20225
  if (ggml_graph_compute_poll_for_work(state)) {
20226
+ ggml_graph_compute_thread_sync(state);
20227
  return state->pending;
20228
  }
20229
 
20230
  ggml_mutex_lock_shared(&threadpool->mutex);
20231
+ while (!ggml_graph_compute_thread_ready(state)) {
20232
  // No new work. Wait for the signal.
20233
+ GGML_PRINT_DEBUG("thread #%d waiting for work (sleeping)\n", state->ith);
20234
  ggml_cond_wait(&threadpool->cond, &threadpool->mutex);
20235
  }
20236
  ggml_mutex_unlock_shared(&threadpool->mutex);
 
20277
  }
20278
 
20279
  // Start processing new graph
20280
+ static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int n_threads)
20281
  {
20282
+ // Always take the mutex here because the worker threads are doing hybrid poll/wait
20283
 
20284
  ggml_mutex_lock(&threadpool->mutex);
20285
 
20286
+ GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);
20287
+
20288
+ // Update the number of active threads
20289
+ atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
20290
+
20291
+ // Indicate the graph is ready to be processed
20292
+ // We need the full seq-cst fence here because of the polling threads (used in thread_sync)
20293
+ atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst);
20294
 
20295
  if (threadpool->pause) {
20296
  // Update main thread prio and affinity to match the threadpool settings
 
20349
  threadpool->current_chunk = 0;
20350
  threadpool->stop = false;
20351
  threadpool->pause = tpp->paused;
20352
+ threadpool->abort = false;
20353
  threadpool->workers = NULL;
20354
  threadpool->n_threads_max = tpp->n_threads;
20355
  threadpool->n_threads_cur = tpp->n_threads;
 
20425
  // No worker threads should be accessing the parameters below at this stage
20426
  threadpool->cgraph = cgraph;
20427
  threadpool->cplan = cplan;
 
20428
  threadpool->current_chunk = 0;
20429
+ threadpool->abort = false;
20430
  threadpool->ec = GGML_STATUS_SUCCESS;
20431
  }
20432
 
 
 
 
 
20433
  #ifdef GGML_USE_OPENMP
20434
  if (n_threads > 1) {
20435
  #pragma omp parallel num_threads(n_threads)
 
20438
  {
20439
  // update the number of threads from the actual number of threads that we got from OpenMP
20440
  n_threads = omp_get_num_threads();
20441
+ atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
20442
  }
20443
 
20444
  ggml_graph_compute_thread(&threadpool->workers[omp_get_thread_num()]);
 
20447
  ggml_graph_compute_thread(&threadpool->workers[0]);
20448
  }
20449
  #else
20450
+ if (n_threads > threadpool->n_threads_max) {
20451
+ GGML_PRINT("WARNING: cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
20452
+ n_threads = threadpool->n_threads_max;
20453
+ }
20454
+
20455
  // Kick all threads to start the new graph
20456
+ ggml_graph_compute_kickoff(threadpool, n_threads);
20457
 
20458
  // This is a work thread too
20459
  ggml_graph_compute_thread(&threadpool->workers[0]);