ostree/src/libostree/ostree-fetcher-soup.c

1382 lines
46 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* Copyright (C) 2011 Colin Walters <walters@verbum.org>
*
* SPDX-License-Identifier: LGPL-2.0+
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*
* Author: Colin Walters <walters@verbum.org>
*/
#include "config.h"
#include <gio/gio.h>
#include <gio/gfiledescriptorbased.h>
#include <gio/gunixoutputstream.h>
#define LIBSOUP_USE_UNSTABLE_REQUEST_API
#include <libsoup/soup.h>
#include <libsoup/soup-requester.h>
#include <libsoup/soup-request-http.h>
#include "libglnx.h"
#include "ostree-fetcher.h"
#include "ostree-fetcher-util.h"
#ifdef HAVE_LIBSOUP_CLIENT_CERTS
#include "ostree-tls-cert-interaction.h"
#endif
#include "ostree-enumtypes.h"
#include "ostree.h"
#include "ostree-repo-private.h"
#include "otutil.h"
typedef enum {
OSTREE_FETCHER_STATE_PENDING,
OSTREE_FETCHER_STATE_DOWNLOADING,
OSTREE_FETCHER_STATE_COMPLETE
} OstreeFetcherState;
typedef struct {
int ref_count; /* atomic */
SoupSession *session; /* not referenced */
GMainContext *main_context;
volatile gint running;
GError *initialization_error; /* Any failure to load the db */
char *remote_name;
int base_tmpdir_dfd;
GVariant *extra_headers;
gboolean transfer_gzip;
/* Our active HTTP requests */
GHashTable *outstanding;
/* Shared across threads; be sure to lock. */
GHashTable *output_stream_set; /* set<GOutputStream> */
GMutex output_stream_set_lock;
/* Also protected by output_stream_set_lock. */
guint64 total_downloaded;
GError *oob_error;
} ThreadClosure;
typedef struct {
int ref_count; /* atomic */
ThreadClosure *thread_closure;
GPtrArray *mirrorlist; /* list of base URIs */
char *filename; /* relative name to fetch or NULL */
guint mirrorlist_idx;
OstreeFetcherState state;
SoupRequest *request;
gboolean is_membuf;
OstreeFetcherRequestFlags flags;
char *if_none_match; /* request ETag */
guint64 if_modified_since; /* seconds since the epoch */
GInputStream *request_body;
GLnxTmpfile tmpf;
GOutputStream *out_stream;
gboolean out_not_modified; /* TRUE if the server gave a HTTP 304 Not Modified response, which we dont propagate as an error */
char *out_etag; /* response ETag */
guint64 out_last_modified; /* response Last-Modified, seconds since the epoch */
guint64 max_size;
guint64 current_size;
guint64 content_length;
} OstreeFetcherPendingURI;
/* Used by session_thread_idle_add() */
typedef void (*SessionThreadFunc) (ThreadClosure *thread_closure,
gpointer data);
/* Used by session_thread_idle_add() */
typedef struct {
ThreadClosure *thread_closure;
SessionThreadFunc function;
gpointer data;
GDestroyNotify notify;
} IdleClosure;
struct OstreeFetcher
{
GObject parent_instance;
OstreeFetcherConfigFlags config_flags;
GThread *session_thread;
ThreadClosure *thread_closure;
};
enum {
PROP_0,
PROP_CONFIG_FLAGS
};
G_DEFINE_TYPE (OstreeFetcher, _ostree_fetcher, G_TYPE_OBJECT)
static ThreadClosure *
thread_closure_ref (ThreadClosure *thread_closure)
{
int refcount;
g_return_val_if_fail (thread_closure != NULL, NULL);
refcount = g_atomic_int_add (&thread_closure->ref_count, 1);
g_assert (refcount > 0);
return thread_closure;
}
static void
thread_closure_unref (ThreadClosure *thread_closure)
{
g_return_if_fail (thread_closure != NULL);
if (g_atomic_int_dec_and_test (&thread_closure->ref_count))
{
/* The session thread should have cleared this by now. */
g_assert (thread_closure->session == NULL);
g_clear_pointer (&thread_closure->main_context, g_main_context_unref);
g_clear_pointer (&thread_closure->extra_headers, (GDestroyNotify)g_variant_unref);
g_clear_pointer (&thread_closure->output_stream_set, g_hash_table_unref);
g_mutex_clear (&thread_closure->output_stream_set_lock);
g_clear_pointer (&thread_closure->oob_error, g_error_free);
g_free (thread_closure->remote_name);
g_slice_free (ThreadClosure, thread_closure);
}
}
static void
idle_closure_free (IdleClosure *idle_closure)
{
g_clear_pointer (&idle_closure->thread_closure, thread_closure_unref);
if (idle_closure->notify != NULL)
idle_closure->notify (idle_closure->data);
g_slice_free (IdleClosure, idle_closure);
}
static OstreeFetcherPendingURI *
pending_uri_ref (OstreeFetcherPendingURI *pending)
{
gint refcount;
g_return_val_if_fail (pending != NULL, NULL);
refcount = g_atomic_int_add (&pending->ref_count, 1);
g_assert (refcount > 0);
return pending;
}
static void
pending_uri_unref (OstreeFetcherPendingURI *pending)
{
if (!g_atomic_int_dec_and_test (&pending->ref_count))
return;
g_clear_pointer (&pending->thread_closure, thread_closure_unref);
g_clear_pointer (&pending->mirrorlist, g_ptr_array_unref);
g_free (pending->filename);
g_clear_object (&pending->request);
g_clear_object (&pending->request_body);
g_free (pending->if_none_match);
glnx_tmpfile_clear (&pending->tmpf);
g_clear_object (&pending->out_stream);
g_free (pending->out_etag);
g_free (pending);
}
static gboolean
session_thread_idle_dispatch (gpointer data)
{
IdleClosure *idle_closure = data;
idle_closure->function (idle_closure->thread_closure,
idle_closure->data);
return G_SOURCE_REMOVE;
}
static void
session_thread_idle_add (ThreadClosure *thread_closure,
SessionThreadFunc function,
gpointer data,
GDestroyNotify notify)
{
IdleClosure *idle_closure;
g_return_if_fail (thread_closure != NULL);
g_return_if_fail (function != NULL);
idle_closure = g_slice_new (IdleClosure);
idle_closure->thread_closure = thread_closure_ref (thread_closure);
idle_closure->function = function;
idle_closure->data = data;
idle_closure->notify = notify;
g_main_context_invoke_full (thread_closure->main_context,
G_PRIORITY_DEFAULT,
session_thread_idle_dispatch,
idle_closure, /* takes ownership */
(GDestroyNotify) idle_closure_free);
}
static void
session_thread_add_logger (ThreadClosure *thread_closure,
gpointer data)
{
glnx_unref_object SoupLogger *logger = NULL;
logger = soup_logger_new (SOUP_LOGGER_LOG_BODY, 500);
soup_session_add_feature (thread_closure->session,
SOUP_SESSION_FEATURE (logger));
}
static void
session_thread_config_flags (ThreadClosure *thread_closure,
gpointer data)
{
OstreeFetcherConfigFlags config_flags;
config_flags = GPOINTER_TO_UINT (data);
if ((config_flags & OSTREE_FETCHER_FLAGS_TLS_PERMISSIVE) > 0)
{
g_object_set (thread_closure->session,
SOUP_SESSION_SSL_STRICT,
FALSE, NULL);
}
}
static void
on_authenticate (SoupSession *session, SoupMessage *msg, SoupAuth *auth,
gboolean retrying, gpointer user_data)
{
ThreadClosure *thread_closure = user_data;
if (msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED)
{
SoupURI *uri = NULL;
g_object_get (session, SOUP_SESSION_PROXY_URI, &uri, NULL);
if (retrying)
{
g_autofree char *s = soup_uri_to_string (uri, FALSE);
g_set_error (&thread_closure->oob_error,
G_IO_ERROR, G_IO_ERROR_PROXY_AUTH_FAILED,
"Invalid username or password for proxy '%s'", s);
}
else
soup_auth_authenticate (auth, soup_uri_get_user (uri),
soup_uri_get_password (uri));
}
}
static void
session_thread_set_proxy_cb (ThreadClosure *thread_closure,
gpointer data)
{
SoupURI *proxy_uri = data;
g_object_set (thread_closure->session,
SOUP_SESSION_PROXY_URI,
proxy_uri, NULL);
/* libsoup won't necessarily pass any embedded username and password to proxy
* requests, so we have to be ready to handle 407 and handle them ourselves.
* See also: https://bugzilla.gnome.org/show_bug.cgi?id=772932
* */
if (soup_uri_get_user (proxy_uri) &&
soup_uri_get_password (proxy_uri))
{
g_signal_connect (thread_closure->session, "authenticate",
G_CALLBACK (on_authenticate), thread_closure);
}
}
static void
session_thread_set_cookie_jar_cb (ThreadClosure *thread_closure,
gpointer data)
{
SoupCookieJar *jar = data;
soup_session_add_feature (thread_closure->session,
SOUP_SESSION_FEATURE (jar));
}
static void
session_thread_set_headers_cb (ThreadClosure *thread_closure,
gpointer data)
{
GVariant *headers = data;
g_clear_pointer (&thread_closure->extra_headers, (GDestroyNotify)g_variant_unref);
thread_closure->extra_headers = g_variant_ref (headers);
}
#ifdef HAVE_LIBSOUP_CLIENT_CERTS
static void
session_thread_set_tls_interaction_cb (ThreadClosure *thread_closure,
gpointer data)
{
const char *cert_and_key_path = data; /* str\0str\0 in one malloc buf */
const char *cert_path = cert_and_key_path;
const char *key_path = cert_and_key_path + strlen (cert_and_key_path) + 1;
g_autoptr(OstreeTlsCertInteraction) interaction = NULL;
/* The GTlsInteraction instance must be created in the
* session thread so it uses the correct GMainContext. */
interaction = _ostree_tls_cert_interaction_new (cert_path, key_path);
g_object_set (thread_closure->session,
SOUP_SESSION_TLS_INTERACTION,
interaction, NULL);
}
#endif
static void
session_thread_set_tls_database_cb (ThreadClosure *thread_closure,
gpointer data)
{
const char *db_path = data;
if (db_path != NULL)
{
glnx_unref_object GTlsDatabase *tlsdb = NULL;
g_clear_error (&thread_closure->initialization_error);
tlsdb = g_tls_file_database_new (db_path, &thread_closure->initialization_error);
if (tlsdb)
g_object_set (thread_closure->session,
SOUP_SESSION_TLS_DATABASE,
tlsdb, NULL);
}
else
{
g_object_set (thread_closure->session,
SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE,
TRUE, NULL);
}
}
static void
session_thread_set_extra_user_agent_cb (ThreadClosure *thread_closure,
gpointer data)
{
const char *extra_user_agent = data;
if (extra_user_agent != NULL)
{
g_autofree char *ua =
g_strdup_printf ("%s %s", OSTREE_FETCHER_USERAGENT_STRING, extra_user_agent);
g_object_set (thread_closure->session, SOUP_SESSION_USER_AGENT, ua, NULL);
}
else
{
g_object_set (thread_closure->session, SOUP_SESSION_USER_AGENT,
OSTREE_FETCHER_USERAGENT_STRING, NULL);
}
}
static void
on_request_sent (GObject *object, GAsyncResult *result, gpointer user_data);
static void
start_pending_request (ThreadClosure *thread_closure,
GTask *task)
{
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
g_hash_table_add (thread_closure->outstanding, pending_uri_ref (pending));
soup_request_send_async (pending->request,
cancellable,
on_request_sent,
g_object_ref (task));
}
static void
create_pending_soup_request (OstreeFetcherPendingURI *pending,
GError **error)
{
OstreeFetcherURI *next_mirror = NULL;
g_autoptr(OstreeFetcherURI) uri = NULL;
g_assert (pending->mirrorlist);
g_assert (pending->mirrorlist_idx < pending->mirrorlist->len);
next_mirror = g_ptr_array_index (pending->mirrorlist, pending->mirrorlist_idx);
if (pending->filename)
uri = _ostree_fetcher_uri_new_subpath (next_mirror, pending->filename);
g_clear_object (&pending->request);
pending->request = soup_session_request_uri (pending->thread_closure->session,
(SoupURI*)(uri ? uri : next_mirror), error);
/* Add caching headers. */
if (SOUP_IS_REQUEST_HTTP (pending->request) && pending->if_none_match != NULL)
{
glnx_unref_object SoupMessage *msg = soup_request_http_get_message ((SoupRequestHTTP*) pending->request);
soup_message_headers_append (msg->request_headers, "If-None-Match", pending->if_none_match);
}
if (SOUP_IS_REQUEST_HTTP (pending->request) && pending->if_modified_since > 0)
{
glnx_unref_object SoupMessage *msg = soup_request_http_get_message ((SoupRequestHTTP*) pending->request);
g_autoptr(GDateTime) date_time = g_date_time_new_from_unix_utc (pending->if_modified_since);
g_autofree char *mod_date = g_date_time_format (date_time, "%a, %d %b %Y %H:%M:%S %Z");
soup_message_headers_append (msg->request_headers, "If-Modified-Since", mod_date);
}
}
static void
session_thread_request_uri (ThreadClosure *thread_closure,
gpointer data)
{
GTask *task = G_TASK (data);
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;
GError *local_error = NULL;
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
/* If we caught an error in init, re-throw it for every request */
if (thread_closure->initialization_error)
{
g_task_return_error (task, g_error_copy (thread_closure->initialization_error));
return;
}
create_pending_soup_request (pending, &local_error);
if (local_error != NULL)
{
g_task_return_error (task, local_error);
return;
}
if (SOUP_IS_REQUEST_HTTP (pending->request) && thread_closure->extra_headers)
{
glnx_unref_object SoupMessage *msg = soup_request_http_get_message ((SoupRequestHTTP*) pending->request);
g_autoptr(GVariantIter) viter = g_variant_iter_new (thread_closure->extra_headers);
const char *key;
const char *value;
while (g_variant_iter_next (viter, "(&s&s)", &key, &value))
soup_message_headers_append (msg->request_headers, key, value);
}
if (pending->is_membuf)
{
soup_request_send_async (pending->request,
cancellable,
on_request_sent,
g_object_ref (task));
}
else
{
start_pending_request (thread_closure, task);
}
}
static gpointer
ostree_fetcher_session_thread (gpointer data)
{
ThreadClosure *closure = data;
g_autoptr(GMainContext) mainctx = g_main_context_ref (closure->main_context);
/* This becomes the GMainContext that SoupSession schedules async
* callbacks and emits signals from. Make it the thread-default
* context for this thread before creating the session. */
g_main_context_push_thread_default (mainctx);
/* We retain ownership of the SoupSession reference. */
closure->session = soup_session_async_new_with_options (SOUP_SESSION_USER_AGENT, OSTREE_FETCHER_USERAGENT_STRING,
SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE,
SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_REQUESTER,
SOUP_SESSION_TIMEOUT, 60,
SOUP_SESSION_IDLE_TIMEOUT, 60,
NULL);
if (closure->transfer_gzip)
soup_session_add_feature_by_type (closure->session, SOUP_TYPE_CONTENT_DECODER);
/* XXX: Now that we have mirrorlist support, we could make this even smarter
* by spreading requests across mirrors. */
gint max_conns;
g_object_get (closure->session, "max-conns-per-host", &max_conns, NULL);
if (max_conns < _OSTREE_MAX_OUTSTANDING_FETCHER_REQUESTS)
{
/* We download a lot of small objects in ostree, so this
* helps a lot. Also matches what most modern browsers do.
*
* Note since https://github.com/ostreedev/ostree/commit/f4d1334e19ce3ab2f8872b1e28da52044f559401
* we don't do queuing in this libsoup backend, but we still
* want to override libsoup's currently conservative
* #define SOUP_SESSION_MAX_CONNS_PER_HOST_DEFAULT 2 (as of 2018-02-14).
*/
max_conns = _OSTREE_MAX_OUTSTANDING_FETCHER_REQUESTS;
g_object_set (closure->session,
"max-conns-per-host",
max_conns, NULL);
}
/* This model ensures we don't hit a race using g_main_loop_quit();
* see also what pull_termination_condition() in ostree-repo-pull.c
* is doing.
*/
while (g_atomic_int_get (&closure->running))
g_main_context_iteration (closure->main_context, TRUE);
/* Since the ThreadClosure may be finalized from any thread we
* unreference all data related to the SoupSession ourself to ensure
* it's freed in the same thread where it was created. */
g_clear_pointer (&closure->outstanding, g_hash_table_unref);
g_clear_pointer (&closure->session, g_object_unref);
thread_closure_unref (closure);
/* Do this last, since libsoup uses g_main_current_source() which
* relies on it.
*/
g_main_context_pop_thread_default (mainctx);
return NULL;
}
static void
_ostree_fetcher_set_property (GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec)
{
OstreeFetcher *self = OSTREE_FETCHER (object);
switch (prop_id)
{
case PROP_CONFIG_FLAGS:
self->config_flags = g_value_get_flags (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
_ostree_fetcher_get_property (GObject *object,
guint prop_id,
GValue *value,
GParamSpec *pspec)
{
OstreeFetcher *self = OSTREE_FETCHER (object);
switch (prop_id)
{
case PROP_CONFIG_FLAGS:
g_value_set_flags (value, self->config_flags);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
_ostree_fetcher_finalize (GObject *object)
{
OstreeFetcher *self = OSTREE_FETCHER (object);
/* Terminate the session thread. */
g_atomic_int_set (&self->thread_closure->running, 0);
g_main_context_wakeup (self->thread_closure->main_context);
if (self->session_thread)
{
/* We need to explicitly synchronize to clean up TLS */
if (self->session_thread != g_thread_self ())
g_thread_join (self->session_thread);
else
g_clear_pointer (&self->session_thread, g_thread_unref);
}
g_clear_pointer (&self->thread_closure, thread_closure_unref);
G_OBJECT_CLASS (_ostree_fetcher_parent_class)->finalize (object);
}
static void
_ostree_fetcher_constructed (GObject *object)
{
OstreeFetcher *self = OSTREE_FETCHER (object);
g_autoptr(GMainContext) main_context = NULL;
const char *http_proxy;
main_context = g_main_context_new ();
self->thread_closure = g_slice_new0 (ThreadClosure);
self->thread_closure->ref_count = 1;
self->thread_closure->main_context = g_main_context_ref (main_context);
self->thread_closure->running = 1;
self->thread_closure->transfer_gzip = (self->config_flags & OSTREE_FETCHER_FLAGS_TRANSFER_GZIP) != 0;
self->thread_closure->outstanding = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)pending_uri_unref);
self->thread_closure->output_stream_set = g_hash_table_new_full (NULL, NULL,
(GDestroyNotify) NULL,
(GDestroyNotify) g_object_unref);
g_mutex_init (&self->thread_closure->output_stream_set_lock);
if (g_getenv ("OSTREE_DEBUG_HTTP"))
{
session_thread_idle_add (self->thread_closure,
session_thread_add_logger,
NULL, (GDestroyNotify) NULL);
}
if (self->config_flags != 0)
{
session_thread_idle_add (self->thread_closure,
session_thread_config_flags,
GUINT_TO_POINTER (self->config_flags),
(GDestroyNotify) NULL);
}
http_proxy = g_getenv ("http_proxy");
if (http_proxy != NULL && http_proxy[0] != '\0')
_ostree_fetcher_set_proxy (self, http_proxy);
/* FIXME Maybe implement GInitableIface and use g_thread_try_new()
* so we can try to handle thread creation errors gracefully? */
self->session_thread = g_thread_new ("fetcher-session-thread",
ostree_fetcher_session_thread,
thread_closure_ref (self->thread_closure));
G_OBJECT_CLASS (_ostree_fetcher_parent_class)->constructed (object);
}
static void
_ostree_fetcher_class_init (OstreeFetcherClass *klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
gobject_class->set_property = _ostree_fetcher_set_property;
gobject_class->get_property = _ostree_fetcher_get_property;
gobject_class->finalize = _ostree_fetcher_finalize;
gobject_class->constructed = _ostree_fetcher_constructed;
g_object_class_install_property (gobject_class,
PROP_CONFIG_FLAGS,
g_param_spec_flags ("config-flags",
"",
"",
OSTREE_TYPE_FETCHER_CONFIG_FLAGS,
OSTREE_FETCHER_FLAGS_NONE,
G_PARAM_READWRITE |
G_PARAM_CONSTRUCT_ONLY |
G_PARAM_STATIC_STRINGS));
}
static void
_ostree_fetcher_init (OstreeFetcher *self)
{
}
OstreeFetcher *
_ostree_fetcher_new (int tmpdir_dfd,
const char *remote_name,
OstreeFetcherConfigFlags flags)
{
OstreeFetcher *self;
self = g_object_new (OSTREE_TYPE_FETCHER, "config-flags", flags, NULL);
self->thread_closure->remote_name = g_strdup (remote_name);
self->thread_closure->base_tmpdir_dfd = tmpdir_dfd;
return self;
}
int
_ostree_fetcher_get_dfd (OstreeFetcher *fetcher)
{
return fetcher->thread_closure->base_tmpdir_dfd;
}
void
_ostree_fetcher_set_proxy (OstreeFetcher *self,
const char *http_proxy)
{
SoupURI *proxy_uri;
g_return_if_fail (OSTREE_IS_FETCHER (self));
g_return_if_fail (http_proxy != NULL && http_proxy[0] != '\0');
proxy_uri = soup_uri_new (http_proxy);
if (!proxy_uri)
{
g_warning ("Invalid proxy URI '%s'", http_proxy);
}
else
{
session_thread_idle_add (self->thread_closure,
session_thread_set_proxy_cb,
proxy_uri, /* takes ownership */
(GDestroyNotify) soup_uri_free);
}
}
void
_ostree_fetcher_set_cookie_jar (OstreeFetcher *self,
const char *jar_path)
{
SoupCookieJar *jar;
g_return_if_fail (OSTREE_IS_FETCHER (self));
g_return_if_fail (jar_path != NULL);
jar = soup_cookie_jar_text_new (jar_path, TRUE);
session_thread_idle_add (self->thread_closure,
session_thread_set_cookie_jar_cb,
jar, /* takes ownership */
(GDestroyNotify) g_object_unref);
}
void
_ostree_fetcher_set_client_cert (OstreeFetcher *self,
const char *cert_path,
const char *key_path)
{
g_autoptr(GString) buf = NULL;
g_return_if_fail (OSTREE_IS_FETCHER (self));
if (cert_path)
{
buf = g_string_new (cert_path);
g_string_append_c (buf, '\0');
g_string_append (buf, key_path);
}
#ifdef HAVE_LIBSOUP_CLIENT_CERTS
session_thread_idle_add (self->thread_closure,
session_thread_set_tls_interaction_cb,
g_string_free (g_steal_pointer (&buf), FALSE),
(GDestroyNotify) g_free);
#else
g_warning ("This version of OSTree is compiled without client side certificate support");
#endif
}
void
_ostree_fetcher_set_tls_database (OstreeFetcher *self,
const char *tlsdb_path)
{
g_return_if_fail (OSTREE_IS_FETCHER (self));
session_thread_idle_add (self->thread_closure,
session_thread_set_tls_database_cb,
g_strdup (tlsdb_path),
(GDestroyNotify) g_free);
}
void
_ostree_fetcher_set_extra_headers (OstreeFetcher *self,
GVariant *extra_headers)
{
session_thread_idle_add (self->thread_closure,
session_thread_set_headers_cb,
g_variant_ref (extra_headers),
(GDestroyNotify) g_variant_unref);
}
void
_ostree_fetcher_set_extra_user_agent (OstreeFetcher *self,
const char *extra_user_agent)
{
session_thread_idle_add (self->thread_closure,
session_thread_set_extra_user_agent_cb,
g_strdup (extra_user_agent),
(GDestroyNotify) g_free);
}
static gboolean
finish_stream (OstreeFetcherPendingURI *pending,
GCancellable *cancellable,
GError **error)
{
gboolean ret = FALSE;
struct stat stbuf;
/* Close it here since we do an async fstat(), where we don't want
* to hit a bad fd.
*/
if (pending->out_stream)
{
if ((pending->flags & OSTREE_FETCHER_REQUEST_NUL_TERMINATION) > 0)
{
const guint8 nulchar = 0;
gsize bytes_written;
if (!g_output_stream_write_all (pending->out_stream, &nulchar, 1, &bytes_written,
cancellable, error))
goto out;
}
if (!g_output_stream_close (pending->out_stream, cancellable, error))
goto out;
g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
g_hash_table_remove (pending->thread_closure->output_stream_set,
pending->out_stream);
g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
}
if (!pending->is_membuf)
{
if (!glnx_fstat (pending->tmpf.fd, &stbuf, error))
goto out;
}
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
if (!pending->is_membuf)
{
if (stbuf.st_size < pending->content_length)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Download incomplete");
goto out;
}
else
{
g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
pending->thread_closure->total_downloaded += stbuf.st_size;
g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
}
}
ret = TRUE;
out:
(void) g_input_stream_close (pending->request_body, NULL, NULL);
return ret;
}
static void
on_stream_read (GObject *object,
GAsyncResult *result,
gpointer user_data);
static void
remove_pending (OstreeFetcherPendingURI *pending)
{
/* Hold a temporary ref to ensure the reference to
* pending->thread_closure is valid.
*/
pending_uri_ref (pending);
g_hash_table_remove (pending->thread_closure->outstanding, pending);
pending_uri_unref (pending);
}
static void
on_out_splice_complete (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
GTask *task = G_TASK (user_data);
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;
gssize bytes_written;
GError *local_error = NULL;
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
bytes_written = g_output_stream_splice_finish ((GOutputStream *)object,
result,
&local_error);
if (bytes_written < 0)
goto out;
g_input_stream_read_bytes_async (pending->request_body,
8192, G_PRIORITY_DEFAULT,
cancellable,
on_stream_read,
g_object_ref (task));
out:
if (local_error)
{
g_task_return_error (task, local_error);
remove_pending (pending);
}
g_object_unref (task);
}
static void
on_stream_read (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
GTask *task = G_TASK (user_data);
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;
g_autoptr(GBytes) bytes = NULL;
gsize bytes_read;
GError *local_error = NULL;
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
/* Only open the output stream on demand to ensure we use as
* few file descriptors as possible.
*/
if (!pending->out_stream)
{
if (!pending->is_membuf)
{
if (!_ostree_fetcher_tmpf_from_flags (pending->flags, pending->thread_closure->base_tmpdir_dfd,
&pending->tmpf, &local_error))
goto out;
pending->out_stream = g_unix_output_stream_new (pending->tmpf.fd, FALSE);
}
else
{
pending->out_stream = g_memory_output_stream_new_resizable ();
}
g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
g_hash_table_add (pending->thread_closure->output_stream_set,
g_object_ref (pending->out_stream));
g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
}
/* Get a GBytes buffer */
bytes = g_input_stream_read_bytes_finish ((GInputStream*)object, result, &local_error);
if (!bytes)
goto out;
bytes_read = g_bytes_get_size (bytes);
/* Was this the end of the stream? */
if (bytes_read == 0)
{
if (!finish_stream (pending, cancellable, &local_error))
goto out;
if (pending->is_membuf)
{
g_task_return_pointer (task,
g_memory_output_stream_steal_as_bytes ((GMemoryOutputStream*)pending->out_stream),
(GDestroyNotify) g_bytes_unref);
}
else
{
if (lseek (pending->tmpf.fd, 0, SEEK_SET) < 0)
{
glnx_set_error_from_errno (&local_error);
g_task_return_error (task, g_steal_pointer (&local_error));
}
else
g_task_return_boolean (task, TRUE);
}
remove_pending (pending);
}
else
{
/* Verify max size */
if (pending->max_size > 0)
{
if (bytes_read > pending->max_size ||
(bytes_read + pending->current_size) > pending->max_size)
{
g_autofree char *uristr =
soup_uri_to_string (soup_request_get_uri (pending->request), FALSE);
local_error = g_error_new (G_IO_ERROR, G_IO_ERROR_FAILED,
"URI %s exceeded maximum size of %" G_GUINT64_FORMAT " bytes",
uristr, pending->max_size);
goto out;
}
}
pending->current_size += bytes_read;
/* We do this instead of _write_bytes_async() as that's not
* guaranteed to do a complete write.
*/
{
g_autoptr(GInputStream) membuf =
g_memory_input_stream_new_from_bytes (bytes);
g_output_stream_splice_async (pending->out_stream, membuf,
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
G_PRIORITY_DEFAULT,
cancellable,
on_out_splice_complete,
g_object_ref (task));
}
}
out:
if (local_error)
{
g_task_return_error (task, local_error);
remove_pending (pending);
}
g_object_unref (task);
}
static void
on_request_sent (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
GTask *task = G_TASK (user_data);
/* Hold a ref to the pending across this function, since we remove
* it from the hash early in some cases, not in others. */
OstreeFetcherPendingURI *pending = pending_uri_ref (g_task_get_task_data (task));
GCancellable *cancellable = g_task_get_cancellable (task);
GError *local_error = NULL;
glnx_unref_object SoupMessage *msg = NULL;
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
pending->request_body = soup_request_send_finish ((SoupRequest*) object,
result, &local_error);
if (!pending->request_body)
goto out;
g_assert_no_error (local_error);
if (SOUP_IS_REQUEST_HTTP (object))
{
msg = soup_request_http_get_message ((SoupRequestHTTP*) object);
if (msg->status_code == SOUP_STATUS_NOT_MODIFIED &&
(pending->if_none_match != NULL || pending->if_modified_since > 0))
{
/* Version on the server is unchanged from the version we have cached locally;
* report this as an out-argument, a zero-length response buffer, and no error */
pending->out_not_modified = TRUE;
}
else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
{
/* is there another mirror we can try? */
if (pending->mirrorlist_idx + 1 < pending->mirrorlist->len)
{
pending->mirrorlist_idx++;
create_pending_soup_request (pending, &local_error);
if (local_error != NULL)
goto out;
(void) g_input_stream_close (pending->request_body, NULL, NULL);
start_pending_request (pending->thread_closure, task);
}
else
{
g_autofree char *uristring =
soup_uri_to_string (soup_request_get_uri (pending->request), FALSE);
GIOErrorEnum code;
switch (msg->status_code)
{
/* These statuses are internal to libsoup, and not standard HTTP ones: */
case SOUP_STATUS_CANCELLED:
code = G_IO_ERROR_CANCELLED;
break;
case SOUP_STATUS_CANT_RESOLVE:
case SOUP_STATUS_CANT_CONNECT:
code = G_IO_ERROR_HOST_NOT_FOUND;
break;
case SOUP_STATUS_IO_ERROR:
#if !GLIB_CHECK_VERSION(2, 44, 0)
code = G_IO_ERROR_BROKEN_PIPE;
#else
code = G_IO_ERROR_CONNECTION_CLOSED;
#endif
break;
default:
code = _ostree_fetcher_http_status_code_to_io_error (msg->status_code);
break;
}
{
g_autofree char *errmsg =
g_strdup_printf ("Server returned status %u: %s",
msg->status_code,
soup_status_get_phrase (msg->status_code));
/* Let's make OOB errors be the final one since they're probably
* the cause for the error here. */
if (pending->thread_closure->oob_error)
{
local_error =
g_error_copy (pending->thread_closure->oob_error);
g_prefix_error (&local_error, "%s: ", errmsg);
}
else
local_error = g_error_new_literal (G_IO_ERROR, code, errmsg);
}
if (pending->mirrorlist->len > 1)
g_prefix_error (&local_error,
"All %u mirrors failed. Last error was: ",
pending->mirrorlist->len);
if (pending->thread_closure->remote_name &&
!((pending->flags & OSTREE_FETCHER_REQUEST_OPTIONAL_CONTENT) > 0 &&
code == G_IO_ERROR_NOT_FOUND))
_ostree_fetcher_journal_failure (pending->thread_closure->remote_name,
uristring, local_error->message);
}
goto out;
}
/* Grab cache properties from the response */
pending->out_etag = g_strdup (soup_message_headers_get_one (msg->response_headers, "ETag"));
pending->out_last_modified = 0;
const char *last_modified_str = soup_message_headers_get_one (msg->response_headers, "Last-Modified");
if (last_modified_str != NULL)
{
SoupDate *soup_date = soup_date_new_from_string (last_modified_str);
if (soup_date != NULL)
{
pending->out_last_modified = soup_date_to_time_t (soup_date);
soup_date_free (soup_date);
}
}
}
pending->state = OSTREE_FETCHER_STATE_DOWNLOADING;
pending->content_length = soup_request_get_content_length (pending->request);
g_input_stream_read_bytes_async (pending->request_body,
8192, G_PRIORITY_DEFAULT,
cancellable,
on_stream_read,
g_object_ref (task));
out:
if (local_error)
{
if (pending->request_body)
(void) g_input_stream_close (pending->request_body, NULL, NULL);
g_task_return_error (task, local_error);
remove_pending (pending);
}
pending_uri_unref (pending);
g_object_unref (task);
}
static void
_ostree_fetcher_request_async (OstreeFetcher *self,
GPtrArray *mirrorlist,
const char *filename,
OstreeFetcherRequestFlags flags,
const char *if_none_match,
guint64 if_modified_since,
gboolean is_membuf,
guint64 max_size,
int priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
g_autoptr(GTask) task = NULL;
OstreeFetcherPendingURI *pending;
g_return_if_fail (OSTREE_IS_FETCHER (self));
g_return_if_fail (mirrorlist != NULL);
g_return_if_fail (mirrorlist->len > 0);
/* SoupRequest is created in session thread. */
pending = g_new0 (OstreeFetcherPendingURI, 1);
pending->ref_count = 1;
pending->thread_closure = thread_closure_ref (self->thread_closure);
pending->mirrorlist = g_ptr_array_ref (mirrorlist);
pending->filename = g_strdup (filename);
pending->flags = flags;
pending->if_none_match = g_strdup (if_none_match);
pending->if_modified_since = if_modified_since;
pending->max_size = max_size;
pending->is_membuf = is_membuf;
task = g_task_new (self, cancellable, callback, user_data);
g_task_set_source_tag (task, _ostree_fetcher_request_async);
g_task_set_task_data (task, pending, (GDestroyNotify) pending_uri_unref);
/* We'll use the GTask priority for our own priority queue. */
g_task_set_priority (task, priority);
session_thread_idle_add (self->thread_closure,
session_thread_request_uri,
g_object_ref (task),
(GDestroyNotify) g_object_unref);
}
void
_ostree_fetcher_request_to_tmpfile (OstreeFetcher *self,
GPtrArray *mirrorlist,
const char *filename,
OstreeFetcherRequestFlags flags,
const char *if_none_match,
guint64 if_modified_since,
guint64 max_size,
int priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
_ostree_fetcher_request_async (self, mirrorlist, filename, flags,
if_none_match, if_modified_since, FALSE,
max_size, priority, cancellable,
callback, user_data);
}
gboolean
_ostree_fetcher_request_to_tmpfile_finish (OstreeFetcher *self,
GAsyncResult *result,
GLnxTmpfile *out_tmpf,
gboolean *out_not_modified,
char **out_etag,
guint64 *out_last_modified,
GError **error)
{
GTask *task;
OstreeFetcherPendingURI *pending;
gpointer ret;
g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
g_return_val_if_fail (g_async_result_is_tagged (result, _ostree_fetcher_request_async), FALSE);
task = (GTask*)result;
pending = g_task_get_task_data (task);
ret = g_task_propagate_pointer (task, error);
if (!ret)
return FALSE;
g_assert (!pending->is_membuf);
*out_tmpf = pending->tmpf;
pending->tmpf.initialized = FALSE; /* Transfer ownership */
if (out_not_modified != NULL)
*out_not_modified = pending->out_not_modified;
if (out_etag != NULL)
*out_etag = g_steal_pointer (&pending->out_etag);
if (out_last_modified != NULL)
*out_last_modified = pending->out_last_modified;
return TRUE;
}
void
_ostree_fetcher_request_to_membuf (OstreeFetcher *self,
GPtrArray *mirrorlist,
const char *filename,
OstreeFetcherRequestFlags flags,
const char *if_none_match,
guint64 if_modified_since,
guint64 max_size,
int priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
_ostree_fetcher_request_async (self, mirrorlist, filename, flags,
if_none_match, if_modified_since, TRUE,
max_size, priority, cancellable,
callback, user_data);
}
gboolean
_ostree_fetcher_request_to_membuf_finish (OstreeFetcher *self,
GAsyncResult *result,
GBytes **out_buf,
gboolean *out_not_modified,
char **out_etag,
guint64 *out_last_modified,
GError **error)
{
GTask *task;
OstreeFetcherPendingURI *pending;
gpointer ret;
g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
g_return_val_if_fail (g_async_result_is_tagged (result, _ostree_fetcher_request_async), FALSE);
task = (GTask*)result;
pending = g_task_get_task_data (task);
ret = g_task_propagate_pointer (task, error);
if (!ret)
return FALSE;
g_assert (pending->is_membuf);
g_assert (out_buf);
*out_buf = ret;
if (out_not_modified != NULL)
*out_not_modified = pending->out_not_modified;
if (out_etag != NULL)
*out_etag = g_steal_pointer (&pending->out_etag);
if (out_last_modified != NULL)
*out_last_modified = pending->out_last_modified;
return TRUE;
}
guint64
_ostree_fetcher_bytes_transferred (OstreeFetcher *self)
{
g_return_val_if_fail (OSTREE_IS_FETCHER (self), 0);
g_mutex_lock (&self->thread_closure->output_stream_set_lock);
guint64 ret = self->thread_closure->total_downloaded;
GLNX_HASH_TABLE_FOREACH (self->thread_closure->output_stream_set,
GFileOutputStream*, stream)
{
if (G_IS_FILE_DESCRIPTOR_BASED (stream))
{
int fd = g_file_descriptor_based_get_fd ((GFileDescriptorBased*)stream);
struct stat stbuf;
if (glnx_fstat (fd, &stbuf, NULL))
ret += stbuf.st_size;
}
}
g_mutex_unlock (&self->thread_closure->output_stream_set_lock);
return ret;
}