From d9f43cd2fbbd48b34c06552205eaf27eed1e1ae0 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Tue, 3 Jan 2017 10:45:58 -0500 Subject: [PATCH] pull: Rework delta superblock fetches to be async For the pending libcurl port, the backend is a bit more sensitive to the main context setup. The delta superblock fetch here is a synchronous request in the section that's supposed to be async. Now, libsoup definitely supports mixing sync and async requests, but it wasn't hard to help the libcurl port here by making this one async. Now fetchers are either sync or async. Closes: #636 Approved by: jlebon --- src/libostree/ostree-repo-pull.c | 218 +++++++++++++++++-------------- 1 file changed, 120 insertions(+), 98 deletions(-) diff --git a/src/libostree/ostree-repo-pull.c b/src/libostree/ostree-repo-pull.c index 92efd592..54a3a922 100644 --- a/src/libostree/ostree-repo-pull.c +++ b/src/libostree/ostree-repo-pull.c @@ -67,6 +67,7 @@ typedef struct { gint n_scanned_metadata; gboolean gpg_verify; + gboolean require_static_deltas; gboolean gpg_verify_summary; gboolean has_tombstone_commits; @@ -178,6 +179,10 @@ update_progress (gpointer user_data) if (! pull_data->progress) return FALSE; + /* In dry run, we only emit progress once metadata is done */ + if (pull_data->dry_run && pull_data->n_outstanding_metadata_fetches > 0) + return TRUE; + outstanding_writes = pull_data->n_outstanding_content_write_requests + pull_data->n_outstanding_metadata_write_requests + pull_data->n_outstanding_deltapart_write_requests; @@ -1416,75 +1421,6 @@ load_remote_repo_config (OtPullData *pull_data, return ret; } -static gboolean -request_static_delta_superblock_sync (OtPullData *pull_data, - const char *from_revision, - const char *to_revision, - GVariant **out_delta_superblock, - GCancellable *cancellable, - GError **error) -{ - gboolean ret = FALSE; - g_autoptr(GVariant) ret_delta_superblock = NULL; - g_autofree char *delta_name = - _ostree_get_relative_static_delta_superblock_path (from_revision, to_revision); - g_autoptr(GBytes) delta_superblock_data = NULL; - - if (!_ostree_fetcher_mirrored_request_to_membuf (pull_data->fetcher, - pull_data->content_mirrorlist, - delta_name, FALSE, TRUE, - &delta_superblock_data, - OSTREE_MAX_METADATA_SIZE, - pull_data->cancellable, error)) - goto out; - - if (delta_superblock_data) - { - { - g_autofree gchar *delta = NULL; - g_autofree guchar *ret_csum = NULL; - guchar *summary_csum; - g_autoptr (GInputStream) summary_is = NULL; - - summary_is = g_memory_input_stream_new_from_data (g_bytes_get_data (delta_superblock_data, NULL), - g_bytes_get_size (delta_superblock_data), - NULL); - - if (!ot_gio_checksum_stream (summary_is, &ret_csum, cancellable, error)) - goto out; - - delta = g_strconcat (from_revision ? from_revision : "", from_revision ? "-" : "", to_revision, NULL); - summary_csum = g_hash_table_lookup (pull_data->summary_deltas_checksums, delta); - - /* At this point we've GPG verified the data, so in theory - * could trust that they provided the right data, but let's - * make this a hard error. - */ - if (pull_data->gpg_verify_summary && !summary_csum) - { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - "GPG verification enabled, but no summary signatures found (use gpg-verify-summary=false in remote config to disable)"); - goto out; - } - - if (summary_csum && memcmp (summary_csum, ret_csum, 32)) - { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Invalid checksum for static delta %s", delta); - goto out; - } - } - - ret_delta_superblock = g_variant_ref_sink (g_variant_new_from_bytes ((GVariantType*)OSTREE_STATIC_DELTA_SUPERBLOCK_FORMAT, - delta_superblock_data, FALSE)); - } - - ret = TRUE; - if (out_delta_superblock) - *out_delta_superblock = g_steal_pointer (&ret_delta_superblock); - out: - return ret; -} - static gboolean process_one_static_delta_fallback (OtPullData *pull_data, gboolean delta_byteswap, @@ -1749,6 +1685,100 @@ process_one_static_delta (OtPullData *pull_data, return ret; } +typedef struct { + OtPullData *pull_data; + char *from_revision; + char *to_revision; +} FetchDeltaSuperData; + +static void +on_superblock_fetched (GObject *src, + GAsyncResult *res, + gpointer data) + +{ + FetchDeltaSuperData *fdata = data; + OtPullData *pull_data = fdata->pull_data; + GError *local_error = NULL; + GError **error = &local_error; + g_autoptr(GBytes) delta_superblock_data = NULL; + const char *from_revision = fdata->from_revision; + const char *to_revision = fdata->to_revision; + + if (!_ostree_fetcher_request_to_membuf_finish ((OstreeFetcher*)src, + res, + &delta_superblock_data, + error)) + { + if (!g_error_matches (local_error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND)) + goto out; + + g_clear_error (&local_error); + + if (pull_data->require_static_deltas) + { + g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, + "Static deltas required, but none found for %s to %s", + from_revision, to_revision); + goto out; + } + + queue_scan_one_metadata_object (pull_data, to_revision, OSTREE_OBJECT_TYPE_COMMIT, NULL, 0); + } + else + { + g_autofree gchar *delta = NULL; + g_autofree guchar *ret_csum = NULL; + guchar *summary_csum; + g_autoptr (GInputStream) summary_is = NULL; + g_autoptr(GVariant) delta_superblock = NULL; + + summary_is = g_memory_input_stream_new_from_data (g_bytes_get_data (delta_superblock_data, NULL), + g_bytes_get_size (delta_superblock_data), + NULL); + + if (!ot_gio_checksum_stream (summary_is, &ret_csum, pull_data->cancellable, error)) + goto out; + + delta = g_strconcat (from_revision ? from_revision : "", from_revision ? "-" : "", to_revision, NULL); + summary_csum = g_hash_table_lookup (pull_data->summary_deltas_checksums, delta); + + /* At this point we've GPG verified the data, so in theory + * could trust that they provided the right data, but let's + * make this a hard error. + */ + if (pull_data->gpg_verify_summary && !summary_csum) + { + g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, + "GPG verification enabled, but no summary signatures found (use gpg-verify-summary=false in remote config to disable)"); + goto out; + } + + if (summary_csum && memcmp (summary_csum, ret_csum, 32)) + { + g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Invalid checksum for static delta %s", delta); + goto out; + } + + delta_superblock = g_variant_ref_sink (g_variant_new_from_bytes ((GVariantType*)OSTREE_STATIC_DELTA_SUPERBLOCK_FORMAT, + delta_superblock_data, FALSE)); + + g_ptr_array_add (pull_data->static_delta_superblocks, g_variant_ref (delta_superblock)); + if (!process_one_static_delta (pull_data, from_revision, to_revision, delta_superblock, + pull_data->cancellable, error)) + goto out; + } + + out: + g_free (fdata->from_revision); + g_free (fdata->to_revision); + g_free (fdata); + g_assert (pull_data->n_outstanding_metadata_fetches > 0); + pull_data->n_outstanding_metadata_fetches--; + pull_data->n_fetched_metadata++; + check_outstanding_requests_handle_error (pull_data, local_error); +} + static gboolean validate_variant_is_csum (GVariant *csum, GError **error) @@ -2344,7 +2374,6 @@ ostree_repo_pull_with_options (OstreeRepo *self, g_autofree char **override_commit_ids = NULL; GSource *update_timeout = NULL; gboolean disable_static_deltas = FALSE; - gboolean require_static_deltas = FALSE; gboolean opt_gpg_verify_set = FALSE; gboolean opt_gpg_verify_summary_set = FALSE; const char *url_override = NULL; @@ -2368,7 +2397,7 @@ ostree_repo_pull_with_options (OstreeRepo *self, g_variant_lookup (options, "gpg-verify-summary", "b", &pull_data->gpg_verify_summary); (void) g_variant_lookup (options, "depth", "i", &pull_data->maxdepth); (void) g_variant_lookup (options, "disable-static-deltas", "b", &disable_static_deltas); - (void) g_variant_lookup (options, "require-static-deltas", "b", &require_static_deltas); + (void) g_variant_lookup (options, "require-static-deltas", "b", &pull_data->require_static_deltas); (void) g_variant_lookup (options, "override-commit-ids", "^a&s", &override_commit_ids); (void) g_variant_lookup (options, "dry-run", "b", &pull_data->dry_run); (void) g_variant_lookup (options, "override-url", "&s", &url_override); @@ -2386,11 +2415,11 @@ ostree_repo_pull_with_options (OstreeRepo *self, for (i = 0; dirs_to_pull != NULL && dirs_to_pull[i] != NULL; i++) g_return_val_if_fail (dirs_to_pull[i][0] == '/', FALSE); - g_return_val_if_fail (!(disable_static_deltas && require_static_deltas), FALSE); + g_return_val_if_fail (!(disable_static_deltas && pull_data->require_static_deltas), FALSE); /* We only do dry runs with static deltas, because we don't really have any * in-advance information for bare fetches. */ - g_return_val_if_fail (!pull_data->dry_run || require_static_deltas, FALSE); + g_return_val_if_fail (!pull_data->dry_run || pull_data->require_static_deltas, FALSE); pull_data->is_mirror = (flags & OSTREE_REPO_PULL_FLAGS_MIRROR) > 0; pull_data->is_commit_only = (flags & OSTREE_REPO_PULL_FLAGS_COMMIT_ONLY) > 0; @@ -2655,7 +2684,7 @@ ostree_repo_pull_with_options (OstreeRepo *self, /* For local pulls, default to disabling static deltas so that the * exact object files are copied. */ - if (pull_data->remote_repo_local && !require_static_deltas) + if (pull_data->remote_repo_local && !pull_data->require_static_deltas) disable_static_deltas = TRUE; pull_data->static_delta_superblocks = g_ptr_array_new_with_free_func ((GDestroyNotify)g_variant_unref); @@ -2710,7 +2739,7 @@ ostree_repo_pull_with_options (OstreeRepo *self, goto out; } - if (!bytes_summary && require_static_deltas) + if (!bytes_summary && pull_data->require_static_deltas) { g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Fetch configured to require static deltas, but no summary found"); @@ -2939,7 +2968,6 @@ ostree_repo_pull_with_options (OstreeRepo *self, g_autofree char *from_revision = NULL; const char *ref = key; const char *to_revision = value; - g_autoptr(GVariant) delta_superblock = NULL; if (!ostree_repo_resolve_rev (pull_data->repo, ref, TRUE, &from_revision, error)) @@ -2948,31 +2976,25 @@ ostree_repo_pull_with_options (OstreeRepo *self, if (!(disable_static_deltas || mirroring_into_archive || pull_data->is_commit_only) && (from_revision == NULL || g_strcmp0 (from_revision, to_revision) != 0)) { - if (!request_static_delta_superblock_sync (pull_data, from_revision, to_revision, - &delta_superblock, cancellable, error)) - goto out; - } - - if (!delta_superblock) - { - if (require_static_deltas) - { - g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, - "Static deltas required, but none found for %s to %s", - from_revision, to_revision); - goto out; - } - g_debug ("no delta superblock for %s-%s", from_revision ? from_revision : "empty", to_revision); - queue_scan_one_metadata_object (pull_data, to_revision, OSTREE_OBJECT_TYPE_COMMIT, NULL, 0); + g_autofree char *delta_name = + _ostree_get_relative_static_delta_superblock_path (from_revision, to_revision); + FetchDeltaSuperData *fdata = g_new0(FetchDeltaSuperData, 1); + fdata->pull_data = pull_data; + fdata->from_revision = g_strdup (from_revision); + fdata->to_revision = g_strdup (to_revision); + + _ostree_fetcher_request_to_membuf (pull_data->fetcher, + pull_data->content_mirrorlist, + delta_name, 0, + OSTREE_MAX_METADATA_SIZE, + 0, pull_data->cancellable, + on_superblock_fetched, fdata); + pull_data->n_outstanding_metadata_fetches++; + pull_data->n_requested_metadata++; } else { - g_debug ("processing delta superblock for %s-%s", from_revision ? from_revision : "empty", to_revision); - g_ptr_array_add (pull_data->static_delta_superblocks, g_variant_ref (delta_superblock)); - if (!process_one_static_delta (pull_data, from_revision, to_revision, - delta_superblock, - cancellable, error)) - goto out; + queue_scan_one_metadata_object (pull_data, to_revision, OSTREE_OBJECT_TYPE_COMMIT, NULL, 0); } }