diff --git a/src/libostree/ostree-fetcher.c b/src/libostree/ostree-fetcher.c index e88bd1d0..fbfc0aec 100644 --- a/src/libostree/ostree-fetcher.c +++ b/src/libostree/ostree-fetcher.c @@ -39,10 +39,8 @@ typedef enum { } OstreeFetcherState; typedef struct { - guint refcount; OstreeFetcher *self; SoupURI *uri; - int priority; OstreeFetcherState state; @@ -57,37 +55,30 @@ typedef struct { guint64 current_size; guint64 content_length; - GCancellable *cancellable; GTask *task; } OstreeFetcherPendingURI; static int -pending_uri_compare (gconstpointer a, - gconstpointer b, - gpointer unused) +pending_task_compare (gconstpointer a, + gconstpointer b, + gpointer unused) { - const OstreeFetcherPendingURI *pending_a = a; - const OstreeFetcherPendingURI *pending_b = b; + gint priority_a = g_task_get_priority (G_TASK (a)); + gint priority_b = g_task_get_priority (G_TASK (b)); - return (pending_a->priority == pending_b->priority) ? 0 : - (pending_a->priority < pending_b->priority) ? -1 : 1; + return (priority_a == priority_b) ? 0 : + (priority_a < priority_b) ? -1 : 1; } static void pending_uri_free (OstreeFetcherPendingURI *pending) { - g_assert (pending->refcount > 0); - pending->refcount--; - if (pending->refcount > 0) - return; - soup_uri_free (pending->uri); g_clear_object (&pending->self); g_clear_object (&pending->request); g_clear_object (&pending->request_body); g_free (pending->out_tmpfile); g_clear_object (&pending->out_stream); - g_clear_object (&pending->cancellable); g_free (pending); } @@ -130,7 +121,8 @@ _ostree_fetcher_finalize (GObject *object) g_hash_table_destroy (self->sending_messages); g_hash_table_destroy (self->output_stream_set); - g_queue_clear (&self->pending_queue); + while (!g_queue_is_empty (&self->pending_queue)) + g_object_unref (g_queue_pop_head (&self->pending_queue)); G_OBJECT_CLASS (_ostree_fetcher_parent_class)->finalize (object); } @@ -282,11 +274,22 @@ ostree_fetcher_process_pending_queue (OstreeFetcher *self) while (g_queue_peek_head (&self->pending_queue) != NULL && self->outstanding < self->max_outstanding) { - OstreeFetcherPendingURI *next = g_queue_pop_head (&self->pending_queue); + GTask *task; + OstreeFetcherPendingURI *pending; + GCancellable *cancellable; + + task = g_queue_pop_head (&self->pending_queue); + + pending = g_task_get_task_data (task); + cancellable = g_task_get_cancellable (task); self->outstanding++; - soup_request_send_async (next->request, next->cancellable, - on_request_sent, next); + soup_request_send_async (pending->request, + cancellable, + on_request_sent, + g_object_ref (task)); + + g_object_unref (task); } } @@ -303,7 +306,7 @@ finish_stream (OstreeFetcherPendingURI *pending, */ if (pending->out_stream) { - if (!g_output_stream_close (pending->out_stream, pending->cancellable, error)) + if (!g_output_stream_close (pending->out_stream, cancellable, error)) goto out; g_hash_table_remove (pending->self->output_stream_set, pending->out_stream); } @@ -347,22 +350,32 @@ on_out_splice_complete (GObject *object, GAsyncResult *result, gpointer user_data) { - OstreeFetcherPendingURI *pending = user_data; + GTask *task = G_TASK (user_data); + OstreeFetcherPendingURI *pending; + GCancellable *cancellable; gssize bytes_written; GError *local_error = NULL; + pending = g_task_get_task_data (task); + cancellable = g_task_get_cancellable (task); + bytes_written = g_output_stream_splice_finish ((GOutputStream *)object, result, &local_error); if (bytes_written < 0) goto out; - g_input_stream_read_bytes_async (pending->request_body, 8192, G_PRIORITY_DEFAULT, - pending->cancellable, on_stream_read, pending); + g_input_stream_read_bytes_async (pending->request_body, + 8192, G_PRIORITY_DEFAULT, + cancellable, + on_stream_read, + g_object_ref (task)); out: if (local_error) - g_task_return_error (pending->task, local_error); + g_task_return_error (task, local_error); + + g_object_unref (task); } static void @@ -370,11 +383,16 @@ on_stream_read (GObject *object, GAsyncResult *result, gpointer user_data) { - OstreeFetcherPendingURI *pending = user_data; + GTask *task = G_TASK (user_data); + OstreeFetcherPendingURI *pending; + GCancellable *cancellable; g_autoptr(GBytes) bytes = NULL; gsize bytes_read; GError *local_error = NULL; + pending = g_task_get_task_data (task); + cancellable = g_task_get_cancellable (task); + bytes = g_input_stream_read_bytes_finish ((GInputStream*)object, result, &local_error); if (!bytes) goto out; @@ -382,12 +400,11 @@ on_stream_read (GObject *object, bytes_read = g_bytes_get_size (bytes); if (bytes_read == 0) { - if (!finish_stream (pending, pending->cancellable, &local_error)) + if (!finish_stream (pending, cancellable, &local_error)) goto out; - g_task_return_pointer (pending->task, + g_task_return_pointer (task, g_strdup (pending->out_tmpfile), (GDestroyNotify) g_free); - g_object_unref (pending->task); } else { @@ -415,18 +432,17 @@ on_stream_read (GObject *object, g_output_stream_splice_async (pending->out_stream, membuf, G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE, G_PRIORITY_DEFAULT, - pending->cancellable, + cancellable, on_out_splice_complete, - pending); + g_object_ref (task)); } } out: if (local_error) - { - g_task_return_error (pending->task, local_error); - g_object_unref (pending->task); - } + g_task_return_error (task, local_error); + + g_object_unref (task); } static void @@ -434,10 +450,15 @@ on_request_sent (GObject *object, GAsyncResult *result, gpointer user_data) { - OstreeFetcherPendingURI *pending = user_data; + GTask *task = G_TASK (user_data); + OstreeFetcherPendingURI *pending; + GCancellable *cancellable; GError *local_error = NULL; glnx_unref_object SoupMessage *msg = NULL; + pending = g_task_get_task_data (task); + cancellable = g_task_get_cancellable (task); + pending->state = OSTREE_FETCHER_STATE_COMPLETE; pending->request_body = soup_request_send_finish ((SoupRequest*) object, result, &local_error); @@ -455,18 +476,17 @@ on_request_sent (GObject *object, (void) g_input_stream_close (pending->request_body, NULL, NULL); if (pending->is_stream) { - g_task_return_pointer (pending->task, + g_task_return_pointer (task, g_object_ref (pending->request_body), (GDestroyNotify) g_object_unref); } else { - g_task_return_pointer (pending->task, + g_task_return_pointer (task, g_strdup (pending->out_tmpfile), (GDestroyNotify) g_free); } - g_object_unref (pending->task); - return; + goto out; } else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) { @@ -513,16 +533,17 @@ on_request_sent (GObject *object, } pending->out_stream = g_unix_output_stream_new (fd, TRUE); g_hash_table_add (pending->self->output_stream_set, g_object_ref (pending->out_stream)); - g_input_stream_read_bytes_async (pending->request_body, 8192, G_PRIORITY_DEFAULT, - pending->cancellable, on_stream_read, pending); - + g_input_stream_read_bytes_async (pending->request_body, + 8192, G_PRIORITY_DEFAULT, + cancellable, + on_stream_read, + g_object_ref (task)); } else { - g_task_return_pointer (pending->task, + g_task_return_pointer (task, g_object_ref (pending->request_body), (GDestroyNotify) g_object_unref); - g_object_unref (pending->task); } out: @@ -530,12 +551,13 @@ on_request_sent (GObject *object, { if (pending->request_body) (void) g_input_stream_close (pending->request_body, NULL, NULL); - g_task_return_error (pending->task, local_error); - g_object_unref (pending->task); + g_task_return_error (task, local_error); } + + g_object_unref (task); } -static OstreeFetcherPendingURI * +static void ostree_fetcher_request_uri_internal (OstreeFetcher *self, SoupURI *uri, gboolean is_stream, @@ -546,29 +568,30 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self, gpointer user_data, gpointer source_tag) { + GTask *task; OstreeFetcherPendingURI *pending = g_new0 (OstreeFetcherPendingURI, 1); GError *local_error = NULL; - pending->refcount = 1; pending->request = soup_requester_request_uri (self->requester, uri, &local_error); pending->self = g_object_ref (self); pending->uri = soup_uri_copy (uri); - pending->priority = priority; pending->max_size = max_size; pending->is_stream = is_stream; - pending->cancellable = cancellable ? g_object_ref (cancellable) : NULL; - pending->task = g_task_new (self, - cancellable, - callback, user_data); - g_task_set_source_tag (pending->task, source_tag); - g_task_set_task_data (pending->task, pending, (GDestroyNotify) pending_uri_free); + task = g_task_new (self, cancellable, callback, user_data); + g_task_set_source_tag (task, source_tag); + g_task_set_task_data (task, pending, (GDestroyNotify) pending_uri_free); + + /* We'll use the GTask priority for our own priority queue. */ + g_task_set_priority (task, priority); if (is_stream) { - soup_request_send_async (pending->request, cancellable, - on_request_sent, pending); + soup_request_send_async (pending->request, + cancellable, + on_request_sent, + g_object_ref (task)); } else { @@ -588,7 +611,7 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self, else { gs_set_error_from_errno (&local_error, errno); - goto fail; + goto out; } } @@ -602,7 +625,9 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self, pending->out_tmpfile = tmpfile; tmpfile = NULL; /* Transfer ownership */ - g_queue_insert_sorted (&self->pending_queue, pending, pending_uri_compare, NULL); + g_queue_insert_sorted (&self->pending_queue, + g_object_ref (task), + pending_task_compare, NULL); ostree_fetcher_process_pending_queue (self); } @@ -610,13 +635,8 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self, self->total_requests++; - pending->refcount++; - - return pending; - - fail: - pending_uri_free (pending); - return NULL; +out: + g_object_unref (task); } void