From 5c1dc390ae0e51487f9451fe433f76461f4c4127 Mon Sep 17 00:00:00 2001
From: Colin Walters
Date: Tue, 28 Aug 2012 09:41:09 -0400
Subject: [PATCH] pull: Asynchronous metadata fetch

Create a worker thread for processing metadata, reserving the main
thread for HTTP requests. This can create a very significant
efficiency win for large pull requests since we are much more likely
to keep a full pipeline open.

The status display is also nicer now.
---
 Makefile-otutil.am              |   2 +
 src/libotutil/ot-worker-queue.c | 243 ++++++++++++
 src/libotutil/ot-worker-queue.h |  57 +++
 src/libotutil/otutil.h          |   1 +
 src/ostree/ostree-pull.c        | 635 ++++++++++++++++++++------------
 5 files changed, 706 insertions(+), 232 deletions(-)
 create mode 100644 src/libotutil/ot-worker-queue.c
 create mode 100644 src/libotutil/ot-worker-queue.h

diff --git a/Makefile-otutil.am b/Makefile-otutil.am
index ea94e3b5..58ea8f83 100644
--- a/Makefile-otutil.am
+++ b/Makefile-otutil.am
@@ -35,6 +35,8 @@ libotutil_la_SOURCES = \
 	src/libotutil/ot-spawn-utils.h \
 	src/libotutil/ot-variant-utils.c \
 	src/libotutil/ot-variant-utils.h \
+	src/libotutil/ot-worker-queue.c \
+	src/libotutil/ot-worker-queue.h \
 	src/libotutil/ot-gio-utils.c \
 	src/libotutil/ot-gio-utils.h \
 	src/libotutil/otutil.c \
