pull: Stage content asynchronously

For similar reasons as metadata, this avoids having the main thread
blocked in fdatasync(), and even better - we can achieve much higher
parallelism if we have multiple threads blocked on fdatasync().
This commit is contained in:
Colin Walters 2012-10-04 20:00:00 -04:00
parent 5b8e833351
commit 9618232f4d
5 changed files with 164 additions and 78 deletions

View File

@ -1558,6 +1558,101 @@ ostree_repo_stage_content (OstreeRepo *self,
cancellable, error);
}
typedef struct {
OstreeRepo *repo;
char *expected_checksum;
GInputStream *object;
guint64 file_object_length;
GCancellable *cancellable;
GSimpleAsyncResult *result;
guchar *result_csum;
} StageContentAsyncData;
static void
stage_content_async_data_free (gpointer user_data)
{
StageContentAsyncData *data = user_data;
g_clear_object (&data->repo);
g_clear_object (&data->cancellable);
g_clear_object (&data->object);
g_free (data->result_csum);
g_free (data->expected_checksum);
g_free (data);
}
static void
stage_content_thread (GSimpleAsyncResult *res,
GObject *object,
GCancellable *cancellable)
{
GError *error = NULL;
StageContentAsyncData *data;
data = g_simple_async_result_get_op_res_gpointer (res);
if (!ostree_repo_stage_content (data->repo, data->expected_checksum,
data->object, data->file_object_length,
&data->result_csum,
cancellable, &error))
g_simple_async_result_take_error (res, error);
}
/**
* ostree_repo_stage_content_async:
*
* Asynchronously store the content object @object. If provided,
* the checksum @expected_checksum will be verified.
*/
void
ostree_repo_stage_content_async (OstreeRepo *self,
const char *expected_checksum,
GInputStream *object,
guint64 file_object_length,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
StageContentAsyncData *asyncdata;
asyncdata = g_new0 (StageContentAsyncData, 1);
asyncdata->repo = g_object_ref (self);
asyncdata->expected_checksum = g_strdup (expected_checksum);
asyncdata->object = g_object_ref (object);
asyncdata->file_object_length = file_object_length;
asyncdata->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
asyncdata->result = g_simple_async_result_new ((GObject*) self,
callback, user_data,
ostree_repo_stage_content_async);
g_simple_async_result_set_op_res_gpointer (asyncdata->result, asyncdata,
stage_content_async_data_free);
g_simple_async_result_run_in_thread (asyncdata->result, stage_content_thread, G_PRIORITY_DEFAULT, cancellable);
g_object_unref (asyncdata->result);
}
gboolean
ostree_repo_stage_content_finish (OstreeRepo *self,
GAsyncResult *result,
guchar **out_csum,
GError **error)
{
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
StageContentAsyncData *data;
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == ostree_repo_stage_content_async);
if (g_simple_async_result_propagate_error (simple, error))
return FALSE;
data = g_simple_async_result_get_op_res_gpointer (simple);
/* Transfer ownership */
*out_csum = data->result_csum;
data->result_csum = NULL;
return TRUE;
}
static GVariant *
create_empty_gvariant_dict (void)
{

View File

@ -113,7 +113,7 @@ void ostree_repo_stage_metadata_async (OstreeRepo *self,
gboolean ostree_repo_stage_metadata_finish (OstreeRepo *self,
GAsyncResult *result,
guchar **out_checksum,
guchar **out_csum,
GError **error);
gboolean ostree_repo_stage_content (OstreeRepo *self,
@ -138,6 +138,19 @@ gboolean ostree_repo_stage_content_trusted (OstreeRepo *self,
GCancellable *cancellable,
GError **error);
void ostree_repo_stage_content_async (OstreeRepo *self,
const char *expected_checksum,
GInputStream *object,
guint64 file_object_length,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
gboolean ostree_repo_stage_content_finish (OstreeRepo *self,
GAsyncResult *result,
guchar **out_csum,
GError **error);
gboolean ostree_repo_resolve_rev (OstreeRepo *self,
const char *rev,
gboolean allow_noent,

View File

@ -82,6 +82,7 @@ struct OstreeFetcher
GHashTable *message_to_request; /* SoupMessage -> SoupRequest */
guint64 total_downloaded;
guint total_requests;
};
G_DEFINE_TYPE (OstreeFetcher, ostree_fetcher, G_TYPE_OBJECT)
@ -243,6 +244,8 @@ ostree_fetcher_request_uri_async (OstreeFetcher *self,
OstreeFetcherPendingURI *pending;
GError *local_error = NULL;
self->total_requests++;
pending = g_new0 (OstreeFetcherPendingURI, 1);
pending->refcount = 1;
pending->self = g_object_ref (self);
@ -352,3 +355,9 @@ ostree_fetcher_bytes_transferred (OstreeFetcher *self)
{
return self->total_downloaded;
}
guint
ostree_fetcher_get_n_requests (OstreeFetcher *self)
{
return self->total_requests;
}

View File

@ -51,6 +51,8 @@ char * ostree_fetcher_query_state_text (OstreeFetcher *self);
guint64 ostree_fetcher_bytes_transferred (OstreeFetcher *self);
guint ostree_fetcher_get_n_requests (OstreeFetcher *self);
void ostree_fetcher_request_uri_async (OstreeFetcher *self,
SoupURI *uri,
GCancellable *cancellable,

View File

@ -109,7 +109,9 @@ typedef struct {
guint n_fetched_content;
guint outstanding_filemeta_requests;
guint outstanding_filecontent_requests;
guint outstanding_checksum_requests;
guint outstanding_content_stage_requests;
guint64 previous_total_downloaded;
GError **async_error;
gboolean caught_error;
@ -185,18 +187,31 @@ uri_fetch_update_status (gpointer user_data)
OtPullData *pull_data = user_data;
ot_lfree char *fetcher_status;
GString *status;
guint64 current_bytes_transferred;
guint64 delta_bytes_transferred;
status = g_string_new ("");
g_string_append_printf (status, "%u/%u metadata %u/%u content fetched; ",
if (pull_data->metadata_scan_active)
g_string_append_printf (status, "scan: %u metadata; ",
g_atomic_int_get (&pull_data->n_scanned_metadata));
g_string_append_printf (status, "fetch: %u/%u metadata %u/%u content; ",
g_atomic_int_get (&pull_data->n_fetched_metadata),
g_atomic_int_get (&pull_data->n_requested_metadata),
pull_data->n_fetched_content,
g_atomic_int_get (&pull_data->n_requested_content));
if (pull_data->outstanding_checksum_requests > 0)
g_string_append_printf (status, "Calculating %u checksums; ",
pull_data->outstanding_checksum_requests);
current_bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher);
delta_bytes_transferred = current_bytes_transferred - pull_data->previous_total_downloaded;
pull_data->previous_total_downloaded = current_bytes_transferred;
if (delta_bytes_transferred < 1024)
g_string_append_printf (status, "%u B/s; ",
(guint)delta_bytes_transferred);
else
g_string_append_printf (status, "%.1f KiB/s; ",
(double)delta_bytes_transferred / 1024);
fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher);
g_string_append (status, fetcher_status);
@ -245,7 +260,7 @@ check_outstanding_requests_handle_error (OtPullData *pull_data,
pull_data->outstanding_uri_requests == 0 &&
pull_data->outstanding_filemeta_requests == 0 &&
pull_data->outstanding_filecontent_requests == 0 &&
pull_data->outstanding_checksum_requests == 0)
pull_data->outstanding_content_stage_requests == 0)
g_main_loop_quit (pull_data->loop);
throw_async_error (pull_data, error);
}
@ -507,79 +522,27 @@ destroy_fetch_one_content_item_data (OtFetchOneContentItemData *data)
}
static void
content_fetch_on_checksum_complete (GObject *object,
content_fetch_on_stage_complete (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
OtFetchOneContentItemData *data = user_data;
GError *local_error = NULL;
GError **error = &local_error;
guint64 length;
GCancellable *cancellable = NULL;
gboolean compressed;
ot_lfree guchar *csum;
ot_lvariant GVariant *file_meta = NULL;
ot_lobj GFileInfo *file_info = NULL;
ot_lvariant GVariant *xattrs = NULL;
ot_lobj GInputStream *content_input = NULL;
ot_lobj GInputStream *file_object_input = NULL;
ot_lfree char *checksum;
ot_lfree guchar *csum = NULL;
ot_lfree char *checksum = NULL;
csum = ot_gio_checksum_stream_finish ((GInputStream*)object, result, error);
if (!csum)
if (!ostree_repo_stage_content_finish ((OstreeRepo*)object, result,
&csum, error))
goto out;
checksum = ostree_checksum_from_bytes (csum);
if (strcmp (checksum, data->checksum) != 0)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Corrupted object %s (actual checksum is %s)",
data->checksum, checksum);
goto out;
}
compressed = data->pull_data->remote_mode == OSTREE_REPO_MODE_ARCHIVE_Z;
if (compressed)
{
content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error);
if (!content_input)
goto out;
if (!ostree_zlib_content_stream_open (content_input, &length, &file_object_input,
cancellable, error))
goto out;
}
else
{
if (!ot_util_variant_map (data->meta_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, TRUE,
&file_meta, error))
goto out;
if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error))
goto out;
if (data->content_path)
{
content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error);
if (!content_input)
goto out;
}
if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs,
&file_object_input, &length,
cancellable, error))
goto out;
}
if (!ostree_repo_stage_content_trusted (data->pull_data->repo, checksum,
file_object_input, length,
cancellable, error))
goto out;
g_assert (strcmp (checksum, data->checksum) == 0);
data->pull_data->n_fetched_content++;
out:
data->pull_data->outstanding_checksum_requests--;
data->pull_data->outstanding_content_stage_requests--;
check_outstanding_requests_handle_error (data->pull_data, local_error);
destroy_fetch_one_content_item_data (data);
}
@ -623,6 +586,7 @@ content_fetch_on_complete (GObject *object,
GCancellable *cancellable = NULL;
gboolean was_content_fetch = FALSE;
gboolean need_content_fetch = FALSE;
guint64 length;
ot_lvariant GVariant *file_meta = NULL;
ot_lobj GFileInfo *file_info = NULL;
ot_lobj GInputStream *content_input = NULL;
@ -674,20 +638,21 @@ content_fetch_on_complete (GObject *object,
if (!need_content_fetch && compressed)
{
ot_lobj GInputStream *uncomp_input = NULL;
guint64 uncompressed_len;
g_assert (data->content_path != NULL);
content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error);
if (!content_input)
goto out;
if (!ostree_zlib_content_stream_open (content_input, &uncompressed_len, &uncomp_input,
if (!ostree_zlib_content_stream_open (content_input, &length, &uncomp_input,
cancellable, error))
goto out;
data->pull_data->outstanding_checksum_requests++;
ot_gio_checksum_stream_async (uncomp_input, G_PRIORITY_DEFAULT, NULL,
content_fetch_on_checksum_complete, data);
data->pull_data->outstanding_content_stage_requests++;
ostree_repo_stage_content_async (data->pull_data->repo, data->checksum,
uncomp_input, length,
cancellable,
content_fetch_on_stage_complete, data);
}
else if (!need_content_fetch)
{
@ -709,13 +674,15 @@ content_fetch_on_complete (GObject *object,
}
if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs,
&file_object_input, NULL,
&file_object_input, &length,
cancellable, error))
goto out;
data->pull_data->outstanding_checksum_requests++;
ot_gio_checksum_stream_async (file_object_input, G_PRIORITY_DEFAULT, NULL,
content_fetch_on_checksum_complete, data);
data->pull_data->outstanding_content_stage_requests++;
ostree_repo_stage_content_async (data->pull_data->repo, data->checksum,
file_object_input, length,
cancellable,
content_fetch_on_stage_complete, data);
}
while (data->pull_data->outstanding_filemeta_requests < 10)