fetcher: Rework reference counting

Have OstreeFetcherPendingURI be the GTask's task_data and pass the GTask
around in queues and callback closures.  The reference counting before
was a little confusing and this helps clarify it, at least to me.

OstreeFetcherPendingURI no longer needs its own reference count.
This commit is contained in:
Matthew Barnes 2015-09-23 17:11:51 -04:00
parent 330a99c40b
commit 771075d319
1 changed files with 87 additions and 67 deletions

View File

@ -39,10 +39,8 @@ typedef enum {
} OstreeFetcherState; } OstreeFetcherState;
typedef struct { typedef struct {
guint refcount;
OstreeFetcher *self; OstreeFetcher *self;
SoupURI *uri; SoupURI *uri;
int priority;
OstreeFetcherState state; OstreeFetcherState state;
@ -57,37 +55,30 @@ typedef struct {
guint64 current_size; guint64 current_size;
guint64 content_length; guint64 content_length;
GCancellable *cancellable;
GTask *task; GTask *task;
} OstreeFetcherPendingURI; } OstreeFetcherPendingURI;
static int static int
pending_uri_compare (gconstpointer a, pending_task_compare (gconstpointer a,
gconstpointer b, gconstpointer b,
gpointer unused) gpointer unused)
{ {
const OstreeFetcherPendingURI *pending_a = a; gint priority_a = g_task_get_priority (G_TASK (a));
const OstreeFetcherPendingURI *pending_b = b; gint priority_b = g_task_get_priority (G_TASK (b));
return (pending_a->priority == pending_b->priority) ? 0 : return (priority_a == priority_b) ? 0 :
(pending_a->priority < pending_b->priority) ? -1 : 1; (priority_a < priority_b) ? -1 : 1;
} }
static void static void
pending_uri_free (OstreeFetcherPendingURI *pending) pending_uri_free (OstreeFetcherPendingURI *pending)
{ {
g_assert (pending->refcount > 0);
pending->refcount--;
if (pending->refcount > 0)
return;
soup_uri_free (pending->uri); soup_uri_free (pending->uri);
g_clear_object (&pending->self); g_clear_object (&pending->self);
g_clear_object (&pending->request); g_clear_object (&pending->request);
g_clear_object (&pending->request_body); g_clear_object (&pending->request_body);
g_free (pending->out_tmpfile); g_free (pending->out_tmpfile);
g_clear_object (&pending->out_stream); g_clear_object (&pending->out_stream);
g_clear_object (&pending->cancellable);
g_free (pending); g_free (pending);
} }
@ -130,7 +121,8 @@ _ostree_fetcher_finalize (GObject *object)
g_hash_table_destroy (self->sending_messages); g_hash_table_destroy (self->sending_messages);
g_hash_table_destroy (self->output_stream_set); g_hash_table_destroy (self->output_stream_set);
g_queue_clear (&self->pending_queue); while (!g_queue_is_empty (&self->pending_queue))
g_object_unref (g_queue_pop_head (&self->pending_queue));
G_OBJECT_CLASS (_ostree_fetcher_parent_class)->finalize (object); G_OBJECT_CLASS (_ostree_fetcher_parent_class)->finalize (object);
} }
@ -282,11 +274,22 @@ ostree_fetcher_process_pending_queue (OstreeFetcher *self)
while (g_queue_peek_head (&self->pending_queue) != NULL && while (g_queue_peek_head (&self->pending_queue) != NULL &&
self->outstanding < self->max_outstanding) self->outstanding < self->max_outstanding)
{ {
OstreeFetcherPendingURI *next = g_queue_pop_head (&self->pending_queue); GTask *task;
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;
task = g_queue_pop_head (&self->pending_queue);
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
self->outstanding++; self->outstanding++;
soup_request_send_async (next->request, next->cancellable, soup_request_send_async (pending->request,
on_request_sent, next); cancellable,
on_request_sent,
g_object_ref (task));
g_object_unref (task);
} }
} }
@ -303,7 +306,7 @@ finish_stream (OstreeFetcherPendingURI *pending,
*/ */
if (pending->out_stream) if (pending->out_stream)
{ {
if (!g_output_stream_close (pending->out_stream, pending->cancellable, error)) if (!g_output_stream_close (pending->out_stream, cancellable, error))
goto out; goto out;
g_hash_table_remove (pending->self->output_stream_set, pending->out_stream); g_hash_table_remove (pending->self->output_stream_set, pending->out_stream);
} }
@ -347,22 +350,32 @@ on_out_splice_complete (GObject *object,
GAsyncResult *result, GAsyncResult *result,
gpointer user_data) gpointer user_data)
{ {
OstreeFetcherPendingURI *pending = user_data; GTask *task = G_TASK (user_data);
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;
gssize bytes_written; gssize bytes_written;
GError *local_error = NULL; GError *local_error = NULL;
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
bytes_written = g_output_stream_splice_finish ((GOutputStream *)object, bytes_written = g_output_stream_splice_finish ((GOutputStream *)object,
result, result,
&local_error); &local_error);
if (bytes_written < 0) if (bytes_written < 0)
goto out; goto out;
g_input_stream_read_bytes_async (pending->request_body, 8192, G_PRIORITY_DEFAULT, g_input_stream_read_bytes_async (pending->request_body,
pending->cancellable, on_stream_read, pending); 8192, G_PRIORITY_DEFAULT,
cancellable,
on_stream_read,
g_object_ref (task));
out: out:
if (local_error) if (local_error)
g_task_return_error (pending->task, local_error); g_task_return_error (task, local_error);
g_object_unref (task);
} }
static void static void
@ -370,11 +383,16 @@ on_stream_read (GObject *object,
GAsyncResult *result, GAsyncResult *result,
gpointer user_data) gpointer user_data)
{ {
OstreeFetcherPendingURI *pending = user_data; GTask *task = G_TASK (user_data);
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;
g_autoptr(GBytes) bytes = NULL; g_autoptr(GBytes) bytes = NULL;
gsize bytes_read; gsize bytes_read;
GError *local_error = NULL; GError *local_error = NULL;
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
bytes = g_input_stream_read_bytes_finish ((GInputStream*)object, result, &local_error); bytes = g_input_stream_read_bytes_finish ((GInputStream*)object, result, &local_error);
if (!bytes) if (!bytes)
goto out; goto out;
@ -382,12 +400,11 @@ on_stream_read (GObject *object,
bytes_read = g_bytes_get_size (bytes); bytes_read = g_bytes_get_size (bytes);
if (bytes_read == 0) if (bytes_read == 0)
{ {
if (!finish_stream (pending, pending->cancellable, &local_error)) if (!finish_stream (pending, cancellable, &local_error))
goto out; goto out;
g_task_return_pointer (pending->task, g_task_return_pointer (task,
g_strdup (pending->out_tmpfile), g_strdup (pending->out_tmpfile),
(GDestroyNotify) g_free); (GDestroyNotify) g_free);
g_object_unref (pending->task);
} }
else else
{ {
@ -415,18 +432,17 @@ on_stream_read (GObject *object,
g_output_stream_splice_async (pending->out_stream, membuf, g_output_stream_splice_async (pending->out_stream, membuf,
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE, G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
G_PRIORITY_DEFAULT, G_PRIORITY_DEFAULT,
pending->cancellable, cancellable,
on_out_splice_complete, on_out_splice_complete,
pending); g_object_ref (task));
} }
} }
out: out:
if (local_error) if (local_error)
{ g_task_return_error (task, local_error);
g_task_return_error (pending->task, local_error);
g_object_unref (pending->task); g_object_unref (task);
}
} }
static void static void
@ -434,10 +450,15 @@ on_request_sent (GObject *object,
GAsyncResult *result, GAsyncResult *result,
gpointer user_data) gpointer user_data)
{ {
OstreeFetcherPendingURI *pending = user_data; GTask *task = G_TASK (user_data);
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;
GError *local_error = NULL; GError *local_error = NULL;
glnx_unref_object SoupMessage *msg = NULL; glnx_unref_object SoupMessage *msg = NULL;
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
pending->state = OSTREE_FETCHER_STATE_COMPLETE; pending->state = OSTREE_FETCHER_STATE_COMPLETE;
pending->request_body = soup_request_send_finish ((SoupRequest*) object, pending->request_body = soup_request_send_finish ((SoupRequest*) object,
result, &local_error); result, &local_error);
@ -455,18 +476,17 @@ on_request_sent (GObject *object,
(void) g_input_stream_close (pending->request_body, NULL, NULL); (void) g_input_stream_close (pending->request_body, NULL, NULL);
if (pending->is_stream) if (pending->is_stream)
{ {
g_task_return_pointer (pending->task, g_task_return_pointer (task,
g_object_ref (pending->request_body), g_object_ref (pending->request_body),
(GDestroyNotify) g_object_unref); (GDestroyNotify) g_object_unref);
} }
else else
{ {
g_task_return_pointer (pending->task, g_task_return_pointer (task,
g_strdup (pending->out_tmpfile), g_strdup (pending->out_tmpfile),
(GDestroyNotify) g_free); (GDestroyNotify) g_free);
} }
g_object_unref (pending->task); goto out;
return;
} }
else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
{ {
@ -513,16 +533,17 @@ on_request_sent (GObject *object,
} }
pending->out_stream = g_unix_output_stream_new (fd, TRUE); pending->out_stream = g_unix_output_stream_new (fd, TRUE);
g_hash_table_add (pending->self->output_stream_set, g_object_ref (pending->out_stream)); g_hash_table_add (pending->self->output_stream_set, g_object_ref (pending->out_stream));
g_input_stream_read_bytes_async (pending->request_body, 8192, G_PRIORITY_DEFAULT, g_input_stream_read_bytes_async (pending->request_body,
pending->cancellable, on_stream_read, pending); 8192, G_PRIORITY_DEFAULT,
cancellable,
on_stream_read,
g_object_ref (task));
} }
else else
{ {
g_task_return_pointer (pending->task, g_task_return_pointer (task,
g_object_ref (pending->request_body), g_object_ref (pending->request_body),
(GDestroyNotify) g_object_unref); (GDestroyNotify) g_object_unref);
g_object_unref (pending->task);
} }
out: out:
@ -530,12 +551,13 @@ on_request_sent (GObject *object,
{ {
if (pending->request_body) if (pending->request_body)
(void) g_input_stream_close (pending->request_body, NULL, NULL); (void) g_input_stream_close (pending->request_body, NULL, NULL);
g_task_return_error (pending->task, local_error); g_task_return_error (task, local_error);
g_object_unref (pending->task);
} }
g_object_unref (task);
} }
static OstreeFetcherPendingURI * static void
ostree_fetcher_request_uri_internal (OstreeFetcher *self, ostree_fetcher_request_uri_internal (OstreeFetcher *self,
SoupURI *uri, SoupURI *uri,
gboolean is_stream, gboolean is_stream,
@ -546,29 +568,30 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self,
gpointer user_data, gpointer user_data,
gpointer source_tag) gpointer source_tag)
{ {
GTask *task;
OstreeFetcherPendingURI *pending = g_new0 (OstreeFetcherPendingURI, 1); OstreeFetcherPendingURI *pending = g_new0 (OstreeFetcherPendingURI, 1);
GError *local_error = NULL; GError *local_error = NULL;
pending->refcount = 1;
pending->request = soup_requester_request_uri (self->requester, uri, &local_error); pending->request = soup_requester_request_uri (self->requester, uri, &local_error);
pending->self = g_object_ref (self); pending->self = g_object_ref (self);
pending->uri = soup_uri_copy (uri); pending->uri = soup_uri_copy (uri);
pending->priority = priority;
pending->max_size = max_size; pending->max_size = max_size;
pending->is_stream = is_stream; pending->is_stream = is_stream;
pending->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
pending->task = g_task_new (self,
cancellable,
callback, user_data);
g_task_set_source_tag (pending->task, source_tag); task = g_task_new (self, cancellable, callback, user_data);
g_task_set_task_data (pending->task, pending, (GDestroyNotify) pending_uri_free); g_task_set_source_tag (task, source_tag);
g_task_set_task_data (task, pending, (GDestroyNotify) pending_uri_free);
/* We'll use the GTask priority for our own priority queue. */
g_task_set_priority (task, priority);
if (is_stream) if (is_stream)
{ {
soup_request_send_async (pending->request, cancellable, soup_request_send_async (pending->request,
on_request_sent, pending); cancellable,
on_request_sent,
g_object_ref (task));
} }
else else
{ {
@ -588,7 +611,7 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self,
else else
{ {
gs_set_error_from_errno (&local_error, errno); gs_set_error_from_errno (&local_error, errno);
goto fail; goto out;
} }
} }
@ -602,7 +625,9 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self,
pending->out_tmpfile = tmpfile; pending->out_tmpfile = tmpfile;
tmpfile = NULL; /* Transfer ownership */ tmpfile = NULL; /* Transfer ownership */
g_queue_insert_sorted (&self->pending_queue, pending, pending_uri_compare, NULL); g_queue_insert_sorted (&self->pending_queue,
g_object_ref (task),
pending_task_compare, NULL);
ostree_fetcher_process_pending_queue (self); ostree_fetcher_process_pending_queue (self);
} }
@ -610,13 +635,8 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self,
self->total_requests++; self->total_requests++;
pending->refcount++; out:
g_object_unref (task);
return pending;
fail:
pending_uri_free (pending);
return NULL;
} }
void void