pull: Stage metadata objects asynchronously

This avoids the main thread being blocked on fdatasync(); also as a
bonus we checksum metadata in a separate thread too.
This commit is contained in:
Colin Walters 2012-10-04 18:24:37 -04:00
parent f1b4db15a2
commit 5b8e833351
3 changed files with 140 additions and 16 deletions

View File

@ -1383,6 +1383,101 @@ ostree_repo_stage_metadata_trusted (OstreeRepo *self,
cancellable, error);
}
typedef struct {
OstreeRepo *repo;
OstreeObjectType objtype;
char *expected_checksum;
GVariant *object;
GCancellable *cancellable;
GSimpleAsyncResult *result;
guchar *result_csum;
} StageMetadataAsyncData;
static void
stage_metadata_async_data_free (gpointer user_data)
{
StageMetadataAsyncData *data = user_data;
g_clear_object (&data->repo);
g_clear_object (&data->cancellable);
g_variant_unref (data->object);
g_free (data->result_csum);
g_free (data->expected_checksum);
g_free (data);
}
static void
stage_metadata_thread (GSimpleAsyncResult *res,
GObject *object,
GCancellable *cancellable)
{
GError *error = NULL;
StageMetadataAsyncData *data;
data = g_simple_async_result_get_op_res_gpointer (res);
if (!ostree_repo_stage_metadata (data->repo, data->objtype, data->expected_checksum,
data->object,
&data->result_csum,
cancellable, &error))
g_simple_async_result_take_error (res, error);
}
/**
* ostree_repo_stage_metadata_async:
*
* Asynchronously store the metadata object @variant. If provided,
* the checksum @expected_checksum will be verified.
*/
void
ostree_repo_stage_metadata_async (OstreeRepo *self,
OstreeObjectType objtype,
const char *expected_checksum,
GVariant *object,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
StageMetadataAsyncData *asyncdata;
asyncdata = g_new0 (StageMetadataAsyncData, 1);
asyncdata->repo = g_object_ref (self);
asyncdata->objtype = objtype;
asyncdata->expected_checksum = g_strdup (expected_checksum);
asyncdata->object = g_variant_ref (object);
asyncdata->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
asyncdata->result = g_simple_async_result_new ((GObject*) self,
callback, user_data,
ostree_repo_stage_metadata_async);
g_simple_async_result_set_op_res_gpointer (asyncdata->result, asyncdata,
stage_metadata_async_data_free);
g_simple_async_result_run_in_thread (asyncdata->result, stage_metadata_thread, G_PRIORITY_DEFAULT, cancellable);
g_object_unref (asyncdata->result);
}
gboolean
ostree_repo_stage_metadata_finish (OstreeRepo *self,
GAsyncResult *result,
guchar **out_csum,
GError **error)
{
GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
StageMetadataAsyncData *data;
g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == ostree_repo_stage_metadata_async);
if (g_simple_async_result_propagate_error (simple, error))
return FALSE;
data = g_simple_async_result_get_op_res_gpointer (simple);
/* Transfer ownership */
*out_csum = data->result_csum;
data->result_csum = NULL;
return TRUE;
}
static gboolean
stage_directory_meta (OstreeRepo *self,
GFileInfo *file_info,

View File

@ -103,6 +103,19 @@ gboolean ostree_repo_stage_metadata (OstreeRepo *self,
GCancellable *cancellable,
GError **error);
void ostree_repo_stage_metadata_async (OstreeRepo *self,
OstreeObjectType objtype,
const char *expected_checksum,
GVariant *object,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
gboolean ostree_repo_stage_metadata_finish (OstreeRepo *self,
GAsyncResult *result,
guchar **out_checksum,
GError **error);
gboolean ostree_repo_stage_content (OstreeRepo *self,
const char *expected_checksum,
GInputStream *content,

View File

@ -760,8 +760,29 @@ idle_queue_content_request (gpointer user_data)
typedef struct {
OtPullData *pull_data;
GVariant *object;
GFile *temp_path;
} IdleFetchMetadataObjectData;
static void
on_metadata_staged (GObject *object,
GAsyncResult *result,
gpointer user_data)
{
IdleFetchMetadataObjectData *fetch_data = user_data;
OtPullData *pull_data = fetch_data->pull_data;
pull_data->n_fetched_metadata++;
ot_worker_queue_push (pull_data->metadata_objects_to_scan,
g_variant_ref (fetch_data->object));
ot_worker_queue_release (pull_data->metadata_objects_to_scan);
(void) ot_gfile_unlink (fetch_data->temp_path, NULL, NULL);
g_object_unref (fetch_data->temp_path);
g_variant_unref (fetch_data->object);
g_free (fetch_data);
}
static void
meta_fetch_on_complete (GObject *object,
GAsyncResult *result,
@ -769,38 +790,33 @@ meta_fetch_on_complete (GObject *object,
{
IdleFetchMetadataObjectData *fetch_data = user_data;
OtPullData *pull_data = fetch_data->pull_data;
ot_lobj GFile *temp_path = NULL;
ot_lvariant GVariant *metadata = NULL;
const char *checksum;
OstreeObjectType objtype;
GError *local_error = NULL;
GError **error = &local_error;
temp_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error);
if (!temp_path)
fetch_data->temp_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error);
if (!fetch_data->temp_path)
goto out;
ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype);
if (!ot_util_variant_map (temp_path, ostree_metadata_variant_type (objtype),
if (!ot_util_variant_map (fetch_data->temp_path, ostree_metadata_variant_type (objtype),
FALSE, &metadata, error))
goto out;
if (!ostree_repo_stage_metadata (pull_data->repo, objtype, checksum, metadata, (guchar**)NULL,
pull_data->cancellable, error))
goto out;
pull_data->n_fetched_metadata++;
ot_worker_queue_push (pull_data->metadata_objects_to_scan,
g_variant_ref (fetch_data->object));
ot_worker_queue_release (pull_data->metadata_objects_to_scan);
ostree_repo_stage_metadata_async (pull_data->repo, objtype, checksum, metadata,
pull_data->cancellable,
on_metadata_staged, fetch_data);
out:
(void) ot_gfile_unlink (temp_path, NULL, NULL);
throw_async_error (pull_data, local_error);
if (local_error)
{
g_variant_unref (fetch_data->object);
g_free (fetch_data);
}
}
static gboolean