pull: Fix a race condition causing an early exit

This is a little less magical than what we had before.
This commit is contained in:
Colin Walters 2012-12-23 17:18:08 -05:00
parent 3c67c209db
commit 9bd4d35c2b
3 changed files with 41 additions and 97 deletions

View File

@ -31,21 +31,16 @@ struct OtWorkerQueue {
GCond cond; GCond cond;
GQueue queue; GQueue queue;
volatile gint holds;
char *thread_name; char *thread_name;
gboolean complete; gboolean complete;
gboolean is_idle;
gboolean destroyed; gboolean destroyed;
GThread *worker; GThread *worker;
OtWorkerQueueFunc work_func; OtWorkerQueueFunc work_func;
OtWorkerQueueFunc work_data; OtWorkerQueueFunc work_data;
GMainContext *idle_context;
OtWorkerQueueIdleFunc idle_callback;
gpointer idle_data;
}; };
static gpointer static gpointer
@ -61,6 +56,8 @@ ot_worker_queue_new (const char *thread_name,
g_cond_init (&queue->cond); g_cond_init (&queue->cond);
g_queue_init (&queue->queue); g_queue_init (&queue->queue);
queue->is_idle = TRUE;
queue->thread_name = g_strdup (thread_name); queue->thread_name = g_strdup (thread_name);
queue->work_func = func; queue->work_func = func;
queue->work_data = data; queue->work_data = data;
@ -72,40 +69,6 @@ void
ot_worker_queue_start (OtWorkerQueue *queue) ot_worker_queue_start (OtWorkerQueue *queue)
{ {
queue->worker = g_thread_new (queue->thread_name, ot_worker_queue_thread_main, queue); queue->worker = g_thread_new (queue->thread_name, ot_worker_queue_thread_main, queue);
ot_worker_queue_push (queue, queue); /* Self marks end of (initial) queue */
}
void
ot_worker_queue_hold (OtWorkerQueue *queue)
{
g_atomic_int_inc (&queue->holds);
}
static gboolean
invoke_idle_callback (gpointer user_data)
{
OtWorkerQueue *queue = user_data;
queue->idle_callback (queue->idle_data);
return FALSE;
}
void
ot_worker_queue_release (OtWorkerQueue *queue)
{
if (!g_atomic_int_dec_and_test (&queue->holds))
return;
g_mutex_lock (&queue->mutex);
if (!g_queue_peek_tail_link (&queue->queue))
{
if (queue->idle_callback)
g_main_context_invoke (queue->idle_context,
invoke_idle_callback,
queue);
}
g_mutex_unlock (&queue->mutex);
} }
void void
@ -114,6 +77,7 @@ ot_worker_queue_push (OtWorkerQueue *queue,
{ {
g_mutex_lock (&queue->mutex); g_mutex_lock (&queue->mutex);
g_queue_push_head (&queue->queue, data); g_queue_push_head (&queue->queue, data);
queue->is_idle = FALSE;
g_cond_signal (&queue->cond); g_cond_signal (&queue->cond);
g_mutex_unlock (&queue->mutex); g_mutex_unlock (&queue->mutex);
} }
@ -131,11 +95,7 @@ ot_worker_queue_thread_main (gpointer user_data)
while (!g_queue_peek_tail_link (&queue->queue)) while (!g_queue_peek_tail_link (&queue->queue))
{ {
if (queue->idle_callback && queue->complete && queue->is_idle = TRUE;
g_atomic_int_get (&queue->holds) == 0)
g_main_context_invoke (queue->idle_context,
invoke_idle_callback,
queue);
g_cond_wait (&queue->cond, &queue->mutex); g_cond_wait (&queue->cond, &queue->mutex);
} }
@ -146,27 +106,20 @@ ot_worker_queue_thread_main (gpointer user_data)
if (!item) if (!item)
break; break;
if (item == queue) queue->work_func (item, queue->work_data);
queue->complete = TRUE;
else
queue->work_func (item, queue->work_data);
} }
return NULL; return NULL;
} }
void gboolean
ot_worker_queue_set_idle_callback (OtWorkerQueue *queue, ot_worker_queue_is_idle (OtWorkerQueue *queue)
GMainContext *context,
OtWorkerQueueIdleFunc idle_callback,
gpointer data)
{ {
g_assert (!queue->worker); gboolean ret;
if (!context) g_mutex_lock (&queue->mutex);
context = g_main_context_default (); ret = queue->is_idle;
queue->idle_context = g_main_context_ref (context); g_mutex_unlock (&queue->mutex);
queue->idle_callback = idle_callback; return ret;
queue->idle_data = data;
} }
void void
@ -180,7 +133,6 @@ ot_worker_queue_unref (OtWorkerQueue *queue)
g_free (queue->thread_name); g_free (queue->thread_name);
g_main_context_unref (queue->idle_context);
g_mutex_clear (&queue->mutex); g_mutex_clear (&queue->mutex);
g_cond_clear (&queue->cond); g_cond_clear (&queue->cond);
g_queue_clear (&queue->queue); g_queue_clear (&queue->queue);

