diff --git a/src/libostree/ostree-fetcher.c b/src/libostree/ostree-fetcher.c index d956d959..313df6a8 100644 --- a/src/libostree/ostree-fetcher.c +++ b/src/libostree/ostree-fetcher.c @@ -67,7 +67,12 @@ typedef struct { guint64 total_downloaded; } ThreadClosure; +static void +session_thread_process_pending_queue (ThreadClosure *thread_closure); + typedef struct { + volatile int ref_count; + ThreadClosure *thread_closure; SoupURI *uri; @@ -186,10 +191,22 @@ pending_task_compare (gconstpointer a, (priority_a < priority_b) ? -1 : 1; } -static void -pending_uri_free (OstreeFetcherPendingURI *pending) +static OstreeFetcherPendingURI * +pending_uri_ref (OstreeFetcherPendingURI *pending) { - g_hash_table_remove (pending->thread_closure->outstanding, pending); + g_return_val_if_fail (pending != NULL, NULL); + g_return_val_if_fail (pending->ref_count > 0, NULL); + + g_atomic_int_inc (&pending->ref_count); + + return pending; +} + +static void +pending_uri_unref (OstreeFetcherPendingURI *pending) +{ + if (!g_atomic_int_dec_and_test (&pending->ref_count)) + return; g_clear_pointer (&pending->thread_closure, thread_closure_unref); @@ -331,8 +348,7 @@ session_thread_process_pending_queue (ThreadClosure *thread_closure) pending = g_task_get_task_data (task); cancellable = g_task_get_cancellable (task); - /* pending_uri_free() removes this. */ - g_hash_table_add (thread_closure->outstanding, pending); + g_hash_table_add (thread_closure->outstanding, pending_uri_ref (pending)); soup_request_send_async (pending->request, cancellable, @@ -540,7 +556,7 @@ _ostree_fetcher_constructed (GObject *object) self->thread_closure->tmpdir_dfd = -1; self->thread_closure->tmpdir_lock = empty_lockfile; - self->thread_closure->outstanding = g_hash_table_new (NULL, NULL); + self->thread_closure->outstanding = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)pending_uri_unref); self->thread_closure->output_stream_set = g_hash_table_new_full (NULL, NULL, (GDestroyNotify) NULL, (GDestroyNotify) g_object_unref); @@ -742,6 +758,18 @@ on_stream_read (GObject *object, GAsyncResult *result, gpointer user_data); +static void +remove_pending_rerun_queue (OstreeFetcherPendingURI *pending) +{ + /* Hold a temporary ref to ensure the reference to + * pending->thread_closure is valid. + */ + pending_uri_ref (pending); + g_hash_table_remove (pending->thread_closure->outstanding, pending); + session_thread_process_pending_queue (pending->thread_closure); + pending_uri_unref (pending); +} + static void on_out_splice_complete (GObject *object, GAsyncResult *result, @@ -770,7 +798,10 @@ on_out_splice_complete (GObject *object, out: if (local_error) - g_task_return_error (task, local_error); + { + g_task_return_error (task, local_error); + remove_pending_rerun_queue (pending); + } g_object_unref (task); } @@ -802,6 +833,7 @@ on_stream_read (GObject *object, g_task_return_pointer (task, g_strdup (pending->out_tmpfile), (GDestroyNotify) g_free); + remove_pending_rerun_queue (pending); } else { @@ -837,7 +869,10 @@ on_stream_read (GObject *object, out: if (local_error) - g_task_return_error (task, local_error); + { + g_task_return_error (task, local_error); + remove_pending_rerun_queue (pending); + } g_object_unref (task); } @@ -883,6 +918,7 @@ on_request_sent (GObject *object, g_strdup (pending->out_tmpfile), (GDestroyNotify) g_free); } + remove_pending_rerun_queue (pending); goto out; } else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) @@ -947,6 +983,7 @@ on_request_sent (GObject *object, g_task_return_pointer (task, g_object_ref (pending->request_body), (GDestroyNotify) g_object_unref); + remove_pending_rerun_queue (pending); } out: @@ -955,6 +992,7 @@ on_request_sent (GObject *object, if (pending->request_body) (void) g_input_stream_close (pending->request_body, NULL, NULL); g_task_return_error (task, local_error); + remove_pending_rerun_queue (pending); } g_object_unref (task); @@ -979,6 +1017,7 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self, /* SoupRequest is created in session thread. */ pending = g_new0 (OstreeFetcherPendingURI, 1); + pending->ref_count = 1; pending->thread_closure = thread_closure_ref (self->thread_closure); pending->uri = soup_uri_copy (uri); pending->max_size = max_size; @@ -986,7 +1025,7 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self, task = g_task_new (self, cancellable, callback, user_data); g_task_set_source_tag (task, source_tag); - g_task_set_task_data (task, pending, (GDestroyNotify) pending_uri_free); + g_task_set_task_data (task, pending, (GDestroyNotify) pending_uri_unref); /* We'll use the GTask priority for our own priority queue. */ g_task_set_priority (task, priority);