pull: Remove explicit threading

Mixing async and threads has proved to be too much for my little mind.
It has race conditions that I've repeatedly tried, and failed, to fix.

The threading here was scanning metadata objects - and there are
two parts to that:

1) Physically loading them from disk
2) Parsing them

Now #1 has been partially addressed by avoiding a storm of lstat() calls
if we're starting from a known working state.  If a pull gets interrupted,
then we do need to rescan all objects.  We can also address this with
local metadata packfiles.

The other potentially slow bit is that we recurse across the metadata,
blocking the main thread.  We could ameliorate that in the future by
scheduling metadata parsing as idle "chunks".

Anyway, let's move the needle back to reliability, and re-add speed
more carefully.

https://bugzilla.gnome.org/show_bug.cgi?id=706456
This commit is contained in:
Colin Walters 2014-02-21 12:56:41 -05:00
parent f2e0162846
commit b762c2f8f1
1 changed files with 92 additions and 350 deletions

View File

@ -20,39 +20,6 @@
* Author: Colin Walters <walters@verbum.org>
*/
/*
* See:
* https://mail.gnome.org/archives/ostree-list/2012-August/msg00021.html
*
* First, we synchronously fetch all requested refs, and resolve them
* to SHA256 commit checksums.
*
* Now, there are two threads involved here. First, there's the
* calling thread; we create a temporary #GMainContext, and iterate
* it. This thread performs all HTTP requests.
*
* The calling thread communicates with the "metadata scanning"
* thread. The purpose of the metadata thread is to avoid blocking
* the main thread while reading from the repository. If a
* transaction is interrupted for example, the next run will need to
* lstat() each loose object, which could easily be 60000 or more.
*
* The two threads pass messages back and forth over queues. The deep
* complexity in this code is determining when a pull process is
* complete. When the main thread completes fetching a metadata
* object, it passes it over to the metadata thread, which may in turn
* queue more work for the main thread. That in turn may generate
* more work for the metadata thread, etc.
*
* Work completion is presently detected by sending a special _IDLE
* message down the queue; if both threads are idle, the main thread
* tells the metadata thread to shut down, and then proceeds to stop
* iterating the main context.
*
* There is still a race condition here. See
* https://bugzilla.gnome.org/show_bug.cgi?id=706456
*/
#include "config.h"
#include "ostree.h"
@ -62,20 +29,6 @@
#include "ostree-fetcher.h"
#include "otutil.h"
/* A message passed over the waitable queues between the main thread and
 * the metadata scanning thread.  `t` selects which member of `d`, if any,
 * is meaningful. */