View File

@ -31,7 +31,6 @@ typedef struct OtWorkerQueue OtWorkerQueue;
typedef void (*OtWorkerQueueFunc) (gpointer data, typedef void (*OtWorkerQueueFunc) (gpointer data,
gpointer user_data); gpointer user_data);
typedef void (*OtWorkerQueueIdleFunc) (gpointer user_data);
OtWorkerQueue *ot_worker_queue_new (const char *thread_name, OtWorkerQueue *ot_worker_queue_new (const char *thread_name,
OtWorkerQueueFunc func, OtWorkerQueueFunc func,
@ -39,13 +38,7 @@ OtWorkerQueue *ot_worker_queue_new (const char *thread_name,
void ot_worker_queue_start (OtWorkerQueue *queue); void ot_worker_queue_start (OtWorkerQueue *queue);
void ot_worker_queue_hold (OtWorkerQueue *queue); gboolean ot_worker_queue_is_idle (OtWorkerQueue *queue);
void ot_worker_queue_release (OtWorkerQueue *queue);
void ot_worker_queue_set_idle_callback (OtWorkerQueue *queue,
GMainContext *context,
OtWorkerQueueIdleFunc idle_callback,
gpointer data);
void ot_worker_queue_push (OtWorkerQueue *queue, void ot_worker_queue_push (OtWorkerQueue *queue,
gpointer data); gpointer data);

View File

@ -101,10 +101,10 @@ typedef struct {
guint outstanding_uri_requests; guint outstanding_uri_requests;
GQueue queued_filemeta; GQueue queued_filemeta;
GThread *metadata_scan_thread;
OtWorkerQueue *metadata_objects_to_scan; OtWorkerQueue *metadata_objects_to_scan;
GHashTable *scanned_metadata; /* Maps object name to itself */ GHashTable *scanned_metadata; /* Maps object name to itself */
GHashTable *requested_content; /* Maps object name to itself */ GHashTable *requested_content; /* Maps object name to itself */
guint n_outstanding_metadata_fetches;
guint n_fetched_content; guint n_fetched_content;
guint outstanding_filemeta_requests; guint outstanding_filemeta_requests;
@ -268,15 +268,23 @@ static void
check_outstanding_requests_handle_error (OtPullData *pull_data, check_outstanding_requests_handle_error (OtPullData *pull_data,
GError *error) GError *error)
{ {
if (!pull_data->metadata_scan_active && if ((!pull_data->metadata_objects_to_scan || ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan)) &&
pull_data->outstanding_uri_requests == 0 && pull_data->outstanding_uri_requests == 0 &&
pull_data->outstanding_filemeta_requests == 0 && pull_data->outstanding_filemeta_requests == 0 &&
pull_data->outstanding_filecontent_requests == 0 && pull_data->outstanding_filecontent_requests == 0 &&
pull_data->n_outstanding_metadata_fetches == 0 &&
pull_data->outstanding_content_stage_requests == 0) pull_data->outstanding_content_stage_requests == 0)
g_main_loop_quit (pull_data->loop); g_main_loop_quit (pull_data->loop);
throw_async_error (pull_data, error); throw_async_error (pull_data, error);
} }
static gboolean
idle_check_outstanding_requests (gpointer user_data)
{
check_outstanding_requests_handle_error (user_data, NULL);
return FALSE;
}
static gboolean static gboolean
run_mainloop_monitor_fetcher (OtPullData *pull_data) run_mainloop_monitor_fetcher (OtPullData *pull_data)
{ {
@ -456,7 +464,6 @@ scan_dirtree_object (OtPullData *pull_data,
g_hash_table_insert (pull_data->requested_content, duped_checksum, duped_checksum); g_hash_table_insert (pull_data->requested_content, duped_checksum, duped_checksum);
g_atomic_int_inc (&pull_data->n_requested_content); g_atomic_int_inc (&pull_data->n_requested_content);
ot_worker_queue_hold (pull_data->metadata_objects_to_scan);
g_main_context_invoke (NULL, idle_queue_content_request, idle_fetch_data); g_main_context_invoke (NULL, idle_queue_content_request, idle_fetch_data);
} }
} }
@ -736,8 +743,6 @@ idle_queue_content_request (gpointer user_data)
process_one_file_request (data); process_one_file_request (data);
} }
ot_worker_queue_release (pull_data->metadata_objects_to_scan);
return FALSE; return FALSE;
} }
@ -756,10 +761,10 @@ on_metadata_staged (GObject *object,
OtPullData *pull_data = fetch_data->pull_data; OtPullData *pull_data = fetch_data->pull_data;
pull_data->n_fetched_metadata++; pull_data->n_fetched_metadata++;
pull_data->n_outstanding_metadata_fetches--;
ot_worker_queue_push (pull_data->metadata_objects_to_scan, ot_worker_queue_push (pull_data->metadata_objects_to_scan,
g_variant_ref (fetch_data->object)); g_variant_ref (fetch_data->object));
ot_worker_queue_release (pull_data->metadata_objects_to_scan);
(void) gs_file_unlink (fetch_data->temp_path, NULL, NULL); (void) gs_file_unlink (fetch_data->temp_path, NULL, NULL);
g_object_unref (fetch_data->temp_path); g_object_unref (fetch_data->temp_path);
@ -821,6 +826,7 @@ idle_fetch_metadata_object (gpointer data)
objpath = ostree_get_relative_object_path (checksum, objtype, compressed); objpath = ostree_get_relative_object_path (checksum, objtype, compressed);
obj_uri = suburi_new (pull_data->base_uri, objpath, NULL); obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
pull_data->n_outstanding_metadata_fetches++;
ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable, ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable,
meta_fetch_on_complete, fetch_data); meta_fetch_on_complete, fetch_data);
soup_uri_free (obj_uri); soup_uri_free (obj_uri);
@ -840,7 +846,6 @@ queue_metadata_object_fetch (OtPullData *pull_data,
IdleFetchMetadataObjectData *fetch_data = g_new (IdleFetchMetadataObjectData, 1); IdleFetchMetadataObjectData *fetch_data = g_new (IdleFetchMetadataObjectData, 1);
fetch_data->pull_data = pull_data; fetch_data->pull_data = pull_data;
fetch_data->object = g_variant_ref (object); fetch_data->object = g_variant_ref (object);
ot_worker_queue_hold (fetch_data->pull_data->metadata_objects_to_scan);
g_idle_add (idle_fetch_metadata_object, fetch_data); g_idle_add (idle_fetch_metadata_object, fetch_data);
} }
@ -958,8 +963,9 @@ scan_one_metadata_object (OtPullData *pull_data,
} }
g_hash_table_insert (pull_data->scanned_metadata, g_variant_ref (object), object); g_hash_table_insert (pull_data->scanned_metadata, g_variant_ref (object), object);
g_atomic_int_inc (&pull_data->n_scanned_metadata); g_atomic_int_inc (&pull_data->n_scanned_metadata);
}
g_idle_add (idle_check_outstanding_requests, pull_data);
}
ret = TRUE; ret = TRUE;
out: out:
@ -1032,15 +1038,6 @@ scan_one_metadata_object_dispatch (gpointer item,
} }
} }
static void
on_metadata_worker_idle (gpointer user_data)
{
OtPullData *pull_data = user_data;
pull_data->metadata_scan_active = FALSE;
check_outstanding_requests_handle_error (pull_data, NULL);
}
static gboolean static gboolean
idle_start_worker (gpointer user_data) idle_start_worker (gpointer user_data)
@ -1362,8 +1359,6 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
pull_data->metadata_objects_to_scan = ot_worker_queue_new ("metadatascan", pull_data->metadata_objects_to_scan = ot_worker_queue_new ("metadatascan",
scan_one_metadata_object_dispatch, scan_one_metadata_object_dispatch,
pull_data); pull_data);
ot_worker_queue_set_idle_callback (pull_data->metadata_objects_to_scan,
NULL, on_metadata_worker_idle, pull_data);
g_hash_table_iter_init (&hash_iter, commits_to_fetch); g_hash_table_iter_init (&hash_iter, commits_to_fetch);
while (g_hash_table_iter_next (&hash_iter, &key, &value)) while (g_hash_table_iter_next (&hash_iter, &key, &value))
@ -1401,13 +1396,17 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
} }
} }
g_idle_add (idle_start_worker, pull_data);
/* Start metadata thread, which kicks off further metadata requests /* Start metadata thread, which kicks off further metadata requests
* as well as content fetches. * as well as content fetches.
*/ */
if (!run_mainloop_monitor_fetcher (pull_data)) if (!ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan))
goto out; {
g_idle_add (idle_start_worker, pull_data);
/* Now await work completion */
if (!run_mainloop_monitor_fetcher (pull_data))
goto out;
}
if (!ostree_repo_commit_transaction (pull_data->repo, cancellable, error)) if (!ostree_repo_commit_transaction (pull_data->repo, cancellable, error))
goto out; goto out;