From e485bace0189625c5ffc422c4c8b5987a6de5e79 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Sun, 20 May 2012 16:21:57 -0400 Subject: [PATCH] pull: Download and checksum asynchronously This is quite a noticeable speedup when downloading loose objects. --- Makefile-ostree.am | 2 + src/libotutil/ot-gio-utils.c | 44 +++ src/libotutil/ot-gio-utils.h | 10 + src/ostree/ostree-fetcher.c | 313 +++++++++++++++++++++ src/ostree/ostree-fetcher.h | 64 +++++ src/ostree/ostree-pull.c | 524 +++++++++++++++++++++++++---------- 6 files changed, 813 insertions(+), 144 deletions(-) create mode 100644 src/ostree/ostree-fetcher.c create mode 100644 src/ostree/ostree-fetcher.h diff --git a/Makefile-ostree.am b/Makefile-ostree.am index 1f7ab0eb..80249749 100644 --- a/Makefile-ostree.am +++ b/Makefile-ostree.am @@ -53,6 +53,8 @@ if USE_LIBSOUP_GNOME bin_PROGRAMS += ostree-pull ostree_pull_SOURCES = src/ostree/ot-main.h \ src/ostree/ot-main.c \ + src/ostree/ostree-fetcher.h \ + src/ostree/ostree-fetcher.c \ src/ostree/ostree-pull.c ostree_pull_CFLAGS = $(ostree_bin_shared_cflags) $(OT_DEP_SOUP_CFLAGS) diff --git a/src/libotutil/ot-gio-utils.c b/src/libotutil/ot-gio-utils.c index cf1e0cdc..b27507dc 100644 --- a/src/libotutil/ot-gio-utils.c +++ b/src/libotutil/ot-gio-utils.c @@ -358,6 +358,50 @@ ot_gio_checksum_stream (GInputStream *in, return ot_gio_splice_get_checksum (NULL, in, out_csum, cancellable, error); } +static void +checksum_stream_thread (GSimpleAsyncResult *result, + GObject *object, + GCancellable *cancellable) +{ + GError *error = NULL; + guchar *csum; + + if (!ot_gio_checksum_stream ((GInputStream*)object, &csum, + cancellable, &error)) + g_simple_async_result_take_error (result, error); + else + g_simple_async_result_set_op_res_gpointer (result, csum, g_free); +} + +void +ot_gio_checksum_stream_async (GInputStream *in, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + GSimpleAsyncResult *result; + + result = g_simple_async_result_new ((GObject*) in, + callback, user_data, + ot_gio_checksum_stream_async); + + g_simple_async_result_run_in_thread (result, checksum_stream_thread, io_priority, cancellable); + g_object_unref (result); +} + +guchar * +ot_gio_checksum_stream_finish (GInputStream *in, + GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); + + g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == ot_gio_checksum_stream_async); + return g_memdup (g_simple_async_result_get_op_res_gpointer (simple), 32); + +} + gboolean ot_gfile_merge_dirs (GFile *destination, GFile *src, diff --git a/src/libotutil/ot-gio-utils.h b/src/libotutil/ot-gio-utils.h index 16b2f123..9299a44b 100644 --- a/src/libotutil/ot-gio-utils.h +++ b/src/libotutil/ot-gio-utils.h @@ -88,6 +88,16 @@ gboolean ot_gio_checksum_stream (GInputStream *in, GCancellable *cancellable, GError **error); +void ot_gio_checksum_stream_async (GInputStream *in, + int io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); + +guchar * ot_gio_checksum_stream_finish (GInputStream *in, + GAsyncResult *result, + GError **error); + gboolean ot_gfile_merge_dirs (GFile *destination, GFile *src, GCancellable *cancellable, diff --git a/src/ostree/ostree-fetcher.c b/src/ostree/ostree-fetcher.c new file mode 100644 index 00000000..cd422616 --- /dev/null +++ b/src/ostree/ostree-fetcher.c @@ -0,0 +1,313 @@ +/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*- + * + * Copyright (C) 2011 Colin Walters + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + * + * Author: Colin Walters + */ + +#include "config.h" + +#include "ostree-fetcher.h" +#include "ostree.h" + +typedef enum { + OSTREE_FETCHER_STATE_PENDING, + OSTREE_FETCHER_STATE_DOWNLOADING, + OSTREE_FETCHER_STATE_COMPLETE +} OstreeFetcherState; + +typedef struct { + OstreeFetcher *self; + SoupURI *uri; + + OstreeFetcherState state; + + SoupRequest *request; + + GFile *tmpfile; + GInputStream *request_body; + GOutputStream *out_stream; + + guint64 content_length; + + GCancellable *cancellable; + GSimpleAsyncResult *result; +} OstreeFetcherPendingURI; + +static void +pending_uri_free (OstreeFetcherPendingURI *pending) +{ + g_clear_object (&pending->self); + g_clear_object (&pending->tmpfile); + g_clear_object (&pending->request); + g_clear_object (&pending->request_body); + g_clear_object (&pending->out_stream); + g_clear_object (&pending->cancellable); + g_free (pending); +} + +struct OstreeFetcher +{ + GObject parent_instance; + + GFile *tmpdir; + + SoupSession *session; + SoupRequester *requester; + + SoupMessage *sending_message; + + GHashTable *message_to_request; + + guint64 total_downloaded; +}; + +G_DEFINE_TYPE (OstreeFetcher, ostree_fetcher, G_TYPE_OBJECT) + +static void +ostree_fetcher_finalize (GObject *object) +{ + OstreeFetcher *self; + + self = OSTREE_FETCHER (object); + + g_clear_object (&self->session); + + G_OBJECT_CLASS (ostree_fetcher_parent_class)->finalize (object); +} + +static void +ostree_fetcher_class_init (OstreeFetcherClass *klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + + gobject_class->finalize = ostree_fetcher_finalize; +} + +static void +on_request_started (SoupSession *session, + SoupMessage *msg, + SoupSocket *socket, + gpointer user_data) +{ + OstreeFetcher *self = user_data; + self->sending_message = msg; +} + +static void +on_request_unqueued (SoupSession *session, + SoupMessage *msg, + gpointer user_data) +{ + OstreeFetcher *self = user_data; + if (msg == self->sending_message) + { + self->sending_message = NULL; + g_hash_table_remove (self->message_to_request, msg); + } +} + +static void +ostree_fetcher_init (OstreeFetcher *self) +{ + self->session = soup_session_async_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ", + SOUP_SESSION_USE_THREAD_CONTEXT, TRUE, + SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_REQUESTER, + NULL); + self->requester = (SoupRequester *)soup_session_get_feature (self->session, SOUP_TYPE_REQUESTER); + + g_signal_connect (self->session, "request-started", + G_CALLBACK (on_request_started), self); + g_signal_connect (self->session, "request-unqueued", + G_CALLBACK (on_request_unqueued), self); + + self->message_to_request = g_hash_table_new_full (NULL, NULL, (GDestroyNotify)g_object_unref, NULL); +} + +OstreeFetcher * +ostree_fetcher_new (GFile *tmpdir) +{ + OstreeFetcher *self = (OstreeFetcher*)g_object_new (OSTREE_TYPE_FETCHER, NULL); + + self->tmpdir = g_object_ref (tmpdir); + + return self; +} + +static void +on_splice_complete (GObject *object, + GAsyncResult *result, + gpointer user_data) +{ + OstreeFetcherPendingURI *pending = user_data; + ot_lobj GFileInfo *file_info = NULL; + + pending->state = OSTREE_FETCHER_STATE_COMPLETE; + file_info = g_file_query_info (pending->tmpfile, OSTREE_GIO_FAST_QUERYINFO, + G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS, + NULL, NULL); + if (file_info) + pending->self->total_downloaded += g_file_info_get_size (file_info); + + (void) g_input_stream_close (pending->request_body, NULL, NULL); + + g_simple_async_result_complete (pending->result); + g_object_unref (pending->result); +} + +static void +on_request_sent (GObject *object, + GAsyncResult *result, + gpointer user_data) +{ + OstreeFetcherPendingURI *pending = user_data; + GError *local_error = NULL; + + pending->request_body = soup_request_send_finish ((SoupRequest*) object, + result, &local_error); + if (!pending->request_body) + { + pending->state = OSTREE_FETCHER_STATE_COMPLETE; + g_simple_async_result_take_error (pending->result, local_error); + g_simple_async_result_complete (pending->result); + } + else + { + GOutputStreamSpliceFlags flags = G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET; + + pending->state = OSTREE_FETCHER_STATE_DOWNLOADING; + + pending->content_length = soup_request_get_content_length (pending->request); + + /* TODO - make this async */ + if (!ostree_create_temp_regular_file (pending->self->tmpdir, + NULL, NULL, + &pending->tmpfile, + &pending->out_stream, + NULL, &local_error)) + { + g_simple_async_result_take_error (pending->result, local_error); + g_simple_async_result_complete (pending->result); + return; + } + + g_output_stream_splice_async (pending->out_stream, pending->request_body, flags, G_PRIORITY_DEFAULT, + pending->cancellable, on_splice_complete, pending); + } +} + +void +ostree_fetcher_request_uri_async (OstreeFetcher *self, + SoupURI *uri, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + OstreeFetcherPendingURI *pending; + GError *local_error = NULL; + + pending = g_new0 (OstreeFetcherPendingURI, 1); + pending->self = g_object_ref (self); + pending->uri = soup_uri_copy (uri); + pending->cancellable = cancellable ? g_object_ref (cancellable) : NULL; + pending->request = soup_requester_request_uri (self->requester, uri, &local_error); + g_assert_no_error (local_error); + + g_hash_table_insert (self->message_to_request, + soup_request_http_get_message ((SoupRequestHTTP*)pending->request), + pending); + + pending->result = g_simple_async_result_new ((GObject*) self, + callback, user_data, + ostree_fetcher_request_uri_async); + g_simple_async_result_set_op_res_gpointer (pending->result, pending, + (GDestroyNotify) pending_uri_free); + + soup_request_send_async (pending->request, cancellable, + on_request_sent, pending); + +} + +GFile * +ostree_fetcher_request_uri_finish (OstreeFetcher *self, + GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple; + OstreeFetcherPendingURI *pending; + + g_return_val_if_fail (g_simple_async_result_is_valid (result, (GObject*)self, ostree_fetcher_request_uri_async), FALSE); + + simple = G_SIMPLE_ASYNC_RESULT (result); + if (g_simple_async_result_propagate_error (simple, error)) + return NULL; + pending = g_simple_async_result_get_op_res_gpointer (simple); + + return g_object_ref (pending->tmpfile); +} + +static char * +format_size_pair (guint64 start, + guint64 max) +{ + if (max < 1024) + return g_strdup_printf ("%lu/%lu bytes", + (gulong) start, + (gulong) max); + else + return g_strdup_printf ("%.1f/%.1f KiB", ((double) start) / 1024, + ((double) max) / 1024); +} + +char * +ostree_fetcher_query_state_text (OstreeFetcher *self) +{ + OstreeFetcherPendingURI *active; + + if (self->sending_message) + active = g_hash_table_lookup (self->message_to_request, self->sending_message); + else + active = NULL; + if (active) + { + ot_lfree char *active_uri = soup_uri_to_string (active->uri, TRUE); + + if (active->tmpfile) + { + ot_lobj GFileInfo *file_info = NULL; + + file_info = g_file_query_info (active->tmpfile, OSTREE_GIO_FAST_QUERYINFO, + G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS, + NULL, NULL); + if (file_info) + { + ot_lfree char *size = format_size_pair (g_file_info_get_size (file_info), + active->content_length); + return g_strdup_printf ("Downloading %s [ %s, %.1f KiB downloaded ]", + active_uri, size, ((double)self->total_downloaded) / 1024); + } + } + else + { + return g_strdup_printf ("Requesting %s [ %.1f KiB downloaded ]", + active_uri, ((double)self->total_downloaded) / 1024); + } + } + + return g_strdup_printf ("Idle [ %.1f KiB downloaded ]", ((double)self->total_downloaded) / 1024); +} diff --git a/src/ostree/ostree-fetcher.h b/src/ostree/ostree-fetcher.h new file mode 100644 index 00000000..d32ea6f9 --- /dev/null +++ b/src/ostree/ostree-fetcher.h @@ -0,0 +1,64 @@ +/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*- + * + * Copyright (C) 2012 Colin Walters + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef _OSTREE_FETCHER +#define _OSTREE_FETCHER + +#define LIBSOUP_USE_UNSTABLE_REQUEST_API +#include +#include +#include + +G_BEGIN_DECLS + +#define OSTREE_TYPE_FETCHER (ostree_fetcher_get_type ()) +#define OSTREE_FETCHER(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), OSTREE_TYPE_FETCHER, OstreeFetcher)) +#define OSTREE_FETCHER_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), OSTREE_TYPE_FETCHER, OstreeFetcherClass)) +#define OSTREE_IS_FETCHER(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), OSTREE_TYPE_FETCHER)) +#define OSTREE_IS_FETCHER_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), OSTREE_TYPE_FETCHER)) +#define OSTREE_FETCHER_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), OSTREE_TYPE_FETCHER, OstreeFetcherClass)) + +typedef struct OstreeFetcherClass OstreeFetcherClass; +typedef struct OstreeFetcher OstreeFetcher; + +struct OstreeFetcherClass +{ + GObjectClass parent_class; +}; + +GType ostree_fetcher_get_type (void) G_GNUC_CONST; + +OstreeFetcher *ostree_fetcher_new (GFile *tmpdir); + +char * ostree_fetcher_query_state_text (OstreeFetcher *self); + +void ostree_fetcher_request_uri_async (OstreeFetcher *self, + SoupURI *uri, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); + +GFile *ostree_fetcher_request_uri_finish (OstreeFetcher *self, + GAsyncResult *result, + GError **error); + +G_END_DECLS + +#endif diff --git a/src/ostree/ostree-pull.c b/src/ostree/ostree-pull.c index f8ba1d84..a7be92dd 100644 --- a/src/ostree/ostree-pull.c +++ b/src/ostree/ostree-pull.c @@ -20,13 +20,49 @@ * Author: Colin Walters */ +/** + * DESIGN: + * + * Pull refs + * For each ref: + * Pull commit + * + * Pull commits: + * For each commit: + * Verify checksum + * Import + * Traverse and queue dirtree/dirmeta + * + * Pull dirtrees: + * For each dirtree: + * Verify checksum + * Import + * Traverse and queue content/dirtree/dirmeta + * + * Pull content meta: + * For each content: + * Pull meta + * If contentcontent needed: + * Queue contentcontent + * else: + * Import + * + * Pull contentcontent: + * For each contentcontent + * Verify checksum + * Import + * + * + */ + #include "config.h" -#include #include "ostree.h" #include "ot-main.h" +#include "ostree-fetcher.h" + gboolean verbose; gboolean opt_prefer_loose; gboolean opt_related; @@ -43,7 +79,7 @@ static GOptionEntry options[] = { typedef struct { OstreeRepo *repo; char *remote_name; - SoupSession *session; + OstreeFetcher *fetcher; SoupURI *base_uri; gboolean fetched_packs; @@ -52,10 +88,22 @@ typedef struct { GHashTable *file_checksums_to_fetch; - gboolean stdout_is_tty; + GMainLoop *loop; - guint64 dl_current_bytes; - guint64 dl_total_bytes; + /* Used in meta fetch phase */ + guint outstanding_uri_requests; + guint outstanding_meta_requests; + + /* Used in content fetch phase */ + guint outstanding_filemeta_requests; + guint outstanding_filecontent_requests; + guint outstanding_checksum_requests; + GHashTable *loose_files; + + GError **async_error; + gboolean caught_error; + + gboolean stdout_is_tty; } OtPullData; static SoupURI * @@ -96,55 +144,87 @@ suburi_new (SoupURI *base, return ret; } +static gboolean +uri_fetch_update_status (gpointer user_data) +{ + OtPullData *pull_data = user_data; + ot_lfree char *fetcher_status; + GString *status; + + status = g_string_new (""); + + if (pull_data->loose_files != NULL) + g_string_append_printf (status, "%u loose files to fetch: ", + g_hash_table_size (pull_data->loose_files) + + pull_data->outstanding_filemeta_requests + + pull_data->outstanding_filecontent_requests); + + if (pull_data->outstanding_checksum_requests > 0) + g_string_append_printf (status, "Calculating %u checksums; ", + pull_data->outstanding_checksum_requests); + + fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher); + g_string_append (status, fetcher_status); + g_print ("%s\n", status->str); + + g_string_free (status, TRUE); + + return TRUE; +} + +static void +check_outstanding_requests_handle_error (OtPullData *pull_data, + GError *error) +{ + if (pull_data->outstanding_uri_requests == 0 && + pull_data->outstanding_meta_requests == 0 && + pull_data->outstanding_filemeta_requests == 0 && + pull_data->outstanding_filecontent_requests == 0 && + pull_data->outstanding_checksum_requests == 0 && + (pull_data->loose_files == NULL || g_hash_table_size (pull_data->loose_files) == 0)) + g_main_loop_quit (pull_data->loop); + if (error) + { + pull_data->caught_error = TRUE; + if (pull_data->async_error) + g_error_free (error); + else + g_propagate_error (pull_data->async_error, error); + } +} + +static void +run_mainloop_monitor_fetcher (OtPullData *pull_data) +{ + GSource *update_timeout = NULL; + + update_timeout = g_timeout_source_new_seconds (1); + g_source_set_callback (update_timeout, uri_fetch_update_status, pull_data, NULL); + g_source_attach (update_timeout, g_main_loop_get_context (pull_data->loop)); + g_source_unref (update_timeout); + + g_main_loop_run (pull_data->loop); + + g_source_destroy (update_timeout); +} + typedef struct { OtPullData *pull_data; - GOutputStream *stream; - gboolean had_error; - GError **error; -} OstreeSoupChunkData; + GFile *result_file; +} OstreeFetchUriData; static void -sync_progress (OtPullData *pull_data) +uri_fetch_on_complete (GObject *object, + GAsyncResult *result, + gpointer user_data) { - if (pull_data->stdout_is_tty) - { - g_print ("%c8%" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT " KiB", - 0x1b, (pull_data->dl_current_bytes / 1024), (pull_data->dl_total_bytes / 1024)); - } -} + OstreeFetchUriData *data = user_data; + GError *local_error = NULL; -static void -on_got_chunk (SoupMessage *msg, - SoupBuffer *buf, - gpointer user_data) -{ - OstreeSoupChunkData *data = user_data; - gsize bytes_written; - - data->pull_data->dl_current_bytes += buf->length; - sync_progress (data->pull_data); - - if (!g_output_stream_write_all (data->stream, buf->data, buf->length, - &bytes_written, NULL, data->error)) - { - data->had_error = TRUE; - soup_session_cancel_message (data->pull_data->session, msg, 500); - } -} - -static void -on_got_content_length (SoupMessage *msg, - OtPullData *pull_data) -{ - goffset size; - - g_assert (msg->response_headers); - - size = soup_message_headers_get_content_length (msg->response_headers); - if (size > 0) - pull_data->dl_total_bytes = (guint64) size; - - sync_progress (pull_data); + data->result_file = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, + result, &local_error); + data->pull_data->outstanding_uri_requests--; + check_outstanding_requests_handle_error (data->pull_data, local_error); } static gboolean @@ -156,67 +236,31 @@ fetch_uri (OtPullData *pull_data, GError **error) { gboolean ret = FALSE; - guint response; ot_lfree char *uri_string = NULL; - ot_lobj GFile *ret_temp_filename = NULL; - ot_lobj GOutputStream *output_stream = NULL; - ot_lobj SoupMessage *msg = NULL; - OstreeSoupChunkData chunkdata; + ot_lobj SoupRequest *request = NULL; + OstreeFetchUriData fetch_data; - if (!ostree_create_temp_regular_file (ostree_repo_get_tmpdir (pull_data->repo), - tmp_prefix, NULL, - &ret_temp_filename, - &output_stream, - NULL, error)) - goto out; + if (g_cancellable_set_error_if_cancelled (cancellable, error)) + return FALSE; + + memset (&fetch_data, 0, sizeof (fetch_data)); + fetch_data.pull_data = pull_data; - chunkdata.pull_data = pull_data; - chunkdata.stream = output_stream; - chunkdata.had_error = FALSE; - chunkdata.error = error; - uri_string = soup_uri_to_string (uri, FALSE); g_print ("Fetching %s\n", uri_string); - if (pull_data->stdout_is_tty) - { - g_print ("%c7", 0x1B); - g_print ("0/? KiB"); - pull_data->dl_current_bytes = 0; - pull_data->dl_total_bytes = 0; - sync_progress (pull_data); - } + pull_data->outstanding_uri_requests++; + ostree_fetcher_request_uri_async (pull_data->fetcher, uri, cancellable, + uri_fetch_on_complete, &fetch_data); - msg = soup_message_new_from_uri (SOUP_METHOD_GET, uri); + run_mainloop_monitor_fetcher (pull_data); - soup_message_body_set_accumulate (msg->response_body, FALSE); - - soup_message_add_header_handler (msg, "got-headers", - "Content-Length", - G_CALLBACK (on_got_content_length), - pull_data); - g_signal_connect (msg, "got-chunk", G_CALLBACK (on_got_chunk), &chunkdata); - - response = soup_session_send_message (pull_data->session, msg); - if (response != 200) - { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - "Failed to retrieve '%s': %d %s", - uri_string, response, msg->reason_phrase); - goto out; - } - - if (!g_output_stream_close (output_stream, NULL, error)) + if (pull_data->caught_error) goto out; - if (pull_data->stdout_is_tty) - g_print ("\n"); - ret = TRUE; - ot_transfer_out_value (out_temp_filename, &ret_temp_filename); + ot_transfer_out_value (out_temp_filename, &fetch_data.result_file); out: - if (ret_temp_filename) - (void) unlink (ot_gfile_get_path_cached (ret_temp_filename)); return ret; } @@ -924,6 +968,231 @@ store_file_from_pack (OtPullData *pull_data, return ret; } +typedef struct { + OtPullData *pull_data; + + gboolean fetching_content; + + GFile *meta_path; + GFile *content_path; + + char *checksum; +} OtFetchOneContentItemData; + +static void +destroy_fetch_one_content_item_data (OtFetchOneContentItemData *data) +{ + if (data->meta_path) + (void) ot_gfile_unlink (data->meta_path, NULL, NULL); + g_clear_object (&data->meta_path); + if (data->content_path) + (void) ot_gfile_unlink (data->content_path, NULL, NULL); + g_clear_object (&data->content_path); + g_free (data->checksum); + g_free (data); +} + +static void +content_fetch_on_checksum_complete (GObject *object, + GAsyncResult *result, + gpointer user_data) +{ + OtFetchOneContentItemData *data = user_data; + GError *local_error = NULL; + GError **error = &local_error; + guint64 length; + GCancellable *cancellable = NULL; + ot_lfree guchar *csum; + ot_lvariant GVariant *file_meta = NULL; + ot_lobj GFileInfo *file_info = NULL; + ot_lvariant GVariant *xattrs = NULL; + ot_lobj GInputStream *content_input = NULL; + ot_lobj GInputStream *file_object_input = NULL; + ot_lfree char *checksum; + + csum = ot_gio_checksum_stream_finish ((GInputStream*)object, result, error); + if (!csum) + goto out; + + if (!ot_util_variant_map (data->meta_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, FALSE, + &file_meta, error)) + goto out; + + if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error)) + goto out; + + if (data->content_path) + { + content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error); + if (!content_input) + goto out; + } + + if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs, + &file_object_input, &length, + cancellable, error)) + goto out; + + checksum = ostree_checksum_from_bytes (csum); + + if (strcmp (checksum, data->checksum) != 0) + { + g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, + "Corrupted object %s (actual checksum is %s)", + data->checksum, checksum); + goto out; + } + + if (!ostree_repo_stage_file_object_trusted (data->pull_data->repo, checksum, + FALSE, file_object_input, length, + cancellable, error)) + goto out; + + out: + data->pull_data->outstanding_checksum_requests--; + check_outstanding_requests_handle_error (data->pull_data, local_error); + destroy_fetch_one_content_item_data (data); +} + +static void +enqueue_loose_meta_requests (OtPullData *pull_data); + +static void +content_fetch_on_complete (GObject *object, + GAsyncResult *result, + gpointer user_data) +{ + OtFetchOneContentItemData *data = user_data; + GError *local_error = NULL; + GError **error = &local_error; + GCancellable *cancellable = NULL; + gboolean was_content_fetch = FALSE; + gboolean need_content_fetch = FALSE; + ot_lvariant GVariant *file_meta = NULL; + ot_lobj GFileInfo *file_info = NULL; + ot_lobj GInputStream *content_input = NULL; + ot_lobj GInputStream *file_object_input = NULL; + ot_lvariant GVariant *xattrs = NULL; + + was_content_fetch = data->fetching_content; + + if (was_content_fetch) + { + data->content_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error); + if (!data->content_path) + goto out; + } + else + { + data->meta_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error); + if (!data->meta_path) + goto out; + } + + if (!was_content_fetch) + { + if (!ot_util_variant_map (data->meta_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, FALSE, + &file_meta, error)) + goto out; + + if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error)) + goto out; + + if (g_file_info_get_file_type (file_info) == G_FILE_TYPE_REGULAR) + { + ot_lfree char *content_path = ostree_get_relative_archive_content_path (data->checksum); + SoupURI *content_uri; + + content_uri = suburi_new (data->pull_data->base_uri, content_path, NULL); + + data->pull_data->outstanding_filecontent_requests++; + need_content_fetch = TRUE; + data->fetching_content = TRUE; + + ostree_fetcher_request_uri_async (data->pull_data->fetcher, content_uri, cancellable, + content_fetch_on_complete, data); + soup_uri_free (content_uri); + } + } + + if (!need_content_fetch) + { + if (data->content_path) + { + content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error); + if (!content_input) + goto out; + } + + if (file_meta == NULL) + { + if (!ot_util_variant_map (data->meta_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, FALSE, + &file_meta, error)) + goto out; + + if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error)) + goto out; + } + + if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs, + &file_object_input, NULL, + cancellable, error)) + goto out; + + data->pull_data->outstanding_checksum_requests++; + ot_gio_checksum_stream_async (file_object_input, G_PRIORITY_DEFAULT, NULL, + content_fetch_on_checksum_complete, data); + } + + out: + if (was_content_fetch) + data->pull_data->outstanding_filecontent_requests--; + else + { + data->pull_data->outstanding_filemeta_requests--; + enqueue_loose_meta_requests (data->pull_data); + } + check_outstanding_requests_handle_error (data->pull_data, local_error); +} + +static void +enqueue_loose_meta_requests (OtPullData *pull_data) +{ + GHashTableIter hash_iter; + gpointer key, value; + GCancellable *cancellable = NULL; + + g_hash_table_iter_init (&hash_iter, pull_data->loose_files); + while (g_hash_table_iter_next (&hash_iter, &key, &value)) + { + const char *checksum = key; + ot_lfree char *objpath = NULL; + SoupURI *obj_uri = NULL; + OtFetchOneContentItemData *one_item_data; + + one_item_data = g_new0 (OtFetchOneContentItemData, 1); + one_item_data->pull_data = pull_data; + one_item_data->checksum = g_strdup (checksum); + one_item_data->fetching_content = FALSE; + + objpath = ostree_get_relative_object_path (checksum, OSTREE_OBJECT_TYPE_FILE); + obj_uri = suburi_new (pull_data->base_uri, objpath, NULL); + + ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, cancellable, + content_fetch_on_complete, one_item_data); + soup_uri_free (obj_uri); + + pull_data->outstanding_filemeta_requests++; + g_hash_table_iter_remove (&hash_iter); + + /* Don't let too many requests queue up; when we're fetching + * files we need to process the actual content. + */ + if (pull_data->outstanding_filemeta_requests > 20) + break; + } +} + static gboolean fetch_content (OtPullData *pull_data, GCancellable *cancellable, @@ -1015,51 +1284,16 @@ fetch_content (OtPullData *pull_data, if (g_hash_table_size (loose_files) > 0) g_print ("Fetching %u loose objects\n", g_hash_table_size (loose_files)); - - g_hash_table_iter_init (&hash_iter, loose_files); - while (g_hash_table_iter_next (&hash_iter, &key, &value)) + + pull_data->loose_files = loose_files; + + if (g_hash_table_size (loose_files) > 0) { - const char *checksum = key; - guint64 length; - ot_lobj GInputStream *file_object_input = NULL; - ot_lvariant GVariant *file_meta = NULL; - ot_lobj GFileInfo *file_info = NULL; - ot_lvariant GVariant *xattrs = NULL; - ot_lobj GInputStream *content_input = NULL; + enqueue_loose_meta_requests (pull_data); - if (!fetch_loose_object (pull_data, checksum, OSTREE_OBJECT_TYPE_FILE, &temp_path, - cancellable, error)) - goto out; + run_mainloop_monitor_fetcher (pull_data); - if (!ot_util_variant_map (temp_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, FALSE, - &file_meta, error)) - goto out; - - if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error)) - goto out; - - if (g_file_info_get_file_type (file_info) == G_FILE_TYPE_REGULAR) - { - ot_lfree char *content_path = ostree_get_relative_archive_content_path (checksum); - content_uri = suburi_new (pull_data->base_uri, content_path, NULL); - - if (!fetch_uri (pull_data, content_uri, "filecontent", &content_temp_path, - cancellable, error)) - goto out; - - content_input = (GInputStream*)g_file_read (content_temp_path, cancellable, error); - if (!content_input) - goto out; - } - - if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs, - &file_object_input, &length, - cancellable, error)) - goto out; - - if (!ostree_repo_stage_file_object (pull_data->repo, checksum, - file_object_input, length, - cancellable, error)) + if (pull_data->caught_error) goto out; } @@ -1203,6 +1437,9 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) if (!ostree_repo_check (repo, error)) goto out; + pull_data->async_error = error; + pull_data->loop = g_main_loop_new (NULL, FALSE); + pull_data->repo = repo; pull_data->file_checksums_to_fetch = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); @@ -1215,10 +1452,7 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) pull_data->stdout_is_tty = isatty (1); pull_data->remote_name = g_strdup (argv[1]); - pull_data->session = soup_session_sync_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ", - SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_COOKIE_JAR, - NULL); - + pull_data->fetcher = ostree_fetcher_new (ostree_repo_get_tmpdir (pull_data->repo)); config = ostree_repo_get_config (repo); remote_key = g_strdup_printf ("remote \"%s\"", pull_data->remote_name); @@ -1384,10 +1618,12 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) ret = TRUE; out: + if (pull_data->loop) + g_main_loop_unref (pull_data->loop); g_strfreev (configured_branches); if (context) g_option_context_free (context); - g_clear_object (&pull_data->session); + g_clear_object (&pull_data->fetcher); g_free (pull_data->remote_name); if (pull_data->base_uri) soup_uri_free (pull_data->base_uri);