typedef struct {
enum {
/* Carries d.idle_serial. */
PULL_MSG_IDLE,
/* Carries d.item; request to fetch the named object. */
PULL_MSG_FETCH,
/* Carries d.item; like PULL_MSG_FETCH, but the fetch is enqueued with
 * is_detached_meta set. */
PULL_MSG_FETCH_DETACHED_METADATA,
/* Carries d.item; request to scan the named metadata object. */
PULL_MSG_SCAN,
/* No payload; asks the metadata thread's main loop to quit. */
PULL_MSG_QUIT
} t;
union {
/* Valid for PULL_MSG_IDLE. */
guint idle_serial;
/* Valid for PULL_MSG_SCAN, PULL_MSG_FETCH and
 * PULL_MSG_FETCH_DETACHED_METADATA: a serialized object name. */
GVariant *item;
} d;
} PullWorkerMessage;
typedef struct {
OstreeRepo *repo;
OstreeRepoPullFlags flags;
@ -90,24 +43,19 @@ typedef struct {
OstreeAsyncProgress *progress;
gboolean transaction_resuming;
volatile gint n_scanned_metadata;
enum {
OSTREE_PULL_PHASE_FETCHING_REFS,
OSTREE_PULL_PHASE_FETCHING_OBJECTS
} phase;
gint n_scanned_metadata;
SoupURI *fetching_sync_uri;
gboolean gpg_verify;
GThread *metadata_thread;
GMainContext *metadata_thread_context;
GMainLoop *metadata_thread_loop;
GPtrArray *static_delta_metas;
OtWaitableQueue *metadata_objects_to_scan;
OtWaitableQueue *metadata_objects_to_fetch;
GHashTable *scanned_metadata; /* Maps object name to itself */
GHashTable *requested_metadata; /* Maps object name to itself */
GHashTable *requested_content; /* Maps object name to itself */
guint checking_metadata_scan_complete : 1;
guint metadata_scan_complete : 1;
gboolean was_idle;
guint idle_serial;
guint n_outstanding_metadata_fetches;
guint n_outstanding_metadata_write_requests;
guint n_outstanding_content_fetches;
@ -138,16 +86,18 @@ suburi_new (SoupURI *base,
...) G_GNUC_NULL_TERMINATED;
static gboolean scan_one_metadata_object (OtPullData *pull_data,
const guchar *csum,
const char *csum,
OstreeObjectType objtype,
guint recursion_depth,
GCancellable *cancellable,
GError **error);
static gboolean scan_one_metadata_object_v_name (OtPullData *pull_data,
GVariant *object,
GCancellable *cancellable,
GError **error);
static gboolean scan_one_metadata_object_c (OtPullData *pull_data,
const guchar *csum,
OstreeObjectType objtype,
guint recursion_depth,
GCancellable *cancellable,
GError **error);
static SoupURI *
suburi_new (SoupURI *base,
@ -193,7 +143,7 @@ update_progress (gpointer user_data)
guint64 bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher);
guint fetched = pull_data->n_fetched_metadata + pull_data->n_fetched_content;
guint requested = pull_data->n_requested_metadata + pull_data->n_requested_content;
guint n_scanned_metadata = g_atomic_int_get (&pull_data->n_scanned_metadata);
guint n_scanned_metadata = pull_data->n_scanned_metadata;
g_assert (pull_data->progress);
@ -216,27 +166,6 @@ update_progress (gpointer user_data)
return TRUE;
}
/* Allocate a new PullWorkerMessage of type @msgtype.
 *
 * @data is interpreted according to @msgtype:
 *  - PULL_MSG_IDLE: an unsigned serial packed with GUINT_TO_POINTER().
 *  - PULL_MSG_SCAN / PULL_MSG_FETCH / PULL_MSG_FETCH_DETACHED_METADATA:
 *    a GVariant pointer, stored as-is.  No reference is taken here, so
 *    the caller hands over whatever reference it holds.
 *  - PULL_MSG_QUIT: ignored.
 *
 * Returns a message allocated with g_new(); the receiver frees it with
 * g_free().
 */
static PullWorkerMessage *
pull_worker_message_new (int msgtype, gpointer data)
{
/* g_new() does not zero the allocation; only the fields set below are
 * initialized. */
PullWorkerMessage *msg = g_new (PullWorkerMessage, 1);
msg->t = msgtype;
switch (msgtype)
{
case PULL_MSG_IDLE:
msg->d.idle_serial = GPOINTER_TO_UINT (data);
break;
case PULL_MSG_SCAN:
case PULL_MSG_FETCH:
case PULL_MSG_FETCH_DETACHED_METADATA:
msg->d.item = data;
break;
case PULL_MSG_QUIT:
/* No payload; msg->d is left unset. */
break;
}
return msg;
}
static void
throw_async_error (OtPullData *pull_data,
GError *error)
@ -268,34 +197,20 @@ check_outstanding_requests_handle_error (OtPullData *pull_data,
throw_async_error (pull_data, error);
/* This is true in the phase when we're fetching refs */
if (pull_data->metadata_objects_to_scan == NULL)
switch (pull_data->phase)
{
case OSTREE_PULL_PHASE_FETCHING_REFS:
if (!pull_data->fetching_sync_uri)
g_main_loop_quit (pull_data->loop);
return;
}
if (pull_data->was_idle && !current_idle)
{
/* We transitioned to !idle */
g_debug ("pull: No longer idle");
pull_data->idle_serial++;
pull_data->was_idle = FALSE;
}
else if (!pull_data->was_idle && current_idle)
{
pull_data->was_idle = TRUE;
g_debug ("Sending new MSG_IDLE with serial %u", pull_data->idle_serial);
ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
pull_worker_message_new (PULL_MSG_IDLE, GUINT_TO_POINTER (pull_data->idle_serial)));
}
if (pull_data->metadata_scan_complete && current_idle)
{
g_debug ("pull: metadata scan complete and idle, exiting mainloop");
g_main_loop_quit (pull_data->loop);
break;
case OSTREE_PULL_PHASE_FETCHING_OBJECTS:
if (current_idle)
{
g_debug ("pull: idle, exiting mainloop");
g_main_loop_quit (pull_data->loop);
}
break;
}
}
@ -441,6 +356,12 @@ fetch_uri_contents_utf8_sync (OtPullData *pull_data,
return ret;
}
static void
enqueue_one_object_request (OtPullData *pull_data,
const char *checksum,
OstreeObjectType objtype,
gboolean is_detached_meta);
static gboolean
scan_dirtree_object (OtPullData *pull_data,
const char *checksum,
@ -491,12 +412,8 @@ scan_dirtree_object (OtPullData *pull_data,
if (!file_is_stored && !g_hash_table_lookup (pull_data->requested_content, file_checksum))
{
g_hash_table_insert (pull_data->requested_content, file_checksum, file_checksum);
g_debug ("queued fetch of content %s", file_checksum);
ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
pull_worker_message_new (PULL_MSG_FETCH,
ostree_object_name_serialize (file_checksum, OSTREE_OBJECT_TYPE_FILE)));
file_checksum = NULL; /* Transfer ownership to hash */
enqueue_one_object_request (pull_data, file_checksum, OSTREE_OBJECT_TYPE_FILE, FALSE);
file_checksum = NULL; /* Transfer ownership */
}
}
@ -513,14 +430,14 @@ scan_dirtree_object (OtPullData *pull_data,
if (!ot_util_filename_validate (dirname, error))
goto out;
if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (tree_csum),
OSTREE_OBJECT_TYPE_DIR_TREE, recursion_depth + 1,
cancellable, error))
if (!scan_one_metadata_object_c (pull_data, ostree_checksum_bytes_peek (tree_csum),
OSTREE_OBJECT_TYPE_DIR_TREE, recursion_depth + 1,
cancellable, error))
goto out;
if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (meta_csum),
OSTREE_OBJECT_TYPE_DIR_META, recursion_depth + 1,
cancellable, error))
if (!scan_one_metadata_object_c (pull_data, ostree_checksum_bytes_peek (meta_csum),
OSTREE_OBJECT_TYPE_DIR_META, recursion_depth + 1,
cancellable, error))
goto out;
}
@ -649,14 +566,6 @@ content_fetch_on_complete (GObject *object,
check_outstanding_requests_handle_error (pull_data, local_error);
}
/* Clear the metadata_scan_complete flag on @pull_data.  A debug message
 * is logged only when the flag was previously set, i.e. on an actual
 * complete -> not complete transition. */
