From 4e51701bea3458bac654e89c3f2c1b5702e3db7e Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Thu, 4 Oct 2012 15:29:39 -0400 Subject: [PATCH] pull: Merge metadata and content fetch phases This is a notable speedup when the metadata scanner is working, and we don't have a lot of traffic; we can pull down data at the same time. --- src/ostree/ostree-pull.c | 242 ++++++++++++++------------------------- 1 file changed, 86 insertions(+), 156 deletions(-) diff --git a/src/ostree/ostree-pull.c b/src/ostree/ostree-pull.c index af06bdbf..247fa497 100644 --- a/src/ostree/ostree-pull.c +++ b/src/ostree/ostree-pull.c @@ -90,27 +90,25 @@ typedef struct { OstreeFetcher *fetcher; SoupURI *base_uri; - GHashTable *file_checksums_to_fetch; - GMainLoop *loop; GCancellable *cancellable; - /* Used in meta fetch phase */ - gboolean fetching_metadata; + gboolean metadata_scan_active; volatile gint n_scanned_metadata; + volatile gint n_requested_metadata; + volatile gint n_requested_content; guint n_fetched_metadata; guint outstanding_uri_requests; GThread *metadata_scan_thread; OtWorkerQueue *metadata_objects_to_scan; GHashTable *scanned_metadata; /* Maps object name to itself */ + GHashTable *requested_content; /* Maps object name to itself */ - /* Used in content fetch phase */ guint n_fetched_content; guint outstanding_filemeta_requests; guint outstanding_filecontent_requests; guint outstanding_checksum_requests; - GHashTable *loose_files; GError **async_error; gboolean caught_error; @@ -119,6 +117,17 @@ typedef struct { guint last_padding; } OtPullData; +typedef struct { + OtPullData *pull_data; + + gboolean fetching_content; + + GFile *meta_path; + GFile *content_path; + + char *checksum; +} OtFetchOneContentItemData; + static SoupURI * suburi_new (SoupURI *base, const char *first, @@ -178,24 +187,16 @@ uri_fetch_update_status (gpointer user_data) status = g_string_new (""); - if (pull_data->fetching_metadata) - { - g_string_append_printf (status, "Metadata phase: %u fetched, %u scanned; ", - g_atomic_int_get (&pull_data->n_fetched_metadata), - g_atomic_int_get (&pull_data->n_scanned_metadata)); - } - - if (pull_data->loose_files != NULL) - g_string_append_printf (status, "%u loose files to fetch: ", - g_hash_table_size (pull_data->loose_files) - + pull_data->outstanding_filemeta_requests - + pull_data->outstanding_filecontent_requests); + g_string_append_printf (status, "%u/%u metadata %u/%u content fetched; ", + g_atomic_int_get (&pull_data->n_fetched_metadata), + g_atomic_int_get (&pull_data->n_requested_metadata), + pull_data->n_fetched_content, + g_atomic_int_get (&pull_data->n_requested_content)); if (pull_data->outstanding_checksum_requests > 0) g_string_append_printf (status, "Calculating %u checksums; ", pull_data->outstanding_checksum_requests); - fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher); g_string_append (status, fetcher_status); if (status->len > pull_data->last_padding) @@ -239,11 +240,11 @@ static void check_outstanding_requests_handle_error (OtPullData *pull_data, GError *error) { - if (pull_data->outstanding_uri_requests == 0 && + if (!pull_data->metadata_scan_active && + pull_data->outstanding_uri_requests == 0 && pull_data->outstanding_filemeta_requests == 0 && pull_data->outstanding_filecontent_requests == 0 && - pull_data->outstanding_checksum_requests == 0 && - (pull_data->loose_files == NULL || g_hash_table_size (pull_data->loose_files) == 0)) + pull_data->outstanding_checksum_requests == 0) g_main_loop_quit (pull_data->loop); throw_async_error (pull_data, error); } @@ -360,6 +361,9 @@ fetch_uri_contents_utf8 (OtPullData *pull_data, return ret; } +static gboolean +idle_queue_content_request (gpointer user_data); + static gboolean scan_dirtree_object (OtPullData *pull_data, const char *checksum, @@ -369,6 +373,7 @@ scan_dirtree_object (OtPullData *pull_data, { gboolean ret = FALSE; int i, n; + gboolean compressed = pull_data->remote_mode == OSTREE_REPO_MODE_ARCHIVE_Z; ot_lvariant GVariant *tree = NULL; ot_lvariant GVariant *files_variant = NULL; ot_lvariant GVariant *dirs_variant = NULL; @@ -393,18 +398,39 @@ scan_dirtree_object (OtPullData *pull_data, for (i = 0; i < n; i++) { const char *filename; + gboolean file_is_stored; + OtFetchOneContentItemData *idle_fetch_data; ot_lvariant GVariant *csum = NULL; + ot_lfree char *file_checksum; g_variant_get_child (files_variant, i, "(&s@ay)", &filename, &csum); if (!ot_util_filename_validate (filename, error)) goto out; - { - char *duped_key = ostree_checksum_from_bytes_v (csum); - g_hash_table_replace (pull_data->file_checksums_to_fetch, - duped_key, duped_key); - } + file_checksum = ostree_checksum_from_bytes_v (csum); + + if (!ostree_repo_has_object (pull_data->repo, OSTREE_OBJECT_TYPE_FILE, file_checksum, + &file_is_stored, cancellable, error)) + goto out; + + if (!file_is_stored && !g_hash_table_lookup (pull_data->requested_content, file_checksum)) + { + char *duped_checksum; + + idle_fetch_data = g_new0 (OtFetchOneContentItemData, 1); + idle_fetch_data->pull_data = pull_data; + idle_fetch_data->checksum = file_checksum; + idle_fetch_data->fetching_content = compressed; + file_checksum = NULL; /* Transfer ownership */ + + duped_checksum = g_strdup (idle_fetch_data->checksum); + g_hash_table_insert (pull_data->requested_content, duped_checksum, duped_checksum); + + g_atomic_int_inc (&pull_data->n_requested_content); + ot_worker_queue_hold (pull_data->metadata_objects_to_scan); + g_main_context_invoke (NULL, idle_queue_content_request, idle_fetch_data); + } } n = g_variant_n_children (dirs_variant); @@ -466,17 +492,6 @@ fetch_ref_contents (OtPullData *pull_data, return ret; } -typedef struct { - OtPullData *pull_data; - - gboolean fetching_content; - - GFile *meta_path; - GFile *content_path; - - char *checksum; -} OtFetchOneContentItemData; - static void destroy_fetch_one_content_item_data (OtFetchOneContentItemData *data) { @@ -568,9 +583,6 @@ content_fetch_on_checksum_complete (GObject *object, destroy_fetch_one_content_item_data (data); } -static void -enqueue_loose_meta_requests (OtPullData *pull_data); - static void content_fetch_on_complete (GObject *object, GAsyncResult *result, @@ -682,115 +694,35 @@ content_fetch_on_complete (GObject *object, if (was_content_fetch) data->pull_data->outstanding_filecontent_requests--; else - { - data->pull_data->outstanding_filemeta_requests--; - enqueue_loose_meta_requests (data->pull_data); - } + data->pull_data->outstanding_filemeta_requests--; check_outstanding_requests_handle_error (data->pull_data, local_error); } -static void -enqueue_loose_meta_requests (OtPullData *pull_data) -{ - GHashTableIter hash_iter; - gpointer key, value; - GCancellable *cancellable = NULL; - - g_hash_table_iter_init (&hash_iter, pull_data->loose_files); - while (g_hash_table_iter_next (&hash_iter, &key, &value)) - { - const char *checksum = key; - ot_lfree char *objpath = NULL; - SoupURI *obj_uri = NULL; - OtFetchOneContentItemData *one_item_data; - gboolean compressed = pull_data->remote_mode == OSTREE_REPO_MODE_ARCHIVE_Z; - - one_item_data = g_new0 (OtFetchOneContentItemData, 1); - one_item_data->pull_data = pull_data; - one_item_data->checksum = g_strdup (checksum); - one_item_data->fetching_content = compressed; - - objpath = ostree_get_relative_object_path (checksum, OSTREE_OBJECT_TYPE_FILE, compressed); - obj_uri = suburi_new (pull_data->base_uri, objpath, NULL); - - ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, cancellable, - content_fetch_on_complete, one_item_data); - soup_uri_free (obj_uri); - - if (compressed) - pull_data->outstanding_filecontent_requests++; - else - pull_data->outstanding_filemeta_requests++; - g_hash_table_iter_remove (&hash_iter); - - /* Don't let too many requests queue up; when we're fetching - * files we need to process the actual content. - */ - if (pull_data->outstanding_filemeta_requests > 20) - break; - } -} - static gboolean -fetch_content (OtPullData *pull_data, - GCancellable *cancellable, - GError **error) +idle_queue_content_request (gpointer user_data) { - gboolean ret = FALSE; - GHashTableIter hash_iter; - gpointer key, value; - ot_lobj GFile *temp_path = NULL; - ot_lobj GFile *content_temp_path = NULL; - ot_lhash GHashTable *loose_files = NULL; - SoupURI *content_uri = NULL; - guint n_objects_to_fetch = 0; + OtFetchOneContentItemData *data = user_data; + OtPullData *pull_data = data->pull_data; + const char *checksum = data->checksum; + ot_lfree char *objpath = NULL; + SoupURI *obj_uri = NULL; + gboolean compressed = pull_data->remote_mode == OSTREE_REPO_MODE_ARCHIVE_Z; + + objpath = ostree_get_relative_object_path (checksum, OSTREE_OBJECT_TYPE_FILE, compressed); + obj_uri = suburi_new (pull_data->base_uri, objpath, NULL); - loose_files = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); + ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable, + content_fetch_on_complete, data); + soup_uri_free (obj_uri); + + if (compressed) + pull_data->outstanding_filecontent_requests++; + else + pull_data->outstanding_filemeta_requests++; + + ot_worker_queue_release (pull_data->metadata_objects_to_scan); - g_hash_table_iter_init (&hash_iter, pull_data->file_checksums_to_fetch); - while (g_hash_table_iter_next (&hash_iter, &key, &value)) - { - const char *checksum = key; - gboolean is_stored; - - if (!ostree_repo_has_object (pull_data->repo, OSTREE_OBJECT_TYPE_FILE, checksum, &is_stored, - cancellable, error)) - goto out; - - if (!is_stored) - { - char *key = g_strdup (checksum); - g_hash_table_insert (loose_files, key, key); - n_objects_to_fetch++; - } - } - - if (n_objects_to_fetch > 0) - g_print ("%u content objects to fetch\n", n_objects_to_fetch); - - if (g_hash_table_size (loose_files) > 0) - g_print ("Fetching %u loose objects\n", - g_hash_table_size (loose_files)); - - pull_data->loose_files = loose_files; - - if (g_hash_table_size (loose_files) > 0) - { - enqueue_loose_meta_requests (pull_data); - - if (!run_mainloop_monitor_fetcher (pull_data)) - goto out; - } - - ret = TRUE; - out: - if (content_uri) - soup_uri_free (content_uri); - if (temp_path) - (void) ot_gfile_unlink (temp_path, NULL, NULL); - if (content_temp_path) - (void) ot_gfile_unlink (content_temp_path, NULL, NULL); - return ret; + return FALSE; } typedef struct { @@ -969,6 +901,7 @@ scan_one_metadata_object (OtPullData *pull_data, if (!is_stored) { + g_atomic_int_inc (&pull_data->n_requested_metadata); queue_metadata_object_fetch (pull_data, object); } else @@ -1071,8 +1004,10 @@ static void on_metadata_worker_idle (gpointer user_data) { OtPullData *pull_data = user_data; + + pull_data->metadata_scan_active = FALSE; - g_main_loop_quit (pull_data->loop); + check_outstanding_requests_handle_error (pull_data, NULL); } static gboolean @@ -1085,7 +1020,6 @@ idle_start_worker (gpointer user_data) return FALSE; } - static gboolean parse_ref_summary (const char *contents, GHashTable **out_refs, @@ -1253,10 +1187,11 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) pull_data->loop = g_main_loop_new (NULL, FALSE); pull_data->repo = repo; - pull_data->file_checksums_to_fetch = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); pull_data->scanned_metadata = g_hash_table_new_full (ostree_hash_object_name, g_variant_equal, (GDestroyNotify)g_variant_unref, NULL); + pull_data->requested_content = g_hash_table_new_full (g_str_hash, g_str_equal, + (GDestroyNotify)g_free, NULL); if (argc < 2) { @@ -1386,9 +1321,7 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) if (!ostree_repo_prepare_transaction (pull_data->repo, NULL, error)) goto out; - pull_data->fetching_metadata = TRUE; - - g_print ("Analyzing objects needed...\n"); + pull_data->metadata_scan_active = TRUE; pull_data->metadata_objects_to_scan = ot_worker_queue_new ("metadatascan", scan_one_metadata_object_dispatch, @@ -1434,15 +1367,12 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) g_idle_add (idle_start_worker, pull_data); - /* Handle queued metadata requests here */ + /* Start metadata thread, which kicks off further metadata requests + * as well as content fetches. + */ if (!run_mainloop_monitor_fetcher (pull_data)) goto out; - pull_data->fetching_metadata = FALSE; - - if (!fetch_content (pull_data, cancellable, error)) - goto out; - if (!ostree_repo_commit_transaction (pull_data->repo, cancellable, error)) goto out; @@ -1488,7 +1418,7 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) soup_uri_free (pull_data->base_uri); g_clear_pointer (&pull_data->metadata_objects_to_scan, (GDestroyNotify) ot_worker_queue_unref); g_clear_pointer (&pull_data->scanned_metadata, (GDestroyNotify) g_hash_table_unref); - g_clear_pointer (&pull_data->file_checksums_to_fetch, (GDestroyNotify) g_hash_table_unref); + g_clear_pointer (&pull_data->requested_content, (GDestroyNotify) g_hash_table_unref); g_clear_pointer (&remote_config, (GDestroyNotify) g_key_file_unref); if (summary_uri) soup_uri_free (summary_uri);