diff --git a/src/libostree/ostree-fetcher.c b/src/libostree/ostree-fetcher.c index c3bacd11..2964a1aa 100644 --- a/src/libostree/ostree-fetcher.c +++ b/src/libostree/ostree-fetcher.c @@ -41,7 +41,29 @@ typedef enum { } OstreeFetcherState; typedef struct { - OstreeFetcher *self; + volatile int ref_count; + + SoupSession *session; + GMainContext *main_context; + GMainLoop *main_loop; + + int tmpdir_dfd; + int max_outstanding; + + /* Queue for libsoup, see bgo#708591 */ + GQueue pending_queue; + GHashTable *outstanding; + + /* Shared across threads; be sure to lock. */ + GHashTable *output_stream_set; /* set */ + GMutex output_stream_set_lock; + + /* Also protected by output_stream_set_lock. */ + guint64 total_downloaded; +} ThreadClosure; + +typedef struct { + ThreadClosure *thread_closure; SoupURI *uri; OstreeFetcherState state; @@ -60,29 +82,30 @@ typedef struct { GTask *task; } OstreeFetcherPendingURI; +/* Used by session_thread_idle_add() */ +typedef void (*SessionThreadFunc) (ThreadClosure *thread_closure, + gpointer data); + +/* Used by session_thread_idle_add() */ +typedef struct { + ThreadClosure *thread_closure; + SessionThreadFunc function; + gpointer data; + GDestroyNotify notify; +} IdleClosure; + struct OstreeFetcher { GObject parent_instance; OstreeFetcherConfigFlags config_flags; - int tmpdir_dfd; + char *tmpdir_name; GLnxLockFile tmpdir_lock; int base_tmpdir_dfd; - GTlsCertificate *client_cert; - - SoupSession *session; - SoupRequester *requester; - - GHashTable *output_stream_set; /* set */ - - guint64 total_downloaded; - - /* Queue for libsoup, see bgo#708591 */ - GQueue pending_queue; - GHashTable *outstanding; - gint max_outstanding; + GThread *session_thread; + ThreadClosure *thread_closure; }; enum { @@ -92,6 +115,56 @@ enum { G_DEFINE_TYPE (OstreeFetcher, _ostree_fetcher, G_TYPE_OBJECT) +static ThreadClosure * +thread_closure_ref (ThreadClosure *thread_closure) +{ + g_return_val_if_fail (thread_closure != NULL, NULL); + g_return_val_if_fail (thread_closure->ref_count > 0, NULL); + + g_atomic_int_inc (&thread_closure->ref_count); + + return thread_closure; +} + +static void +thread_closure_unref (ThreadClosure *thread_closure) +{ + g_return_if_fail (thread_closure != NULL); + g_return_if_fail (thread_closure->ref_count > 0); + + if (g_atomic_int_dec_and_test (&thread_closure->ref_count)) + { + g_clear_object (&thread_closure->session); + + g_clear_pointer (&thread_closure->main_context, g_main_context_unref); + g_clear_pointer (&thread_closure->main_loop, g_main_loop_unref); + + if (thread_closure->tmpdir_dfd != -1) + close (thread_closure->tmpdir_dfd); + + while (!g_queue_is_empty (&thread_closure->pending_queue)) + g_object_unref (g_queue_pop_head (&thread_closure->pending_queue)); + + g_clear_pointer (&thread_closure->outstanding, g_hash_table_unref); + + g_clear_pointer (&thread_closure->output_stream_set, g_hash_table_unref); + g_mutex_clear (&thread_closure->output_stream_set_lock); + + g_slice_free (ThreadClosure, thread_closure); + } +} + +static void +idle_closure_free (IdleClosure *idle_closure) +{ + g_clear_pointer (&idle_closure->thread_closure, thread_closure_unref); + + if (idle_closure->notify != NULL) + idle_closure->notify (idle_closure->data); + + g_slice_free (IdleClosure, idle_closure); +} + static int pending_task_compare (gconstpointer a, gconstpointer b, @@ -107,10 +180,11 @@ pending_task_compare (gconstpointer a, static void pending_uri_free (OstreeFetcherPendingURI *pending) { - g_hash_table_remove (pending->self->outstanding, pending); + g_hash_table_remove (pending->thread_closure->outstanding, pending); + + g_clear_pointer (&pending->thread_closure, thread_closure_unref); soup_uri_free (pending->uri); - g_clear_object (&pending->self); g_clear_object (&pending->request); g_clear_object (&pending->request_body); g_free (pending->out_tmpfile); @@ -118,6 +192,250 @@ pending_uri_free (OstreeFetcherPendingURI *pending) g_free (pending); } +static gboolean +session_thread_idle_dispatch (gpointer data) +{ + IdleClosure *idle_closure = data; + + idle_closure->function (idle_closure->thread_closure, + idle_closure->data); + + return G_SOURCE_REMOVE; +} + +static void +session_thread_idle_add (ThreadClosure *thread_closure, + SessionThreadFunc function, + gpointer data, + GDestroyNotify notify) +{ + IdleClosure *idle_closure; + + g_return_if_fail (thread_closure != NULL); + g_return_if_fail (function != NULL); + + idle_closure = g_slice_new (IdleClosure); + idle_closure->thread_closure = thread_closure_ref (thread_closure); + idle_closure->function = function; + idle_closure->data = data; + idle_closure->notify = notify; + + g_main_context_invoke_full (thread_closure->main_context, + G_PRIORITY_DEFAULT, + session_thread_idle_dispatch, + idle_closure, /* takes ownership */ + (GDestroyNotify) idle_closure_free); +} + +static void +session_thread_add_logger (ThreadClosure *thread_closure, + gpointer data) +{ + glnx_unref_object SoupLogger *logger = NULL; + + logger = soup_logger_new (SOUP_LOGGER_LOG_BODY, 500); + soup_session_add_feature (thread_closure->session, + SOUP_SESSION_FEATURE (logger)); +} + +static void +session_thread_config_flags (ThreadClosure *thread_closure, + gpointer data) +{ + OstreeFetcherConfigFlags config_flags; + + config_flags = GPOINTER_TO_UINT (data); + + if ((config_flags & OSTREE_FETCHER_FLAGS_TLS_PERMISSIVE) > 0) + { + g_object_set (thread_closure->session, + SOUP_SESSION_SSL_STRICT, + FALSE, NULL); + } +} + +static void +session_thread_set_proxy_cb (ThreadClosure *thread_closure, + gpointer data) +{ + SoupURI *proxy_uri = data; + + g_object_set (thread_closure->session, + SOUP_SESSION_PROXY_URI, + proxy_uri, NULL); +} + +static void +session_thread_set_tls_interaction_cb (ThreadClosure *thread_closure, + gpointer data) +{ + GTlsInteraction *interaction = data; + + g_object_set (thread_closure->session, + SOUP_SESSION_TLS_INTERACTION, + interaction, NULL); +} + +static void +session_thread_set_tls_database_cb (ThreadClosure *thread_closure, + gpointer data) +{ + GTlsDatabase *database = data; + + if (database != NULL) + { + g_object_set (thread_closure->session, + SOUP_SESSION_TLS_DATABASE, + database, NULL); + } + else + { + g_object_set (thread_closure->session, + SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, + TRUE, NULL); + } +} + +static void +on_request_sent (GObject *object, GAsyncResult *result, gpointer user_data); + +static void +session_thread_process_pending_queue (ThreadClosure *thread_closure) +{ + + while (g_queue_peek_head (&thread_closure->pending_queue) != NULL && + g_hash_table_size (thread_closure->outstanding) < thread_closure->max_outstanding) + { + GTask *task; + OstreeFetcherPendingURI *pending; + GCancellable *cancellable; + + task = g_queue_pop_head (&thread_closure->pending_queue); + + pending = g_task_get_task_data (task); + cancellable = g_task_get_cancellable (task); + + /* pending_uri_free() removes this. */ + g_hash_table_add (thread_closure->outstanding, pending); + + soup_request_send_async (pending->request, + cancellable, + on_request_sent, + g_object_ref (task)); + + g_object_unref (task); + } +} + +static void +session_thread_request_uri (ThreadClosure *thread_closure, + gpointer data) +{ + GTask *task = G_TASK (data); + OstreeFetcherPendingURI *pending; + GCancellable *cancellable; + GError *local_error = NULL; + + pending = g_task_get_task_data (task); + cancellable = g_task_get_cancellable (task); + + pending->request = soup_session_request_uri (thread_closure->session, + pending->uri, + &local_error); + + if (local_error != NULL) + { + g_task_return_error (task, local_error); + return; + } + + if (pending->is_stream) + { + soup_request_send_async (pending->request, + cancellable, + on_request_sent, + g_object_ref (task)); + } + else + { + g_autofree char *uristring = soup_uri_to_string (pending->uri, FALSE); + g_autofree char *tmpfile = NULL; + struct stat stbuf; + gboolean exists; + + tmpfile = g_compute_checksum_for_string (G_CHECKSUM_SHA256, uristring, strlen (uristring)); + + if (fstatat (thread_closure->tmpdir_dfd, tmpfile, &stbuf, AT_SYMLINK_NOFOLLOW) == 0) + exists = TRUE; + else + { + if (errno == ENOENT) + exists = FALSE; + else + { + gs_set_error_from_errno (&local_error, errno); + g_task_return_error (task, local_error); + return; + } + } + + if (SOUP_IS_REQUEST_HTTP (pending->request)) + { + glnx_unref_object SoupMessage *msg = NULL; + msg = soup_request_http_get_message ((SoupRequestHTTP*) pending->request); + if (exists && stbuf.st_size > 0) + soup_message_headers_set_range (msg->request_headers, stbuf.st_size, -1); + } + pending->out_tmpfile = tmpfile; + tmpfile = NULL; /* Transfer ownership */ + + g_queue_insert_sorted (&thread_closure->pending_queue, + g_object_ref (task), + pending_task_compare, NULL); + session_thread_process_pending_queue (thread_closure); + } +} + +static gpointer +ostree_fetcher_session_thread (gpointer data) +{ + ThreadClosure *closure = data; + gint max_conns; + + /* This becomes the GMainContext that SoupSession schedules async + * callbacks and emits signals from. Make it the thread-default + * context for this thread before creating the session. */ + g_main_context_push_thread_default (closure->main_context); + + closure->session = soup_session_async_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ", + SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE, + SOUP_SESSION_USE_THREAD_CONTEXT, TRUE, + SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_REQUESTER, + SOUP_SESSION_TIMEOUT, 60, + SOUP_SESSION_IDLE_TIMEOUT, 60, + NULL); + + g_object_get (closure->session, "max-conns-per-host", &max_conns, NULL); + if (max_conns < 8) + { + /* We download a lot of small objects in ostree, so this + * helps a lot. Also matches what most modern browsers do. */ + max_conns = 8; + g_object_set (closure->session, + "max-conns-per-host", + max_conns, NULL); + } + closure->max_outstanding = 3 * max_conns; + + g_main_loop_run (closure->main_loop); + + g_main_context_pop_thread_default (closure->main_context); + + thread_closure_unref (closure); + + return NULL; +} + static void _ostree_fetcher_set_property (GObject *object, guint prop_id, @@ -159,12 +477,12 @@ _ostree_fetcher_get_property (GObject *object, static void _ostree_fetcher_finalize (GObject *object) { - OstreeFetcher *self; + OstreeFetcher *self = OSTREE_FETCHER (object); - self = OSTREE_FETCHER (object); - - if (self->tmpdir_dfd != -1) - close (self->tmpdir_dfd); + /* Terminate the session thread. */ + g_main_loop_quit (self->thread_closure->main_loop); + g_clear_pointer (&self->session_thread, g_thread_unref); + g_clear_pointer (&self->thread_closure, thread_closure_unref); /* Note: We don't remove the tmpdir here, because that would cause us to not reuse it on resume. This happens because we use two @@ -174,19 +492,57 @@ _ostree_fetcher_finalize (GObject *object) g_free (self->tmpdir_name); glnx_release_lock_file (&self->tmpdir_lock); - g_clear_object (&self->session); - g_clear_object (&self->client_cert); - - g_hash_table_destroy (self->output_stream_set); - - while (!g_queue_is_empty (&self->pending_queue)) - g_object_unref (g_queue_pop_head (&self->pending_queue)); - - g_hash_table_destroy (self->outstanding); - G_OBJECT_CLASS (_ostree_fetcher_parent_class)->finalize (object); } +static void +_ostree_fetcher_constructed (GObject *object) +{ + OstreeFetcher *self = OSTREE_FETCHER (object); + g_autoptr(GMainContext) main_context = NULL; + const char *http_proxy; + + main_context = g_main_context_new (); + + self->thread_closure = g_slice_new0 (ThreadClosure); + self->thread_closure->ref_count = 1; + self->thread_closure->main_context = g_main_context_ref (main_context); + self->thread_closure->main_loop = g_main_loop_new (main_context, FALSE); + self->thread_closure->tmpdir_dfd = -1; + + self->thread_closure->outstanding = g_hash_table_new (NULL, NULL); + self->thread_closure->output_stream_set = g_hash_table_new_full (NULL, NULL, + (GDestroyNotify) NULL, + (GDestroyNotify) g_object_unref); + + if (g_getenv ("OSTREE_DEBUG_HTTP")) + { + session_thread_idle_add (self->thread_closure, + session_thread_add_logger, + NULL, (GDestroyNotify) NULL); + } + + if (self->config_flags != 0) + { + session_thread_idle_add (self->thread_closure, + session_thread_config_flags, + GUINT_TO_POINTER (self->config_flags), + (GDestroyNotify) NULL); + } + + http_proxy = g_getenv ("http_proxy"); + if (http_proxy != NULL) + _ostree_fetcher_set_proxy (self, http_proxy); + + /* FIXME Maybe implement GInitableIface and use g_thread_try_new() + * so we can try to handle thread creation errors gracefully? */ + self->session_thread = g_thread_new ("fetcher-session-thread", + ostree_fetcher_session_thread, + thread_closure_ref (self->thread_closure)); + + G_OBJECT_CLASS (_ostree_fetcher_parent_class)->constructed (object); +} + static void _ostree_fetcher_class_init (OstreeFetcherClass *klass) { @@ -195,6 +551,7 @@ _ostree_fetcher_class_init (OstreeFetcherClass *klass) gobject_class->set_property = _ostree_fetcher_set_property; gobject_class->get_property = _ostree_fetcher_get_property; gobject_class->finalize = _ostree_fetcher_finalize; + gobject_class->constructed = _ostree_fetcher_constructed; g_object_class_install_property (gobject_class, PROP_CONFIG_FLAGS, @@ -211,50 +568,9 @@ _ostree_fetcher_class_init (OstreeFetcherClass *klass) static void _ostree_fetcher_init (OstreeFetcher *self) { - gint max_conns; - const char *http_proxy; GLnxLockFile empty_lockfile = GLNX_LOCK_FILE_INIT; - g_queue_init (&self->pending_queue); - self->session = soup_session_async_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ", - SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE, - SOUP_SESSION_USE_THREAD_CONTEXT, TRUE, - SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_REQUESTER, - SOUP_SESSION_TIMEOUT, 60, - SOUP_SESSION_IDLE_TIMEOUT, 60, - NULL); - - http_proxy = g_getenv ("http_proxy"); - if (http_proxy) - { - _ostree_fetcher_set_proxy (self, http_proxy); - } - - if (g_getenv ("OSTREE_DEBUG_HTTP")) - soup_session_add_feature (self->session, (SoupSessionFeature*)soup_logger_new (SOUP_LOGGER_LOG_BODY, 500)); - - if ((self->config_flags & OSTREE_FETCHER_FLAGS_TLS_PERMISSIVE) > 0) - g_object_set (self->session, SOUP_SESSION_SSL_STRICT, FALSE, NULL); - - self->requester = (SoupRequester *)soup_session_get_feature (self->session, SOUP_TYPE_REQUESTER); - g_object_get (self->session, "max-conns-per-host", &max_conns, NULL); - if (max_conns <= 8) - { - // We download a lot of small objects in ostree, so this helps a - // lot. Also matches what most modern browsers do. - max_conns = 8; - g_object_set (self->session, "max-conns-per-host", max_conns, NULL); - } - - self->max_outstanding = 3 * max_conns; - - self->output_stream_set = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)g_object_unref); - - self->outstanding = g_hash_table_new_full (NULL, NULL, NULL, NULL); - - self->tmpdir_dfd = -1; self->tmpdir_lock = empty_lockfile; - } OstreeFetcher * @@ -270,13 +586,13 @@ _ostree_fetcher_new (int tmpdir_dfd, if (!_ostree_repo_allocate_tmpdir (tmpdir_dfd, "fetcher-", &self->tmpdir_name, - &self->tmpdir_dfd, + &self->thread_closure->tmpdir_dfd, &self->tmpdir_lock, NULL, cancellable, error)) return NULL; - self->tmpdir_dfd = tmpdir_dfd; + self->base_tmpdir_dfd = tmpdir_dfd; return self; } @@ -284,81 +600,69 @@ _ostree_fetcher_new (int tmpdir_dfd, int _ostree_fetcher_get_dfd (OstreeFetcher *fetcher) { - return fetcher->tmpdir_dfd; + return fetcher->thread_closure->tmpdir_dfd; } void _ostree_fetcher_set_proxy (OstreeFetcher *self, const char *http_proxy) { - SoupURI *proxy_uri = soup_uri_new (http_proxy); + SoupURI *proxy_uri; + + g_return_if_fail (OSTREE_IS_FETCHER (self)); + g_return_if_fail (http_proxy != NULL); + + proxy_uri = soup_uri_new (http_proxy); + if (!proxy_uri) { g_warning ("Invalid proxy URI '%s'", http_proxy); } else { - g_object_set (self->session, SOUP_SESSION_PROXY_URI, proxy_uri, NULL); - soup_uri_free (proxy_uri); + session_thread_idle_add (self->thread_closure, + session_thread_set_proxy_cb, + proxy_uri, /* takes ownership */ + (GDestroyNotify) soup_uri_free); } } void -_ostree_fetcher_set_client_cert (OstreeFetcher *fetcher, - GTlsCertificate *cert) +_ostree_fetcher_set_client_cert (OstreeFetcher *self, + GTlsCertificate *cert) { - g_clear_object (&fetcher->client_cert); - fetcher->client_cert = g_object_ref (cert); - if (fetcher->client_cert) - { + g_return_if_fail (OSTREE_IS_FETCHER (self)); + g_return_if_fail (G_IS_TLS_CERTIFICATE (cert)); + #ifdef HAVE_LIBSOUP_CLIENT_CERTS - g_autoptr(GTlsInteraction) interaction = - (GTlsInteraction*)_ostree_tls_cert_interaction_new (fetcher->client_cert); - g_object_set (fetcher->session, "tls-interaction", interaction, NULL); + session_thread_idle_add (self->thread_closure, + session_thread_set_tls_interaction_cb, + _ostree_tls_cert_interaction_new (cert), + (GDestroyNotify) g_object_unref); #else - g_warning ("This version of OSTree is compiled without client side certificate support"); + g_warning ("This version of OSTree is compiled without client side certificate support"); #endif - } } void _ostree_fetcher_set_tls_database (OstreeFetcher *self, GTlsDatabase *db) { - if (db) - g_object_set ((GObject*)self->session, "tls-database", db, NULL); - else - g_object_set ((GObject*)self->session, "ssl-use-system-ca-file", TRUE, NULL); -} + g_return_if_fail (OSTREE_IS_FETCHER (self)); + g_return_if_fail (db == NULL || G_IS_TLS_DATABASE (db)); -static void -on_request_sent (GObject *object, GAsyncResult *result, gpointer user_data); - -static void -ostree_fetcher_process_pending_queue (OstreeFetcher *self) -{ - - while (g_queue_peek_head (&self->pending_queue) != NULL && - g_hash_table_size (self->outstanding) < self->max_outstanding) + if (db != NULL) { - GTask *task; - OstreeFetcherPendingURI *pending; - GCancellable *cancellable; - - task = g_queue_pop_head (&self->pending_queue); - - pending = g_task_get_task_data (task); - cancellable = g_task_get_cancellable (task); - - /* pending_uri_free() removes this. */ - g_hash_table_add (self->outstanding, pending); - - soup_request_send_async (pending->request, - cancellable, - on_request_sent, - g_object_ref (task)); - - g_object_unref (task); + session_thread_idle_add (self->thread_closure, + session_thread_set_tls_database_cb, + g_object_ref (db), + (GDestroyNotify) g_object_unref); + } + else + { + session_thread_idle_add (self->thread_closure, + session_thread_set_tls_database_cb, + NULL, (GDestroyNotify) NULL); } } @@ -377,11 +681,17 @@ finish_stream (OstreeFetcherPendingURI *pending, { if (!g_output_stream_close (pending->out_stream, cancellable, error)) goto out; - g_hash_table_remove (pending->self->output_stream_set, pending->out_stream); + + g_mutex_lock (&pending->thread_closure->output_stream_set_lock); + g_hash_table_remove (pending->thread_closure->output_stream_set, + pending->out_stream); + g_mutex_unlock (&pending->thread_closure->output_stream_set_lock); } pending->state = OSTREE_FETCHER_STATE_COMPLETE; - if (fstatat (pending->self->tmpdir_dfd, pending->out_tmpfile, &stbuf, AT_SYMLINK_NOFOLLOW) != 0) + if (fstatat (pending->thread_closure->tmpdir_dfd, + pending->out_tmpfile, + &stbuf, AT_SYMLINK_NOFOLLOW) != 0) { gs_set_error_from_errno (error, errno); goto out; @@ -390,7 +700,7 @@ finish_stream (OstreeFetcherPendingURI *pending, /* Now that we've finished downloading, continue with other queued * requests. */ - ostree_fetcher_process_pending_queue (pending->self); + session_thread_process_pending_queue (pending->thread_closure); if (stbuf.st_size < pending->content_length) { @@ -399,7 +709,9 @@ finish_stream (OstreeFetcherPendingURI *pending, } else { - pending->self->total_downloaded += stbuf.st_size; + g_mutex_lock (&pending->thread_closure->output_stream_set_lock); + pending->thread_closure->total_downloaded += stbuf.st_size; + g_mutex_unlock (&pending->thread_closure->output_stream_set_lock); } ret = TRUE; @@ -593,14 +905,20 @@ on_request_sent (GObject *object, else oflags |= O_TRUNC; - fd = openat (pending->self->tmpdir_dfd, pending->out_tmpfile, oflags, 0666); + fd = openat (pending->thread_closure->tmpdir_dfd, + pending->out_tmpfile, oflags, 0666); if (fd == -1) { gs_set_error_from_errno (&local_error, errno); goto out; } pending->out_stream = g_unix_output_stream_new (fd, TRUE); - g_hash_table_add (pending->self->output_stream_set, g_object_ref (pending->out_stream)); + + g_mutex_lock (&pending->thread_closure->output_stream_set_lock); + g_hash_table_add (pending->thread_closure->output_stream_set, + g_object_ref (pending->out_stream)); + g_mutex_unlock (&pending->thread_closure->output_stream_set_lock); + g_input_stream_read_bytes_async (pending->request_body, 8192, G_PRIORITY_DEFAULT, cancellable, @@ -636,13 +954,15 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self, gpointer user_data, gpointer source_tag) { - GTask *task; - OstreeFetcherPendingURI *pending = g_new0 (OstreeFetcherPendingURI, 1); - GError *local_error = NULL; + g_autoptr(GTask) task = NULL; + OstreeFetcherPendingURI *pending; - pending->request = soup_requester_request_uri (self->requester, uri, &local_error); + g_return_if_fail (OSTREE_IS_FETCHER (self)); + g_return_if_fail (uri != NULL); - pending->self = g_object_ref (self); + /* SoupRequest is created in session thread. */ + pending = g_new0 (OstreeFetcherPendingURI, 1); + pending->thread_closure = thread_closure_ref (self->thread_closure); pending->uri = soup_uri_copy (uri); pending->max_size = max_size; pending->is_stream = is_stream; @@ -654,55 +974,10 @@ ostree_fetcher_request_uri_internal (OstreeFetcher *self, /* We'll use the GTask priority for our own priority queue. */ g_task_set_priority (task, priority); - if (is_stream) - { - soup_request_send_async (pending->request, - cancellable, - on_request_sent, - g_object_ref (task)); - } - else - { - g_autofree char *uristring = soup_uri_to_string (uri, FALSE); - g_autofree char *tmpfile = NULL; - struct stat stbuf; - gboolean exists; - - tmpfile = g_compute_checksum_for_string (G_CHECKSUM_SHA256, uristring, strlen (uristring)); - - if (fstatat (self->tmpdir_dfd, tmpfile, &stbuf, AT_SYMLINK_NOFOLLOW) == 0) - exists = TRUE; - else - { - if (errno == ENOENT) - exists = FALSE; - else - { - gs_set_error_from_errno (&local_error, errno); - goto out; - } - } - - if (SOUP_IS_REQUEST_HTTP (pending->request)) - { - glnx_unref_object SoupMessage *msg = NULL; - msg = soup_request_http_get_message ((SoupRequestHTTP*) pending->request); - if (exists && stbuf.st_size > 0) - soup_message_headers_set_range (msg->request_headers, stbuf.st_size, -1); - } - pending->out_tmpfile = tmpfile; - tmpfile = NULL; /* Transfer ownership */ - - g_queue_insert_sorted (&self->pending_queue, - g_object_ref (task), - pending_task_compare, NULL); - ostree_fetcher_process_pending_queue (self); - } - - g_assert_no_error (local_error); - -out: - g_object_unref (task); + session_thread_idle_add (self->thread_closure, + session_thread_request_uri, + g_object_ref (task), + (GDestroyNotify) g_object_unref); } void @@ -760,11 +1035,17 @@ ostree_fetcher_stream_uri_finish (OstreeFetcher *self, guint64 _ostree_fetcher_bytes_transferred (OstreeFetcher *self) { - guint64 ret = self->total_downloaded; GHashTableIter hiter; gpointer key, value; + guint64 ret; - g_hash_table_iter_init (&hiter, self->output_stream_set); + g_return_val_if_fail (OSTREE_IS_FETCHER (self), 0); + + g_mutex_lock (&self->thread_closure->output_stream_set_lock); + + ret = self->thread_closure->total_downloaded; + + g_hash_table_iter_init (&hiter, self->thread_closure->output_stream_set); while (g_hash_table_iter_next (&hiter, &key, &value)) { GFileOutputStream *stream = key; @@ -776,7 +1057,9 @@ _ostree_fetcher_bytes_transferred (OstreeFetcher *self) ret += stbuf.st_size; } } - + + g_mutex_unlock (&self->thread_closure->output_stream_set_lock); + return ret; }