static void
note_metadata_not_complete (OtPullData *pull_data)
{
if (pull_data->metadata_scan_complete)
g_debug ("pull: Transition metadata scan complete -> not complete");
pull_data->metadata_scan_complete = FALSE;
}
static void
on_metadata_writed (GObject *object,
GAsyncResult *result,
@ -692,10 +601,10 @@ on_metadata_writed (GObject *object,
goto out;
}
note_metadata_not_complete (pull_data);
ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
pull_worker_message_new (PULL_MSG_SCAN,
g_variant_ref (fetch_data->object)));
if (!scan_one_metadata_object_c (pull_data, csum, objtype, 0,
pull_data->cancellable, error))
goto out;
out:
pull_data->n_outstanding_metadata_write_requests--;
(void) gs_file_unlink (fetch_data->temp_path, NULL, NULL);
@ -706,11 +615,6 @@ on_metadata_writed (GObject *object,
check_outstanding_requests_handle_error (pull_data, local_error);
}
static void
enqueue_one_object_request (OtPullData *pull_data,
GVariant *object_name,
gboolean is_detached_meta);
static void
meta_fetch_on_complete (GObject *object,
GAsyncResult *result,
@ -736,7 +640,7 @@ meta_fetch_on_complete (GObject *object,
{
/* There isn't any detached metadata, just fetch the commit */
g_clear_error (&local_error);
enqueue_one_object_request (pull_data, fetch_data->object, FALSE);
enqueue_one_object_request (pull_data, checksum, objtype, FALSE);
}
goto out;
@ -751,7 +655,7 @@ meta_fetch_on_complete (GObject *object,
pull_data->cancellable, error))
goto out;
enqueue_one_object_request (pull_data, fetch_data->object, FALSE);
enqueue_one_object_request (pull_data, checksum, objtype, FALSE);
}
else
{
@ -766,6 +670,7 @@ meta_fetch_on_complete (GObject *object,
}
out:
g_assert (pull_data->n_outstanding_metadata_fetches > 0);
pull_data->n_outstanding_metadata_fetches--;
pull_data->n_fetched_metadata++;
throw_async_error (pull_data, local_error);
@ -817,14 +722,16 @@ scan_commit_object (OtPullData *pull_data,
g_variant_get_child (commit, 6, "@ay", &tree_contents_csum);
g_variant_get_child (commit, 7, "@ay", &tree_meta_csum);
if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (tree_contents_csum),
OSTREE_OBJECT_TYPE_DIR_TREE, recursion_depth + 1,
cancellable, error))
if (!scan_one_metadata_object_c (pull_data,
ostree_checksum_bytes_peek (tree_contents_csum),
OSTREE_OBJECT_TYPE_DIR_TREE, recursion_depth + 1,
cancellable, error))
goto out;
if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (tree_meta_csum),
OSTREE_OBJECT_TYPE_DIR_META, recursion_depth + 1,
cancellable, error))
if (!scan_one_metadata_object_c (pull_data,
ostree_checksum_bytes_peek (tree_meta_csum),
OSTREE_OBJECT_TYPE_DIR_META, recursion_depth + 1,
cancellable, error))
goto out;
ret = TRUE;
@ -836,11 +743,27 @@ scan_commit_object (OtPullData *pull_data,
static gboolean
scan_one_metadata_object (OtPullData *pull_data,
const guchar *csum,
const char *csum,
OstreeObjectType objtype,
guint recursion_depth,
GCancellable *cancellable,
GError **error)
{
guchar buf[32];
ostree_checksum_inplace_to_bytes (csum, buf);
return scan_one_metadata_object_c (pull_data, buf, objtype,
recursion_depth,
cancellable, error);
}
static gboolean
scan_one_metadata_object_c (OtPullData *pull_data,
const guchar *csum,
OstreeObjectType objtype,
guint recursion_depth,
GCancellable *cancellable,
GError **error)
{
gboolean ret = FALSE;
gs_unref_variant GVariant *object = NULL;
@ -862,16 +785,12 @@ scan_one_metadata_object (OtPullData *pull_data,
if (!is_stored && !is_requested)
{
char *duped_checksum = g_strdup (tmp_checksum);
gboolean do_fetch_detached;
g_hash_table_insert (pull_data->requested_metadata, duped_checksum, duped_checksum);
if (objtype == OSTREE_OBJECT_TYPE_COMMIT)
ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
pull_worker_message_new (PULL_MSG_FETCH_DETACHED_METADATA,
g_variant_ref (object)));
else
ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
pull_worker_message_new (PULL_MSG_FETCH,
g_variant_ref (object)));
do_fetch_detached = (objtype == OSTREE_OBJECT_TYPE_COMMIT);
enqueue_one_object_request (pull_data, tmp_checksum, objtype, do_fetch_detached);
}
else if (is_stored)
{
@ -891,13 +810,13 @@ scan_one_metadata_object (OtPullData *pull_data,
pull_data->cancellable, error))
goto out;
break;
case OSTREE_OBJECT_TYPE_FILE:
default:
g_assert_not_reached ();
break;
}
}
g_hash_table_insert (pull_data->scanned_metadata, g_variant_ref (object), object);
g_atomic_int_inc (&pull_data->n_scanned_metadata);
pull_data->n_scanned_metadata++;
}
ret = TRUE;
@ -905,132 +824,19 @@ scan_one_metadata_object (OtPullData *pull_data,
return ret;
}
/* Scan the metadata object identified by the serialized object name
 * @object.
 *
 * The name is deserialized into a hex checksum string and an object
 * type, the checksum is converted to its binary form, and the result is
 * passed to scan_one_metadata_object() with a recursion depth of 0.
 *
 * Returns whatever scan_one_metadata_object() returns; @error is passed
 * through to it.
 */