diff --git a/src/libotutil/ot-worker-queue.c b/src/libotutil/ot-worker-queue.c
new file mode 100644
index 00000000..797c1686
--- /dev/null
+++ b/src/libotutil/ot-worker-queue.c
@@ -0,0 +1,243 @@
+/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*-
+ *
+ * Copyright (C) 2012 Colin Walters
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ * Author: Colin Walters
+ */
+
+#include "config.h"
+
+#include "otutil.h"
+
+#include
+
+/*
+ * OtWorkerQueue:
+ *
+ * A single worker thread fed from a mutex-protected queue. Items are
+ * handed to the work function in the order they were pushed; a NULL
+ * item makes the thread exit. An optional idle callback is dispatched
+ * to a GMainContext whenever the worker finds the queue empty while no
+ * holds are outstanding.
+ */
+struct OtWorkerQueue {
+  GMutex mutex; /* Guards queue and orders hold releases against the idle check */
+  GCond cond; /* Signalled on push and when the last hold is released */
+  GQueue queue; /* Pending items: pushed at the head, popped from the tail */
+
+  volatile gint holds; /* Idle notification is suppressed while nonzero */
+
+  char *thread_name;
+
+  GThread *worker; /* NULL until ot_worker_queue_start() */
+
+  OtWorkerQueueFunc work_func;
+  gpointer work_data; /* Opaque user data passed to work_func */
+
+  GMainContext *idle_context; /* NULL unless an idle callback was set */
+  OtWorkerQueueIdleFunc idle_callback;
+  gpointer idle_data;
+};
+
+static gpointer
+ot_worker_queue_thread_main (gpointer user_data);
+
+/**
+ * ot_worker_queue_new:
+ * @thread_name: Name for the worker thread; copied
+ * @func: Called in the worker thread for every pushed item
+ * @data: User data passed as the second argument of @func
+ *
+ * Allocate a queue. No thread exists until ot_worker_queue_start().
+ *
+ * Returns: A new queue; free with ot_worker_queue_unref()
+ */
+OtWorkerQueue *
+ot_worker_queue_new (const char *thread_name,
+                     OtWorkerQueueFunc func,
+                     gpointer data)
+{
+  OtWorkerQueue *queue = g_new0 (OtWorkerQueue, 1);
+  g_mutex_init (&queue->mutex);
+  g_cond_init (&queue->cond);
+  g_queue_init (&queue->queue);
+
+  queue->thread_name = g_strdup (thread_name);
+  queue->work_func = func;
+  queue->work_data = data;
+
+  return queue;
+}
+
+/**
+ * ot_worker_queue_start:
+ *
+ * Create the worker thread, which starts consuming pushed items.
+ */
+void
+ot_worker_queue_start (OtWorkerQueue *queue)
+{
+  queue->worker = g_thread_new (queue->thread_name, ot_worker_queue_thread_main, queue);
+}
+
+/**
+ * ot_worker_queue_hold:
+ *
+ * Suppress the idle callback until a matching
+ * ot_worker_queue_release().
+ */
+void
+ot_worker_queue_hold (OtWorkerQueue *queue)
+{
+  g_atomic_int_inc (&queue->holds);
+}
+
+/**
+ * ot_worker_queue_release:
+ *
+ * Drop a hold taken with ot_worker_queue_hold().
+ */
+void
+ot_worker_queue_release (OtWorkerQueue *queue)
+{
+  /* The worker only evaluates the idle condition right before it
+   * blocks on the condition variable. If it went to sleep while this
+   * hold was still outstanding, nothing else would wake it and the
+   * idle callback would never be delivered. Decrement under the mutex
+   * so the update is ordered against the worker's check, and wake the
+   * worker when the last hold goes away.
+   */
+  g_mutex_lock (&queue->mutex);
+  if (g_atomic_int_dec_and_test (&queue->holds))
+    g_cond_signal (&queue->cond);
+  g_mutex_unlock (&queue->mutex);
+}
+
+/**
+ * ot_worker_queue_push:
+ * @data: Item handed to the work function; NULL stops the worker
+ *
+ * Append @data to the queue and wake the worker thread.
+ */
+void
+ot_worker_queue_push (OtWorkerQueue *queue,
+                      gpointer data)
+{
+  g_mutex_lock (&queue->mutex);
+  g_queue_push_head (&queue->queue, data);
+  g_cond_signal (&queue->cond);
+  g_mutex_unlock (&queue->mutex);
+}
+
+/* Trampoline run in the idle context; FALSE means "do not repeat" */
+static gboolean
+invoke_idle_callback (gpointer user_data)
+{
+  OtWorkerQueue *queue = user_data;
+  queue->idle_callback (queue->idle_data);
+  return FALSE;
+}
+
+/* Worker thread body: process items in push order until the NULL
+ * sentinel is popped */
+static gpointer
+ot_worker_queue_thread_main (gpointer user_data)
+{
+  OtWorkerQueue *queue = user_data;
+
+  while (TRUE)
+    {
+      gpointer item;
+
+      g_mutex_lock (&queue->mutex);
+
+      /* Test the link, not the data: a queued NULL item must end the wait */
+      while (!g_queue_peek_tail_link (&queue->queue))
+        {
+          /* Nothing queued: report idleness unless somebody holds us */
+          if (queue->idle_callback && g_atomic_int_get (&queue->holds) == 0)
+            g_main_context_invoke (queue->idle_context,
+                                   invoke_idle_callback,
+                                   queue);
+          g_cond_wait (&queue->cond, &queue->mutex);
+        }
+
+      item = g_queue_pop_tail (&queue->queue);
+
+      g_mutex_unlock (&queue->mutex);
+
+      if (!item)
+        break;
+
+      queue->work_func (item, queue->work_data);
+    }
+
+  return NULL;
+}
+
+/**
+ * ot_worker_queue_set_idle_callback:
+ * @context: (allow-none): Context to dispatch to, or NULL for the default one
+ * @idle_callback: Called in @context when the worker runs out of work
+ * @data: User data for @idle_callback
+ *
+ * Must be called before ot_worker_queue_start().
+ */
+void
+ot_worker_queue_set_idle_callback (OtWorkerQueue *queue,
+                                   GMainContext *context,
+                                   OtWorkerQueueIdleFunc idle_callback,
+                                   gpointer data)
+{
+  g_assert (!queue->worker);
+  if (!context)
+    context = g_main_context_default ();
+  /* Reference first: @context may be the context we already hold */
+  g_main_context_ref (context);
+  if (queue->idle_context)
+    g_main_context_unref (queue->idle_context);
+  queue->idle_context = context;
+  queue->idle_callback = idle_callback;
+  queue->idle_data = data;
+}
+
+/**
+ * ot_worker_queue_unref:
+ *
+ * Stop the worker thread (if it was started), wait for it to exit
+ * and free @queue. Items already queued are processed first.
+ */
+void
+ot_worker_queue_unref (OtWorkerQueue *queue)
+{
+  if (queue->worker)
+    {
+      ot_worker_queue_push (queue, NULL);
+      g_thread_join (queue->worker);
+    }
+
+  g_free (queue->thread_name);
+
+  /* idle_context stays NULL when no idle callback was ever set */
+  if (queue->idle_context)
+    g_main_context_unref (queue->idle_context);
+  g_mutex_clear (&queue->mutex);
+  g_cond_clear (&queue->cond);
+  g_queue_clear (&queue->queue);
+  g_free (queue);
+}
diff --git a/src/libotutil/ot-worker-queue.h b/src/libotutil/ot-worker-queue.h
new file mode 100644
index 00000000..590480e0
--- /dev/null
+++ b/src/libotutil/ot-worker-queue.h
@@ -0,0 +1,57 @@
+/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*-
+ *
+ * Copyright (C) 2012 Colin Walters .
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + * + * Author: Colin Walters + */ + +#ifndef __OSTREE_WORKER_QUEUE_H__ +#define __OSTREE_WORKER_QUEUE_H__ + +#include + +G_BEGIN_DECLS +/* Opaque handle for one worker thread and the queue that feeds it. */ +typedef struct OtWorkerQueue OtWorkerQueue; +/* OtWorkerQueueFunc: called by the worker thread as func (item, user_data) for every non-NULL item pushed. OtWorkerQueueIdleFunc: invoked through g_main_context_invoke() on the idle context when the worker finds the queue empty and the hold count is zero. */ +typedef void (*OtWorkerQueueFunc) (gpointer data, + gpointer user_data); +typedef void (*OtWorkerQueueIdleFunc) (gpointer user_data); +/* Allocates a queue and records the work callback; the thread itself is only created by ot_worker_queue_start(). */ +OtWorkerQueue *ot_worker_queue_new (const char *thread_name, + OtWorkerQueueFunc func, + gpointer data); +/* Creates the worker thread. */ +void ot_worker_queue_start (OtWorkerQueue *queue); +/* hold increments and release decrements an atomic counter; the idle callback is only dispatched while that counter is zero. */ +void ot_worker_queue_hold (OtWorkerQueue *queue); +void ot_worker_queue_release (OtWorkerQueue *queue); +/* Asserts that the worker has not been started yet; a NULL context selects g_main_context_default(). */ +void ot_worker_queue_set_idle_callback (OtWorkerQueue *queue, + GMainContext *context, + OtWorkerQueueIdleFunc idle_callback, + gpointer data); +/* Queues an item and signals the worker; a NULL item makes the worker thread exit. */ +void ot_worker_queue_push (OtWorkerQueue *queue, + gpointer data); +/* If the worker was started, pushes NULL and joins it; then frees the queue unconditionally (there is no reference count). */ +void ot_worker_queue_unref (OtWorkerQueue *queue); + +G_END_DECLS + +#endif diff --git a/src/libotutil/otutil.h b/src/libotutil/otutil.h index c6bb4b31..841ee7b7 100644 --- a/src/libotutil/otutil.h +++ b/src/libotutil/otutil.h @@ -35,6 +35,7 @@ } \ } G_STMT_END; +#include #include #include #include diff --git a/src/ostree/ostree-pull.c b/src/ostree/ostree-pull.c index 3ccd598d..af06bdbf 100644 --- a/src/ostree/ostree-pull.c +++ b/src/ostree/ostree-pull.c @@ -28,9 +28,16 @@ * * Pull refs * For each ref: - * Pull commit + * Queue scan of commit + * + * Mainloop: + * Process requests, await idle scan + * + * Async queue: + * Scan commit + * If already cached, recursively scan content + * If not, 
queue fetch * - * Pull commits: * For each commit: * Verify checksum * Import @@ -66,14 +73,13 @@ #include "ostree-fetcher.h" + gboolean verbose; gboolean opt_related; -gint opt_depth; static GOptionEntry options[] = { { "verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose, "Show more information", NULL }, { "related", 0, 0, G_OPTION_ARG_NONE, &opt_related, "Download related commits", NULL }, - { "depth", 0, 0, G_OPTION_ARG_INT, &opt_depth, "Download parent commits up to this depth (default: 0)", NULL }, { NULL }, }; @@ -87,12 +93,20 @@ typedef struct { GHashTable *file_checksums_to_fetch; GMainLoop *loop; + GCancellable *cancellable; /* Used in meta fetch phase */ + gboolean fetching_metadata; + volatile gint n_scanned_metadata; + guint n_fetched_metadata; guint outstanding_uri_requests; - guint outstanding_meta_requests; + GThread *metadata_scan_thread; + OtWorkerQueue *metadata_objects_to_scan; + GHashTable *scanned_metadata; /* Maps object name to itself */ + /* Used in content fetch phase */ + guint n_fetched_content; guint outstanding_filemeta_requests; guint outstanding_filecontent_requests; guint outstanding_checksum_requests; @@ -110,6 +124,18 @@ suburi_new (SoupURI *base, const char *first, ...) 
G_GNUC_NULL_TERMINATED; +static gboolean scan_one_metadata_object (OtPullData *pull_data, + const guchar *csum, + OstreeObjectType objtype, + guint recursion_depth, + GCancellable *cancellable, + GError **error); +static gboolean scan_one_metadata_object_v_name (OtPullData *pull_data, + GVariant *object, + GCancellable *cancellable, + GError **error); + + static SoupURI * suburi_new (SoupURI *base, const char *first, @@ -152,6 +178,13 @@ uri_fetch_update_status (gpointer user_data) status = g_string_new (""); + if (pull_data->fetching_metadata) + { + g_string_append_printf (status, "Metadata phase: %u fetched, %u scanned; ", + g_atomic_int_get (&pull_data->n_fetched_metadata), + g_atomic_int_get (&pull_data->n_scanned_metadata)); + } + if (pull_data->loose_files != NULL) g_string_append_printf (status, "%u loose files to fetch: ", g_hash_table_size (pull_data->loose_files) @@ -162,6 +195,7 @@ uri_fetch_update_status (gpointer user_data) g_string_append_printf (status, "Calculating %u checksums; ", pull_data->outstanding_checksum_requests); + fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher); g_string_append (status, fetcher_status); if (status->len > pull_data->last_padding) @@ -183,16 +217,9 @@ uri_fetch_update_status (gpointer user_data) } static void -check_outstanding_requests_handle_error (OtPullData *pull_data, - GError *error) +throw_async_error (OtPullData *pull_data, + GError *error) { - if (pull_data->outstanding_uri_requests == 0 && - pull_data->outstanding_meta_requests == 0 && - pull_data->outstanding_filemeta_requests == 0 && - pull_data->outstanding_filecontent_requests == 0 && - pull_data->outstanding_checksum_requests == 0 && - (pull_data->loose_files == NULL || g_hash_table_size (pull_data->loose_files) == 0)) - g_main_loop_quit (pull_data->loop); if (error) { if (!pull_data->caught_error) @@ -209,6 +236,19 @@ check_outstanding_requests_handle_error (OtPullData *pull_data, } static void +check_outstanding_requests_handle_error 
(OtPullData *pull_data, + GError *error) +{ + if (pull_data->outstanding_uri_requests == 0 && + pull_data->outstanding_filemeta_requests == 0 && + pull_data->outstanding_filecontent_requests == 0 && + pull_data->outstanding_checksum_requests == 0 && + (pull_data->loose_files == NULL || g_hash_table_size (pull_data->loose_files) == 0)) + g_main_loop_quit (pull_data->loop); + throw_async_error (pull_data, error); +} + +static gboolean run_mainloop_monitor_fetcher (OtPullData *pull_data) { GSource *update_timeout = NULL; @@ -229,6 +269,8 @@ run_mainloop_monitor_fetcher (OtPullData *pull_data) g_print ("\n"); g_source_destroy (update_timeout); } + + return !pull_data->caught_error; } typedef struct { @@ -276,9 +318,7 @@ fetch_uri (OtPullData *pull_data, ostree_fetcher_request_uri_async (pull_data->fetcher, uri, cancellable, uri_fetch_on_complete, &fetch_data); - run_mainloop_monitor_fetcher (pull_data); - - if (pull_data->caught_error) + if (!run_mainloop_monitor_fetcher (pull_data)) goto out; ret = TRUE; @@ -321,103 +361,11 @@ fetch_uri_contents_utf8 (OtPullData *pull_data, } static gboolean -fetch_loose_object (OtPullData *pull_data, - const char *checksum, - OstreeObjectType objtype, - GFile **out_temp_path, - GCancellable *cancellable, - GError **error) -{ - gboolean ret = FALSE; - ot_lfree char *objpath = NULL; - ot_lobj GFile *ret_temp_path = NULL; - SoupURI *obj_uri = NULL; - - objpath = ostree_get_relative_object_path (checksum, objtype, - pull_data->remote_mode == OSTREE_REPO_MODE_ARCHIVE_Z); - obj_uri = suburi_new (pull_data->base_uri, objpath, NULL); - - if (!fetch_uri (pull_data, obj_uri, ostree_object_type_to_string (objtype), &ret_temp_path, - cancellable, error)) - goto out; - - ret = TRUE; - ot_transfer_out_value (out_temp_path, &ret_temp_path); - out: - if (obj_uri) - soup_uri_free (obj_uri); - return ret; -} - -static gboolean -fetch_and_store_metadata (OtPullData *pull_data, - const char *checksum, - OstreeObjectType objtype, - GVariant 
**out_variant, - GCancellable *cancellable, - GError **error) -{ - gboolean ret = FALSE; - gboolean is_stored; - ot_lvariant GVariant *ret_variant = NULL; - ot_lobj GFile *temp_path = NULL; - ot_lvariant GVariant *metadata = NULL; - - g_assert (OSTREE_OBJECT_TYPE_IS_META (objtype)); - - if (!ostree_repo_has_object (pull_data->repo, objtype, checksum, &is_stored, - cancellable, error)) - goto out; - - if (!is_stored) - { - ot_lvariant GVariant *tmp_metadata = NULL; - const GVariantType *vtype; - - if (!fetch_loose_object (pull_data, checksum, objtype, &temp_path, cancellable, error)) - goto out; - - switch (objtype) - { - case OSTREE_OBJECT_TYPE_DIR_TREE: - vtype = OSTREE_TREE_GVARIANT_FORMAT; - break; - case OSTREE_OBJECT_TYPE_DIR_META: - vtype = OSTREE_DIRMETA_GVARIANT_FORMAT; - break; - case OSTREE_OBJECT_TYPE_COMMIT: - vtype = OSTREE_COMMIT_GVARIANT_FORMAT; - break; - default: - g_assert_not_reached (); - } - - if (!ot_util_variant_map (temp_path, vtype, FALSE, &tmp_metadata, error)) - goto out; - - if (!ostree_repo_stage_metadata (pull_data->repo, objtype, checksum, tmp_metadata, NULL, - cancellable, error)) - goto out; - } - - if (!ostree_repo_load_variant (pull_data->repo, objtype, checksum, - &ret_variant, error)) - goto out; - - ret = TRUE; - ot_transfer_out_value (out_variant, &ret_variant); - out: - if (temp_path) - (void) ot_gfile_unlink (temp_path, NULL, NULL); - return ret; -} - -static gboolean -fetch_and_store_tree_metadata_recurse (OtPullData *pull_data, - int depth, - const char *rev, - GCancellable *cancellable, - GError **error) +scan_dirtree_object (OtPullData *pull_data, + const char *checksum, + int recursion_depth, + GCancellable *cancellable, + GError **error) { gboolean ret = FALSE; int i, n; @@ -426,15 +374,15 @@ fetch_and_store_tree_metadata_recurse (OtPullData *pull_data, ot_lvariant GVariant *dirs_variant = NULL; ot_lobj GFile *stored_path = NULL; - if (depth > OSTREE_MAX_RECURSION) + if (recursion_depth > OSTREE_MAX_RECURSION) { 
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Exceeded maximum recursion"); goto out; } - if (!fetch_and_store_metadata (pull_data, rev, OSTREE_OBJECT_TYPE_DIR_TREE, - &tree, cancellable, error)) + if (!ostree_repo_load_variant (pull_data->repo, OSTREE_OBJECT_TYPE_DIR_TREE, checksum, + &tree, error)) goto out; /* PARSE OSTREE_SERIALIZED_TREE_VARIANT */ @@ -473,15 +421,14 @@ fetch_and_store_tree_metadata_recurse (OtPullData *pull_data, if (!ot_util_filename_validate (dirname, error)) goto out; - g_free (tmp_checksum); - tmp_checksum = ostree_checksum_from_bytes_v (meta_csum); - if (!fetch_and_store_metadata (pull_data, tmp_checksum, OSTREE_OBJECT_TYPE_DIR_META, - NULL, cancellable, error)) + if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (tree_csum), + OSTREE_OBJECT_TYPE_DIR_TREE, recursion_depth + 1, + cancellable, error)) goto out; - - g_free (tmp_checksum); - tmp_checksum = ostree_checksum_from_bytes_v (tree_csum); - if (!fetch_and_store_tree_metadata_recurse (pull_data, depth+1, tmp_checksum, cancellable, error)) + + if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (meta_csum), + OSTREE_OBJECT_TYPE_DIR_META, recursion_depth + 1, + cancellable, error)) goto out; } @@ -490,94 +437,6 @@ fetch_and_store_tree_metadata_recurse (OtPullData *pull_data, return ret; } -static gboolean -fetch_and_store_commit_metadata_recurse (OtPullData *pull_data, - int parent_depth, - int related_depth, - const char *rev, - GCancellable *cancellable, - GError **error) -{ - gboolean ret = FALSE; - ot_lvariant GVariant *commit = NULL; - ot_lvariant GVariant *related_objects = NULL; - ot_lvariant GVariant *tree_contents_csum = NULL; - ot_lvariant GVariant *tree_meta_csum = NULL; - ot_lfree char *tmp_checksum = NULL; - GVariantIter *iter = NULL; - - if (!fetch_and_store_metadata (pull_data, rev, OSTREE_OBJECT_TYPE_COMMIT, - &commit, cancellable, error)) - goto out; - - /* PARSE OSTREE_SERIALIZED_COMMIT_VARIANT */ - g_variant_get_child (commit, 
6, "@ay", &tree_contents_csum); - g_variant_get_child (commit, 7, "@ay", &tree_meta_csum); - - g_free (tmp_checksum); - tmp_checksum = ostree_checksum_from_bytes_v (tree_meta_csum); - if (!fetch_and_store_metadata (pull_data, tmp_checksum, OSTREE_OBJECT_TYPE_DIR_META, - NULL, cancellable, error)) - goto out; - - g_free (tmp_checksum); - tmp_checksum = ostree_checksum_from_bytes_v (tree_contents_csum); - if (!fetch_and_store_tree_metadata_recurse (pull_data, 0, tmp_checksum, - cancellable, error)) - goto out; - - if (opt_related) - { - const char *name; - ot_lvariant GVariant *csum_v = NULL; - - if (parent_depth > OSTREE_MAX_RECURSION - || related_depth > OSTREE_MAX_RECURSION) - { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - "Exceeded maximum recursion"); - goto out; - } - - related_objects = g_variant_get_child_value (commit, 2); - iter = g_variant_iter_new (related_objects); - - while (g_variant_iter_loop (iter, "(&s@ay)", &name, &csum_v)) - { - ot_lfree char *checksum = ostree_checksum_from_bytes_v (csum_v); - - /* Pass opt_depth here to ensure we aren't fetching parents of related */ - if (!fetch_and_store_commit_metadata_recurse (pull_data, opt_depth, - related_depth + 1, checksum, - cancellable, error)) - goto out; - } - } - - if (parent_depth < opt_depth) - { - ot_lvariant GVariant *parent_csum_v = NULL; - - parent_csum_v = g_variant_get_child_value (commit, 1); - - if (g_variant_n_children (parent_csum_v) > 0) - { - ot_lfree char *checksum = ostree_checksum_from_bytes_v (parent_csum_v); - - if (!fetch_and_store_commit_metadata_recurse (pull_data, parent_depth + 1, - 0, checksum, - cancellable, error)) - goto out; - } - } - - ret = TRUE; - out: - if (iter) - g_variant_iter_free (iter); - return ret; -} - static gboolean fetch_ref_contents (OtPullData *pull_data, const char *ref, @@ -702,6 +561,7 @@ content_fetch_on_checksum_complete (GObject *object, cancellable, error)) goto out; + data->pull_data->n_fetched_content++; out: 
data->pull_data->outstanding_checksum_requests--; check_outstanding_requests_handle_error (data->pull_data, local_error); @@ -918,9 +778,7 @@ fetch_content (OtPullData *pull_data, { enqueue_loose_meta_requests (pull_data); - run_mainloop_monitor_fetcher (pull_data); - - if (pull_data->caught_error) + if (!run_mainloop_monitor_fetcher (pull_data)) goto out; } @@ -935,6 +793,299 @@ fetch_content (OtPullData *pull_data, return ret; } +typedef struct { + OtPullData *pull_data; + GVariant *object; +} IdleFetchMetadataObjectData; + +static void +meta_fetch_on_complete (GObject *object, + GAsyncResult *result, + gpointer user_data) +{ + IdleFetchMetadataObjectData *fetch_data = user_data; + OtPullData *pull_data = fetch_data->pull_data; + ot_lobj GFile *temp_path = NULL; + ot_lvariant GVariant *metadata = NULL; + const char *checksum; + OstreeObjectType objtype; + GError *local_error = NULL; + GError **error = &local_error; + + temp_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error); + if (!temp_path) + goto out; + + ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype); + + if (!ot_util_variant_map (temp_path, ostree_metadata_variant_type (objtype), + FALSE, &metadata, error)) + goto out; + + if (!ostree_repo_stage_metadata (pull_data->repo, objtype, checksum, metadata, (guchar**)NULL, + pull_data->cancellable, error)) + goto out; + + pull_data->n_fetched_metadata++; + + ot_worker_queue_push (pull_data->metadata_objects_to_scan, + g_variant_ref (fetch_data->object)); + ot_worker_queue_release (pull_data->metadata_objects_to_scan); + + out: + (void) ot_gfile_unlink (temp_path, NULL, NULL); + throw_async_error (pull_data, local_error); + g_variant_unref (fetch_data->object); + g_free (fetch_data); +} + +static gboolean +idle_fetch_metadata_object (gpointer data) +{ + IdleFetchMetadataObjectData *fetch_data = data; + OtPullData *pull_data = fetch_data->pull_data; + ot_lfree char *objpath = NULL; + const char *checksum; + 
OstreeObjectType objtype; + SoupURI *obj_uri = NULL; + gboolean compressed; + + compressed = pull_data->remote_mode == OSTREE_REPO_MODE_ARCHIVE_Z; + + ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype); + + objpath = ostree_get_relative_object_path (checksum, objtype, compressed); + obj_uri = suburi_new (pull_data->base_uri, objpath, NULL); + + ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable, + meta_fetch_on_complete, fetch_data); + soup_uri_free (obj_uri); + + return FALSE; +} + +/** + * queue_metadata_object_fetch: + * + * Pass a request to the main thread to fetch a metadata object. + */ +static void +queue_metadata_object_fetch (OtPullData *pull_data, + GVariant *object) +{ + IdleFetchMetadataObjectData *fetch_data = g_new (IdleFetchMetadataObjectData, 1); + fetch_data->pull_data = pull_data; + fetch_data->object = g_variant_ref (object); + ot_worker_queue_hold (fetch_data->pull_data->metadata_objects_to_scan); + g_idle_add (idle_fetch_metadata_object, fetch_data); +} + +static gboolean +scan_commit_object (OtPullData *pull_data, + const char *checksum, + guint recursion_depth, + GCancellable *cancellable, + GError **error) +{ + gboolean ret = FALSE; + ot_lvariant GVariant *commit = NULL; + ot_lvariant GVariant *related_objects = NULL; + ot_lvariant GVariant *tree_contents_csum = NULL; + ot_lvariant GVariant *tree_meta_csum = NULL; + ot_lfree char *tmp_checksum = NULL; + GVariantIter *iter = NULL; + + if (recursion_depth > OSTREE_MAX_RECURSION) + { + g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, + "Exceeded maximum recursion"); + goto out; + } + + if (!ostree_repo_load_variant (pull_data->repo, OSTREE_OBJECT_TYPE_COMMIT, checksum, + &commit, error)) + goto out; + + /* PARSE OSTREE_SERIALIZED_COMMIT_VARIANT */ + g_variant_get_child (commit, 6, "@ay", &tree_contents_csum); + g_variant_get_child (commit, 7, "@ay", &tree_meta_csum); + + if (!scan_one_metadata_object (pull_data, 
ostree_checksum_bytes_peek (tree_contents_csum), + OSTREE_OBJECT_TYPE_DIR_TREE, recursion_depth + 1, + cancellable, error)) + goto out; + + if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (tree_meta_csum), + OSTREE_OBJECT_TYPE_DIR_META, recursion_depth + 1, + cancellable, error)) + goto out; + + if (opt_related) + { + const char *name; + ot_lvariant GVariant *csum_v = NULL; + + related_objects = g_variant_get_child_value (commit, 2); + iter = g_variant_iter_new (related_objects); + + while (g_variant_iter_loop (iter, "(&s@ay)", &name, &csum_v)) + { + if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (csum_v), + OSTREE_OBJECT_TYPE_COMMIT, recursion_depth + 1, + cancellable, error)) + goto out; + } + } + + ret = TRUE; + out: + if (iter) + g_variant_iter_free (iter); + return ret; +} + +static gboolean +scan_one_metadata_object (OtPullData *pull_data, + const guchar *csum, + OstreeObjectType objtype, + guint recursion_depth, + GCancellable *cancellable, + GError **error) +{ + gboolean ret = FALSE; + ot_lvariant GVariant *object = NULL; + ot_lfree char *tmp_checksum = NULL; + gboolean is_stored; + + tmp_checksum = ostree_checksum_from_bytes (csum); + object = ostree_object_name_serialize (tmp_checksum, objtype); + + if (g_hash_table_lookup (pull_data->scanned_metadata, object)) + return TRUE; + + if (!ostree_repo_has_object (pull_data->repo, objtype, tmp_checksum, &is_stored, + cancellable, error)) + goto out; + + if (!is_stored) + { + queue_metadata_object_fetch (pull_data, object); + } + else + { + switch (objtype) + { + case OSTREE_OBJECT_TYPE_COMMIT: + if (!scan_commit_object (pull_data, tmp_checksum, recursion_depth, + pull_data->cancellable, error)) + goto out; + break; + case OSTREE_OBJECT_TYPE_DIR_META: + break; + case OSTREE_OBJECT_TYPE_DIR_TREE: + if (!scan_dirtree_object (pull_data, tmp_checksum, recursion_depth, + pull_data->cancellable, error)) + goto out; + break; + case OSTREE_OBJECT_TYPE_FILE: + 
g_assert_not_reached (); + break; + } + g_hash_table_insert (pull_data->scanned_metadata, g_variant_ref (object), object); + g_atomic_int_inc (&pull_data->n_scanned_metadata); + } + + + ret = TRUE; + out: + return ret; +} + +static gboolean +scan_one_metadata_object_v_name (OtPullData *pull_data, + GVariant *object, + GCancellable *cancellable, + GError **error) +{ + OstreeObjectType objtype; + const char *checksum = NULL; + ot_lfree guchar *csum = NULL; + + ostree_object_name_deserialize (object, &checksum, &objtype); + csum = ostree_checksum_to_bytes (checksum); + + return scan_one_metadata_object (pull_data, csum, objtype, 0, + cancellable, error); +} + +typedef struct { + OtPullData *pull_data; + GError *error; +} IdleThrowErrorData; + +static gboolean +idle_throw_error (gpointer user_data) +{ + IdleThrowErrorData *data = user_data; + + throw_async_error (data->pull_data, data->error); + + g_free (data); + return FALSE; +} + +/** + * scan_one_metadata_object_dispatch: + * + * Called from the metadatascan worker thread. If we're missing an + * object from one of them, we queue a request to the main thread to + * fetch it. When it's fetched, we get passed the object back and + * scan it. 
+ */ +static void +scan_one_metadata_object_dispatch (gpointer item, + gpointer user_data) +{ + OtPullData *pull_data = user_data; + GError *local_error = NULL; + GError **error = &local_error; + ot_lvariant GVariant *v_item = NULL; + + v_item = item; + + if (!scan_one_metadata_object_v_name (pull_data, v_item, + pull_data->cancellable, error)) + goto out; + + out: + if (local_error) + { + IdleThrowErrorData *throwdata = g_new0 (IdleThrowErrorData, 1); + throwdata->pull_data = pull_data; + throwdata->error = local_error; + g_main_context_invoke (NULL, idle_throw_error, throwdata); + } +} + +static void +on_metadata_worker_idle (gpointer user_data) +{ + OtPullData *pull_data = user_data; + + g_main_loop_quit (pull_data->loop); +} + +static gboolean +idle_start_worker (gpointer user_data) +{ + OtPullData *pull_data = user_data; + + ot_worker_queue_start (pull_data->metadata_objects_to_scan); + + return FALSE; +} + + static gboolean parse_ref_summary (const char *contents, GHashTable **out_refs, @@ -1104,6 +1255,9 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) pull_data->repo = repo; pull_data->file_checksums_to_fetch = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); + pull_data->scanned_metadata = g_hash_table_new_full (ostree_hash_object_name, g_variant_equal, + (GDestroyNotify)g_variant_unref, NULL); + if (argc < 2) { ot_util_usage_error (context, "REMOTE must be specified", error); @@ -1232,16 +1386,23 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) if (!ostree_repo_prepare_transaction (pull_data->repo, NULL, error)) goto out; + pull_data->fetching_metadata = TRUE; + g_print ("Analyzing objects needed...\n"); + pull_data->metadata_objects_to_scan = ot_worker_queue_new ("metadatascan", + scan_one_metadata_object_dispatch, + pull_data); + ot_worker_queue_set_idle_callback (pull_data->metadata_objects_to_scan, + NULL, on_metadata_worker_idle, pull_data); + g_hash_table_iter_init 
(&hash_iter, commits_to_fetch); while (g_hash_table_iter_next (&hash_iter, &key, &value)) { const char *commit = value; - - if (!fetch_and_store_commit_metadata_recurse (pull_data, 0, 0, commit, - cancellable, error)) - goto out; + + ot_worker_queue_push (pull_data->metadata_objects_to_scan, + ostree_object_name_serialize (commit, OSTREE_OBJECT_TYPE_COMMIT)); } g_hash_table_iter_init (&hash_iter, requested_refs_to_fetch); @@ -1259,26 +1420,26 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) if (!ostree_repo_resolve_rev (pull_data->repo, remote_ref, TRUE, &original_rev, error)) goto out; - /* Only skip traversal if depth == 0; otherwise, we have to - * handle the case where the user specified a bigger depth than - * they originally did. - */ - if (original_rev && strcmp (sha256, original_rev) == 0 && opt_depth == 0) + if (original_rev && strcmp (sha256, original_rev) == 0) { g_print ("No changes in %s\n", remote_ref); } else { - if (!ostree_validate_checksum_string (sha256, error)) - goto out; - - if (!fetch_and_store_commit_metadata_recurse (pull_data, 0, 0, sha256, cancellable, error)) - goto out; - + ot_worker_queue_push (pull_data->metadata_objects_to_scan, + ostree_object_name_serialize (sha256, OSTREE_OBJECT_TYPE_COMMIT)); g_hash_table_insert (updated_refs, g_strdup (ref), g_strdup (sha256)); } } + g_idle_add (idle_start_worker, pull_data); + + /* Handle queued metadata requests here */ + if (!run_mainloop_monitor_fetcher (pull_data)) + goto out; + + pull_data->fetching_metadata = FALSE; + if (!fetch_content (pull_data, cancellable, error)) goto out; @@ -1303,7 +1464,15 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher); if (bytes_transferred > 0) { - g_print ("%" G_GUINT64_FORMAT " KiB transferred\n", (guint64)(bytes_transferred / 1024.0)); + guint shift; + if (bytes_transferred < 1024) + shift = 1; + else + shift = 1024; + 
g_print ("%u metadata, %u content objects fetched; %" G_GUINT64_FORMAT " %s transferred\n", + pull_data->n_fetched_metadata, pull_data->n_fetched_content, + (guint64)(bytes_transferred / shift), + shift == 1 ? "B" : "KiB"); } ret = TRUE; @@ -1317,6 +1486,8 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) g_free (pull_data->remote_name); if (pull_data->base_uri) soup_uri_free (pull_data->base_uri); + g_clear_pointer (&pull_data->metadata_objects_to_scan, (GDestroyNotify) ot_worker_queue_unref); + g_clear_pointer (&pull_data->scanned_metadata, (GDestroyNotify) g_hash_table_unref); g_clear_pointer (&pull_data->file_checksums_to_fetch, (GDestroyNotify) g_hash_table_unref); g_clear_pointer (&remote_config, (GDestroyNotify) g_key_file_unref); if (summary_uri)