ostree/src/libostree/ostree-fetcher.c

878 lines
27 KiB
C

/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*-
*
* Copyright (C) 2011 Colin Walters <walters@verbum.org>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*
* Author: Colin Walters <walters@verbum.org>
*/
#include "config.h"
#include <gio/gfiledescriptorbased.h>
#include <gio/gunixoutputstream.h>
#include "ostree-fetcher.h"
#ifdef HAVE_LIBSOUP_CLIENT_CERTS
#include "ostree-tls-cert-interaction.h"
#endif
#include "ostree-enumtypes.h"
#include "ostree.h"
#include "ostree-repo-private.h"
#include "otutil.h"
typedef enum {
OSTREE_FETCHER_STATE_PENDING,
OSTREE_FETCHER_STATE_DOWNLOADING,
OSTREE_FETCHER_STATE_COMPLETE
} OstreeFetcherState;
typedef struct {
OstreeFetcher *self;
SoupURI *uri;
OstreeFetcherState state;
SoupRequest *request;
gboolean is_stream;
GInputStream *request_body;
char *out_tmpfile;
GOutputStream *out_stream;
guint64 max_size;
guint64 current_size;
guint64 content_length;
GTask *task;
} OstreeFetcherPendingURI;
struct OstreeFetcher
{
GObject parent_instance;
OstreeFetcherConfigFlags config_flags;
int tmpdir_dfd;
char *tmpdir_name;
GLnxLockFile tmpdir_lock;
int base_tmpdir_dfd;
GTlsCertificate *client_cert;
SoupSession *session;
SoupRequester *requester;
GHashTable *output_stream_set; /* set<GOutputStream> */
guint64 total_downloaded;
/* Queue for libsoup, see bgo#708591 */
GQueue pending_queue;
GHashTable *outstanding;
gint max_outstanding;
};
enum {
PROP_0,
PROP_CONFIG_FLAGS
};
G_DEFINE_TYPE (OstreeFetcher, _ostree_fetcher, G_TYPE_OBJECT)
static int
pending_task_compare (gconstpointer a,
gconstpointer b,
gpointer unused)
{
gint priority_a = g_task_get_priority (G_TASK (a));
gint priority_b = g_task_get_priority (G_TASK (b));
return (priority_a == priority_b) ? 0 :
(priority_a < priority_b) ? -1 : 1;
}
static void
pending_uri_free (OstreeFetcherPendingURI *pending)
{
g_hash_table_remove (pending->self->outstanding, pending);
soup_uri_free (pending->uri);
g_clear_object (&pending->self);
g_clear_object (&pending->request);
g_clear_object (&pending->request_body);
g_free (pending->out_tmpfile);
g_clear_object (&pending->out_stream);
g_free (pending);
}
static void
_ostree_fetcher_set_property (GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec)
{
OstreeFetcher *self = OSTREE_FETCHER (object);
switch (prop_id)
{
case PROP_CONFIG_FLAGS:
self->config_flags = g_value_get_flags (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
_ostree_fetcher_get_property (GObject *object,
guint prop_id,
GValue *value,
GParamSpec *pspec)
{
OstreeFetcher *self = OSTREE_FETCHER (object);
switch (prop_id)
{
case PROP_CONFIG_FLAGS:
g_value_set_flags (value, self->config_flags);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
_ostree_fetcher_finalize (GObject *object)
{
OstreeFetcher *self;
self = OSTREE_FETCHER (object);
if (self->tmpdir_dfd != -1)
close (self->tmpdir_dfd);
/* Note: We don't remove the tmpdir here, because that would cause
us to not reuse it on resume. This happens because we use two
fetchers for each pull, so finalizing the first one would remove
all the files to be resumed from the previous second one */
g_free (self->tmpdir_name);
glnx_release_lock_file (&self->tmpdir_lock);
g_clear_object (&self->session);
g_clear_object (&self->client_cert);
g_hash_table_destroy (self->output_stream_set);
while (!g_queue_is_empty (&self->pending_queue))
g_object_unref (g_queue_pop_head (&self->pending_queue));
g_hash_table_destroy (self->outstanding);
G_OBJECT_CLASS (_ostree_fetcher_parent_class)->finalize (object);
}
static void
_ostree_fetcher_class_init (OstreeFetcherClass *klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
gobject_class->set_property = _ostree_fetcher_set_property;
gobject_class->get_property = _ostree_fetcher_get_property;
gobject_class->finalize = _ostree_fetcher_finalize;
g_object_class_install_property (gobject_class,
PROP_CONFIG_FLAGS,
g_param_spec_flags ("config-flags",
"",
"",
OSTREE_TYPE_FETCHER_CONFIG_FLAGS,
OSTREE_FETCHER_FLAGS_NONE,
G_PARAM_READWRITE |
G_PARAM_CONSTRUCT_ONLY |
G_PARAM_STATIC_STRINGS));
}
static void
_ostree_fetcher_init (OstreeFetcher *self)
{
gint max_conns;
const char *http_proxy;
GLnxLockFile empty_lockfile = GLNX_LOCK_FILE_INIT;
g_queue_init (&self->pending_queue);
self->session = soup_session_async_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ",
SOUP_SESSION_SSL_USE_SYSTEM_CA_FILE, TRUE,
SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_REQUESTER,
SOUP_SESSION_TIMEOUT, 60,
SOUP_SESSION_IDLE_TIMEOUT, 60,
NULL);
http_proxy = g_getenv ("http_proxy");
if (http_proxy)
{
_ostree_fetcher_set_proxy (self, http_proxy);
}
if (g_getenv ("OSTREE_DEBUG_HTTP"))
soup_session_add_feature (self->session, (SoupSessionFeature*)soup_logger_new (SOUP_LOGGER_LOG_BODY, 500));
if ((self->config_flags & OSTREE_FETCHER_FLAGS_TLS_PERMISSIVE) > 0)
g_object_set (self->session, SOUP_SESSION_SSL_STRICT, FALSE, NULL);
self->requester = (SoupRequester *)soup_session_get_feature (self->session, SOUP_TYPE_REQUESTER);
g_object_get (self->session, "max-conns-per-host", &max_conns, NULL);
if (max_conns <= 8)
{
// We download a lot of small objects in ostree, so this helps a
// lot. Also matches what most modern browsers do.
max_conns = 8;
g_object_set (self->session, "max-conns-per-host", max_conns, NULL);
}
self->max_outstanding = 3 * max_conns;
self->output_stream_set = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)g_object_unref);
self->outstanding = g_hash_table_new_full (NULL, NULL, NULL, NULL);
self->tmpdir_dfd = -1;
self->tmpdir_lock = empty_lockfile;
}
OstreeFetcher *
_ostree_fetcher_new (int tmpdir_dfd,
OstreeFetcherConfigFlags flags,
GCancellable *cancellable,
GError **error)
{
OstreeFetcher *self;
self = g_object_new (OSTREE_TYPE_FETCHER, "config-flags", flags, NULL);
if (!_ostree_repo_allocate_tmpdir (tmpdir_dfd,
"fetcher-",
&self->tmpdir_name,
&self->tmpdir_dfd,
&self->tmpdir_lock,
NULL,
cancellable, error))
return NULL;
self->tmpdir_dfd = tmpdir_dfd;
return self;
}
int
_ostree_fetcher_get_dfd (OstreeFetcher *fetcher)
{
return fetcher->tmpdir_dfd;
}
void
_ostree_fetcher_set_proxy (OstreeFetcher *self,
const char *http_proxy)
{
SoupURI *proxy_uri = soup_uri_new (http_proxy);
if (!proxy_uri)
{
g_warning ("Invalid proxy URI '%s'", http_proxy);
}
else
{
g_object_set (self->session, SOUP_SESSION_PROXY_URI, proxy_uri, NULL);
soup_uri_free (proxy_uri);
}
}
void
_ostree_fetcher_set_client_cert (OstreeFetcher *fetcher,
GTlsCertificate *cert)
{
g_clear_object (&fetcher->client_cert);
fetcher->client_cert = g_object_ref (cert);
if (fetcher->client_cert)
{
#ifdef HAVE_LIBSOUP_CLIENT_CERTS
g_autoptr(GTlsInteraction) interaction =
(GTlsInteraction*)_ostree_tls_cert_interaction_new (fetcher->client_cert);
g_object_set (fetcher->session, "tls-interaction", interaction, NULL);
#else
g_warning ("This version of OSTree is compiled without client side certificate support");
#endif
}
}
void
_ostree_fetcher_set_tls_database (OstreeFetcher *self,
GTlsDatabase *db)
{
if (db)
g_object_set ((GObject*)self->session, "tls-database", db, NULL);
else
g_object_set ((GObject*)self->session, "ssl-use-system-ca-file", TRUE, NULL);
}
static void
on_request_sent (GObject *object, GAsyncResult *result, gpointer user_data);
static void
ostree_fetcher_process_pending_queue (OstreeFetcher *self)
{
while (g_queue_peek_head (&self->pending_queue) != NULL &&
g_hash_table_size (self->outstanding) < self->max_outstanding)
{
GTask *task;
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;
task = g_queue_pop_head (&self->pending_queue);
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
/* pending_uri_free() removes this. */
g_hash_table_add (self->outstanding, pending);
soup_request_send_async (pending->request,
cancellable,
on_request_sent,
g_object_ref (task));
g_object_unref (task);
}
}
static gboolean
finish_stream (OstreeFetcherPendingURI *pending,
GCancellable *cancellable,
GError **error)
{
gboolean ret = FALSE;
struct stat stbuf;
/* Close it here since we do an async fstat(), where we don't want
* to hit a bad fd.
*/
if (pending->out_stream)
{
if (!g_output_stream_close (pending->out_stream, cancellable, error))
goto out;
g_hash_table_remove (pending->self->output_stream_set, pending->out_stream);
}
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
if (fstatat (pending->self->tmpdir_dfd, pending->out_tmpfile, &stbuf, AT_SYMLINK_NOFOLLOW) != 0)
{
gs_set_error_from_errno (error, errno);
goto out;
}
/* Now that we've finished downloading, continue with other queued
* requests.
*/
ostree_fetcher_process_pending_queue (pending->self);
if (stbuf.st_size < pending->content_length)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Download incomplete");
goto out;
}
else
{
pending->self->total_downloaded += stbuf.st_size;
}
ret = TRUE;
out:
(void) g_input_stream_close (pending->request_body, NULL, NULL);
return ret;
}
static void
on_stream_read (GObject *object,
GAsyncResult *result,
gpointer user_data);
static void
on_out_splice_complete (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
GTask *task = G_TASK (user_data);
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;
gssize bytes_written;
GError *local_error = NULL;
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
bytes_written = g_output_stream_splice_finish ((GOutputStream *)object,
result,
&local_error);
if (bytes_written < 0)
goto out;
g_input_stream_read_bytes_async (pending->request_body,
8192, G_PRIORITY_DEFAULT,
cancellable,
on_stream_read,
g_object_ref (task));
out:
if (local_error)
g_task_return_error (task, local_error);
g_object_unref (task);
}
static void
on_stream_read (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
GTask *task = G_TASK (user_data);
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;
g_autoptr(GBytes) bytes = NULL;
gsize bytes_read;
GError *local_error = NULL;
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
bytes = g_input_stream_read_bytes_finish ((GInputStream*)object, result, &local_error);
if (!bytes)
goto out;
bytes_read = g_bytes_get_size (bytes);
if (bytes_read == 0)
{
if (!finish_stream (pending, cancellable, &local_error))
goto out;
g_task_return_pointer (task,
g_strdup (pending->out_tmpfile),
(GDestroyNotify) g_free);
}
else
{
if (pending->max_size > 0)
{
if (bytes_read > pending->max_size ||
(bytes_read + pending->current_size) > pending->max_size)
{
g_autofree char *uristr = soup_uri_to_string (pending->uri, FALSE);
local_error = g_error_new (G_IO_ERROR, G_IO_ERROR_FAILED,
"URI %s exceeded maximum size of %" G_GUINT64_FORMAT " bytes",
uristr, pending->max_size);
goto out;
}
}
pending->current_size += bytes_read;
/* We do this instead of _write_bytes_async() as that's not
* guaranteed to do a complete write.
*/
{
g_autoptr(GInputStream) membuf =
g_memory_input_stream_new_from_bytes (bytes);
g_output_stream_splice_async (pending->out_stream, membuf,
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
G_PRIORITY_DEFAULT,
cancellable,
on_out_splice_complete,
g_object_ref (task));
}
}
out:
if (local_error)
g_task_return_error (task, local_error);
g_object_unref (task);
}
static void
on_request_sent (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
GTask *task = G_TASK (user_data);
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;
GError *local_error = NULL;
glnx_unref_object SoupMessage *msg = NULL;
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
pending->request_body = soup_request_send_finish ((SoupRequest*) object,
result, &local_error);
if (!pending->request_body)
goto out;
if (SOUP_IS_REQUEST_HTTP (object))
{
msg = soup_request_http_get_message ((SoupRequestHTTP*) object);
if (msg->status_code == SOUP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE)
{
// We already have the whole file, so just use it.
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
(void) g_input_stream_close (pending->request_body, NULL, NULL);
if (pending->is_stream)
{
g_task_return_pointer (task,
g_object_ref (pending->request_body),
(GDestroyNotify) g_object_unref);
}
else
{
g_task_return_pointer (task,
g_strdup (pending->out_tmpfile),
(GDestroyNotify) g_free);
}
goto out;
}
else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
{
GIOErrorEnum code;
switch (msg->status_code)
{
case 404:
case 410:
code = G_IO_ERROR_NOT_FOUND;
break;
default:
code = G_IO_ERROR_FAILED;
}
local_error = g_error_new (G_IO_ERROR, code,
"Server returned status %u: %s",
msg->status_code,
soup_status_get_phrase (msg->status_code));
goto out;
}
}
pending->state = OSTREE_FETCHER_STATE_DOWNLOADING;
pending->content_length = soup_request_get_content_length (pending->request);
if (!pending->is_stream)
{
int oflags = O_CREAT | O_WRONLY | O_CLOEXEC;
int fd;
/* If we got partial content, we can append; if the server
* ignored our range request, we need to truncate.
*/
if (msg && msg->status_code == SOUP_STATUS_PARTIAL_CONTENT)
oflags |= O_APPEND;
else
oflags |= O_TRUNC;
fd = openat (pending->self->tmpdir_dfd, pending->out_tmpfile, oflags, 0666);
if (fd == -1)
{
gs_set_error_from_errno (&local_error, errno);
goto out;
}
pending->out_stream = g_unix_output_stream_new (fd, TRUE);
g_hash_table_add (pending->self->output_stream_set, g_object_ref (pending->out_stream));
g_input_stream_read_bytes_async (pending->request_body,
8192, G_PRIORITY_DEFAULT,
cancellable,
on_stream_read,
g_object_ref (task));
}
else
{
g_task_return_pointer (task,
g_object_ref (pending->request_body),
(GDestroyNotify) g_object_unref);
}
out:
if (local_error)
{
if (pending->request_body)
(void) g_input_stream_close (pending->request_body, NULL, NULL);
g_task_return_error (task, local_error);
}
g_object_unref (task);
}
static void
ostree_fetcher_request_uri_internal (OstreeFetcher *self,
SoupURI *uri,
gboolean is_stream,
guint64 max_size,
int priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data,
gpointer source_tag)
{
GTask *task;
OstreeFetcherPendingURI *pending = g_new0 (OstreeFetcherPendingURI, 1);
GError *local_error = NULL;
pending->request = soup_requester_request_uri (self->requester, uri, &local_error);
pending->self = g_object_ref (self);
pending->uri = soup_uri_copy (uri);
pending->max_size = max_size;
pending->is_stream = is_stream;
task = g_task_new (self, cancellable, callback, user_data);
g_task_set_source_tag (task, source_tag);
g_task_set_task_data (task, pending, (GDestroyNotify) pending_uri_free);
/* We'll use the GTask priority for our own priority queue. */
g_task_set_priority (task, priority);
if (is_stream)
{
soup_request_send_async (pending->request,
cancellable,
on_request_sent,
g_object_ref (task));
}
else
{
g_autofree char *uristring = soup_uri_to_string (uri, FALSE);
g_autofree char *tmpfile = NULL;
struct stat stbuf;
gboolean exists;
tmpfile = g_compute_checksum_for_string (G_CHECKSUM_SHA256, uristring, strlen (uristring));
if (fstatat (self->tmpdir_dfd, tmpfile, &stbuf, AT_SYMLINK_NOFOLLOW) == 0)
exists = TRUE;
else
{
if (errno == ENOENT)
exists = FALSE;
else
{
gs_set_error_from_errno (&local_error, errno);
goto out;
}
}
if (SOUP_IS_REQUEST_HTTP (pending->request))
{
glnx_unref_object SoupMessage *msg = NULL;
msg = soup_request_http_get_message ((SoupRequestHTTP*) pending->request);
if (exists && stbuf.st_size > 0)
soup_message_headers_set_range (msg->request_headers, stbuf.st_size, -1);
}
pending->out_tmpfile = tmpfile;
tmpfile = NULL; /* Transfer ownership */
g_queue_insert_sorted (&self->pending_queue,
g_object_ref (task),
pending_task_compare, NULL);
ostree_fetcher_process_pending_queue (self);
}
g_assert_no_error (local_error);
out:
g_object_unref (task);
}
void
_ostree_fetcher_request_uri_with_partial_async (OstreeFetcher *self,
SoupURI *uri,
guint64 max_size,
int priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
ostree_fetcher_request_uri_internal (self, uri, FALSE, max_size, priority, cancellable,
callback, user_data,
_ostree_fetcher_request_uri_with_partial_async);
}
char *
_ostree_fetcher_request_uri_with_partial_finish (OstreeFetcher *self,
GAsyncResult *result,
GError **error)
{
g_return_val_if_fail (g_task_is_valid (result, self), NULL);
g_return_val_if_fail (g_async_result_is_tagged (result,
_ostree_fetcher_request_uri_with_partial_async), NULL);
return g_task_propagate_pointer (G_TASK (result), error);
}
static void
ostree_fetcher_stream_uri_async (OstreeFetcher *self,
SoupURI *uri,
guint64 max_size,
int priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
ostree_fetcher_request_uri_internal (self, uri, TRUE, max_size, priority, cancellable,
callback, user_data,
ostree_fetcher_stream_uri_async);
}
static GInputStream *
ostree_fetcher_stream_uri_finish (OstreeFetcher *self,
GAsyncResult *result,
GError **error)
{
g_return_val_if_fail (g_task_is_valid (result, self), NULL);
g_return_val_if_fail (g_async_result_is_tagged (result,
ostree_fetcher_stream_uri_async), NULL);
return g_task_propagate_pointer (G_TASK (result), error);
}
guint64
_ostree_fetcher_bytes_transferred (OstreeFetcher *self)
{
guint64 ret = self->total_downloaded;
GHashTableIter hiter;
gpointer key, value;
g_hash_table_iter_init (&hiter, self->output_stream_set);
while (g_hash_table_iter_next (&hiter, &key, &value))
{
GFileOutputStream *stream = key;
struct stat stbuf;
if (G_IS_FILE_DESCRIPTOR_BASED (stream))
{
if (gs_stream_fstat ((GFileDescriptorBased*)stream, &stbuf, NULL, NULL))
ret += stbuf.st_size;
}
}
return ret;
}
typedef struct
{
GInputStream *result_stream;
gboolean done;
GError **error;
}
FetchUriSyncData;
static void
fetch_uri_sync_on_complete (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
FetchUriSyncData *data = user_data;
data->result_stream = ostree_fetcher_stream_uri_finish ((OstreeFetcher*)object,
result, data->error);
data->done = TRUE;
}
/* Synchronously request a URI - will iterate the thread-default main
* context for historical reasons. If you don't want that, push a
* temporary one.
*/
gboolean
_ostree_fetcher_request_uri_to_membuf (OstreeFetcher *fetcher,
SoupURI *uri,
gboolean add_nul,
gboolean allow_noent,
GBytes **out_contents,
guint64 max_size,
GCancellable *cancellable,
GError **error)
{
gboolean ret = FALSE;
const guint8 nulchar = 0;
g_autofree char *ret_contents = NULL;
g_autoptr(GMemoryOutputStream) buf = NULL;
g_autoptr(GMainContext) mainctx = NULL;
FetchUriSyncData data;
g_assert (error != NULL);
data.result_stream = NULL;
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return FALSE;
mainctx = g_main_context_ref_thread_default ();
data.done = FALSE;
data.error = error;
ostree_fetcher_stream_uri_async (fetcher, uri,
max_size,
OSTREE_FETCHER_DEFAULT_PRIORITY,
cancellable,
fetch_uri_sync_on_complete, &data);
while (!data.done)
g_main_context_iteration (mainctx, TRUE);
if (!data.result_stream)
{
if (allow_noent)
{
if (g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND))
{
g_clear_error (error);
ret = TRUE;
*out_contents = NULL;
}
}
goto out;
}
buf = (GMemoryOutputStream*)g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
if (g_output_stream_splice ((GOutputStream*)buf, data.result_stream,
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
cancellable, error) < 0)
goto out;
if (add_nul)
{
if (!g_output_stream_write ((GOutputStream*)buf, &nulchar, 1, cancellable, error))
goto out;
}
if (!g_output_stream_close ((GOutputStream*)buf, cancellable, error))
goto out;
ret = TRUE;
*out_contents = g_memory_output_stream_steal_as_bytes (buf);
out:
g_clear_object (&(data.result_stream));
return ret;
}