static gboolean
scan_one_metadata_object_v_name (OtPullData *pull_data,
GVariant *object,
GCancellable *cancellable,
GError **error)
{
OstreeObjectType objtype;
const char *checksum = NULL;
/* NOTE(review): presumably released automatically at scope exit via the
 * gs_free cleanup attribute -- confirm against libgsystem. */
gs_free guchar *csum = NULL;
ostree_object_name_deserialize (object, &checksum, &objtype);
csum = ostree_checksum_to_bytes (checksum);
return scan_one_metadata_object (pull_data, csum, objtype, 0,
cancellable, error);
}
/* Closure for idle_throw_error(): the pull state plus the error to be
 * reported against it. */
typedef struct {
OtPullData *pull_data;
GError *error;
} IdleThrowErrorData;
/* Callback that hands data->error to throw_async_error() for
 * data->pull_data, then frees the closure itself (the GError is passed
 * on, not freed here).
 *
 * Always returns FALSE so that it runs only once.
 */
static gboolean
idle_throw_error (gpointer user_data)
{
IdleThrowErrorData *data = user_data;
throw_async_error (data->pull_data, data->error);
g_free (data);
return FALSE;
}
/* Source callback for the metadata_objects_to_scan queue; @user_data is
 * the OtPullData.  @fd and @condition are unused.
 *
 * Drains every message currently in the queue:
 *  - PULL_MSG_SCAN: scan the named metadata object, then release the
 *    item and the message.
 *  - PULL_MSG_IDLE: remember only the most recent one; earlier idle
 *    messages seen in the same drain are freed.
 *  - PULL_MSG_QUIT: quit the metadata thread's main loop.
 *
 * After the drain, the last idle message (if any) is pushed onto
 * metadata_objects_to_fetch.  If a scan failed, the error is wrapped in
 * an IdleThrowErrorData and passed to idle_throw_error() via
 * g_main_context_invoke() on the default main context.
 *
 * Always returns TRUE, keeping the source attached.
 */
static gboolean
on_metadata_objects_to_scan_ready (gint fd,
GIOCondition condition,
gpointer user_data)
{
OtPullData *pull_data = user_data;
PullWorkerMessage *msg;
PullWorkerMessage *last_idle_msg = NULL;
GError *local_error = NULL;
GError **error = &local_error;
while (ot_waitable_queue_pop (pull_data->metadata_objects_to_scan, (gpointer*)&msg))
{
if (msg->t == PULL_MSG_SCAN)
{
/* NOTE(review): on failure the goto below skips freeing msg, its
 * item, and any pending last_idle_msg -- looks like a leak on the
 * error path; confirm. */
if (!scan_one_metadata_object_v_name (pull_data, msg->d.item,
pull_data->cancellable, error))
goto out;
g_variant_unref (msg->d.item);
g_free (msg);
}
else if (msg->t == PULL_MSG_IDLE)
{
/* Coalesce idle notifications: keep only the newest. */
g_free (last_idle_msg);
last_idle_msg = msg;
}
else if (msg->t == PULL_MSG_QUIT)
{
g_free (msg);
g_debug ("pull: Processing PULL_MSG_QUIT");
g_main_loop_quit (pull_data->metadata_thread_loop);
}
else
g_assert_not_reached ();
}
if (last_idle_msg)
{
/* Forward the idle message unchanged onto the fetch queue. */
g_debug ("pull: Processing PULL_MSG_IDLE");
ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
last_idle_msg);
}
out:
if (local_error)
{
/* Ownership of local_error moves into the closure, which
 * idle_throw_error() hands on to throw_async_error(). */
IdleThrowErrorData *throwdata = g_new0 (IdleThrowErrorData, 1);
throwdata->pull_data = pull_data;
throwdata->error = local_error;
g_main_context_invoke (NULL, idle_throw_error, throwdata);
}
return TRUE;
}
/**
 * metadata_thread_main:
 * @user_data: the #OtPullData for this pull
 *
 * Entry point of the metadata scanning worker thread.  It creates a
 * private #GMainContext and #GMainLoop, attaches a source for the
 * metadata_objects_to_scan queue whose callback is
 * on_metadata_objects_to_scan_ready(), and runs the loop until it is
 * quit.
 *
 * While scanning, if an object is missing, a request to fetch it is
 * queued to the main thread.  When it's fetched, we get passed the
 * object back and scan it.
 *
 * Returns: always %NULL
 */
static gpointer
metadata_thread_main (gpointer user_data)
{
OtPullData *pull_data = user_data;
GSource *src;
pull_data->metadata_thread_context = g_main_context_new ();
pull_data->metadata_thread_loop = g_main_loop_new (pull_data->metadata_thread_context, TRUE);
src = ot_waitable_queue_create_source (pull_data->metadata_objects_to_scan);
g_source_set_callback (src, (GSourceFunc)on_metadata_objects_to_scan_ready, pull_data, NULL);
g_source_attach (src, pull_data->metadata_thread_context);
/* Drop our local reference to the source after attaching it. */
g_source_unref (src);
g_main_loop_run (pull_data->metadata_thread_loop);
return NULL;
}
static void
enqueue_one_object_request (OtPullData *pull_data,
GVariant *object_name,
const char *checksum,
OstreeObjectType objtype,
gboolean is_detached_meta)
{
const char *checksum;
OstreeObjectType objtype;
SoupURI *obj_uri = NULL;
gboolean is_meta;
FetchObjectData *fetch_data;
gs_free char *objpath = NULL;
ostree_object_name_deserialize (object_name, &checksum, &objtype);
g_debug ("queuing fetch of %s.%s", checksum,
ostree_object_type_to_string (objtype));
if (is_detached_meta)
{
@ -1058,57 +864,13 @@ enqueue_one_object_request (OtPullData *pull_data,
}
fetch_data = g_new0 (FetchObjectData, 1);
fetch_data->pull_data = pull_data;
fetch_data->object = g_variant_ref (object_name);
fetch_data->object = ostree_object_name_serialize (checksum, objtype);
fetch_data->is_detached_meta = is_detached_meta;
ostree_fetcher_request_uri_with_partial_async (pull_data->fetcher, obj_uri, pull_data->cancellable,
is_meta ? meta_fetch_on_complete : content_fetch_on_complete, fetch_data);
soup_uri_free (obj_uri);
}
/* Source callback for the metadata_objects_to_fetch queue; @user_data is
 * the OtPullData.  @fd and @condition are unused.
 *
 * Handles at most one message per invocation:
 *  - PULL_MSG_IDLE: clear checking_metadata_scan_complete and, if the
 *    message's serial still equals pull_data->idle_serial, mark the
 *    metadata scan as complete.
 *  - PULL_MSG_FETCH / PULL_MSG_FETCH_DETACHED_METADATA: mark the scan as
 *    not complete and enqueue an HTTP request for the named object.
 *
 * check_outstanding_requests_handle_error() is called on every
 * invocation, including when the queue was empty.
 *
 * Always returns TRUE, keeping the source attached.
 */
static gboolean
on_metadata_objects_to_fetch_ready (gint fd,
GIOCondition condition,
gpointer user_data)
{
OtPullData *pull_data = user_data;
PullWorkerMessage *msg;
if (!ot_waitable_queue_pop (pull_data->metadata_objects_to_fetch, (gpointer*)&msg))
goto out;
if (msg->t == PULL_MSG_IDLE)
{
pull_data->checking_metadata_scan_complete = FALSE;
/* A serial mismatch means the idle message is stale and is ignored. */
if (msg->d.idle_serial == pull_data->idle_serial)
{
g_debug ("marking metadata scan as complete");
pull_data->metadata_scan_complete = TRUE;
}
}
else if (msg->t == PULL_MSG_FETCH || msg->t == PULL_MSG_FETCH_DETACHED_METADATA)
{
gboolean is_detached_meta;
note_metadata_not_complete (pull_data);
is_detached_meta = msg->t == PULL_MSG_FETCH_DETACHED_METADATA;
enqueue_one_object_request (pull_data, msg->d.item, is_detached_meta);
/* Drop the reference that travelled with the message. */
g_variant_unref (msg->d.item);
}
else
{
g_assert_not_reached ();
}
g_free (msg);
out:
check_outstanding_requests_handle_error (pull_data, NULL);
return TRUE;
}
static gboolean
repo_get_string_key_inherit (OstreeRepo *repo,
const char *section,
@ -1178,15 +940,6 @@ load_remote_repo_config (OtPullData *pull_data,
return ret;
}
/* Queue a scan of the commit object @checksum: a PULL_MSG_SCAN message
 * carrying the serialized (checksum, OSTREE_OBJECT_TYPE_COMMIT) object
 * name is pushed onto metadata_objects_to_scan. */
static void
initiate_commit_scan (OtPullData *pull_data,
const char *checksum)
{
ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
pull_worker_message_new (PULL_MSG_SCAN,
ostree_object_name_serialize (checksum, OSTREE_OBJECT_TYPE_COMMIT)));
}
#if 0
static gboolean
request_static_delta_meta_sync (OtPullData *pull_data,
@ -1306,6 +1059,8 @@ ostree_repo_pull (OstreeRepo *self,
pull_data->gpg_verify = FALSE;
#endif
pull_data->phase = OSTREE_PULL_PHASE_FETCHING_REFS;
if (!ot_keyfile_get_boolean_with_default (config, remote_key, "tls-permissive",
FALSE, &tls_permissive, error))
goto out;
@ -1394,28 +1149,30 @@ ostree_repo_pull (OstreeRepo *self,
}
}
pull_data->phase = OSTREE_PULL_PHASE_FETCHING_OBJECTS;
if (!ostree_repo_prepare_transaction (pull_data->repo, &pull_data->transaction_resuming,
cancellable, error))
goto out;
g_debug ("resuming transaction: %s", pull_data->transaction_resuming ? "true" : " false");
pull_data->metadata_objects_to_fetch = ot_waitable_queue_new ();
pull_data->metadata_objects_to_scan = ot_waitable_queue_new ();
pull_data->metadata_thread = g_thread_new ("metadatascan", metadata_thread_main, pull_data);
g_hash_table_iter_init (&hash_iter, commits_to_fetch);
while (g_hash_table_iter_next (&hash_iter, &key, &value))
{
const char *commit = value;
initiate_commit_scan (pull_data, commit);
if (!scan_one_metadata_object (pull_data, commit, OSTREE_OBJECT_TYPE_COMMIT,
0, pull_data->cancellable, error))
goto out;
}
g_hash_table_iter_init (&hash_iter, requested_refs_to_fetch);
while (g_hash_table_iter_next (&hash_iter, &key, &value))
{
const char *checksum = value;
initiate_commit_scan (pull_data, checksum);
if (!scan_one_metadata_object (pull_data, checksum, OSTREE_OBJECT_TYPE_COMMIT,
0, pull_data->cancellable, error))
goto out;
}
for (i = 0; i < pull_data->static_delta_metas->len; i++)
@ -1423,13 +1180,6 @@ ostree_repo_pull (OstreeRepo *self,
process_one_static_delta_meta (pull_data, pull_data->static_delta_metas->pdata[i]);
}
{
queue_src = ot_waitable_queue_create_source (pull_data->metadata_objects_to_fetch);
g_source_set_callback (queue_src, (GSourceFunc)on_metadata_objects_to_fetch_ready, pull_data, NULL);
g_source_attach (queue_src, pull_data->main_context);
g_source_unref (queue_src);
}
/* Now await work completion */
if (!run_mainloop_monitor_fetcher (pull_data))
goto out;
@ -1497,15 +1247,7 @@ ostree_repo_pull (OstreeRepo *self,
soup_uri_free (pull_data->base_uri);
if (queue_src)
g_source_destroy (queue_src);
if (pull_data->metadata_thread)
{
ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
pull_worker_message_new (PULL_MSG_QUIT, NULL));
g_thread_join (pull_data->metadata_thread);
}
g_clear_pointer (&pull_data->static_delta_metas, (GDestroyNotify) g_ptr_array_unref);
g_clear_pointer (&pull_data->metadata_objects_to_scan, (GDestroyNotify) ot_waitable_queue_unref);
g_clear_pointer (&pull_data->metadata_objects_to_fetch, (GDestroyNotify) ot_waitable_queue_unref);
g_clear_pointer (&pull_data->scanned_metadata, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->requested_content, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->requested_metadata, (GDestroyNotify) g_hash_table_unref);