diff --git a/Makefile-otutil.am b/Makefile-otutil.am index 58ea8f83..8afab956 100644 --- a/Makefile-otutil.am +++ b/Makefile-otutil.am @@ -35,8 +35,8 @@ libotutil_la_SOURCES = \ src/libotutil/ot-spawn-utils.h \ src/libotutil/ot-variant-utils.c \ src/libotutil/ot-variant-utils.h \ - src/libotutil/ot-worker-queue.c \ - src/libotutil/ot-worker-queue.h \ + src/libotutil/ot-waitable-queue.c \ + src/libotutil/ot-waitable-queue.h \ src/libotutil/ot-gio-utils.c \ src/libotutil/ot-gio-utils.h \ src/libotutil/otutil.c \ diff --git a/src/libotutil/ot-waitable-queue.c b/src/libotutil/ot-waitable-queue.c new file mode 100644 index 00000000..874499d3 --- /dev/null +++ b/src/libotutil/ot-waitable-queue.c @@ -0,0 +1,120 @@ +/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*- + * + * Copyright (C) 2012 Colin Walters + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + * + * Author: Colin Walters + */ + +#include "config.h" + +#include "otutil.h" + +#include +#include +#include +#include +#include + +struct OtWaitableQueue { + volatile gint refcount; + GMutex mutex; + int fd; + gboolean read_empty; + GQueue queue; +}; + +OtWaitableQueue * +ot_waitable_queue_new (void) +{ + OtWaitableQueue *queue = g_new0 (OtWaitableQueue, 1); + queue->refcount = 1; + g_mutex_init (&queue->mutex); + g_queue_init (&queue->queue); + + queue->fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK); + g_assert (queue->fd >= 0); + + return queue; +} + +void +ot_waitable_queue_push (OtWaitableQueue *queue, + gpointer data) +{ + const guint64 val = 1; + int rval; + + g_mutex_lock (&queue->mutex); + g_queue_push_head (&queue->queue, data); + do + rval = write (queue->fd, &val, sizeof (val)); + while (G_UNLIKELY (rval == -1 && errno == EINTR)); + queue->read_empty = FALSE; + g_mutex_unlock (&queue->mutex); +} + +gboolean +ot_waitable_queue_pop (OtWaitableQueue *queue, + gpointer *out_data) +{ + gpointer ret = NULL; + gboolean empty = TRUE; + int rval; + guint64 val; + + g_mutex_lock (&queue->mutex); + if (g_queue_peek_tail_link (&queue->queue) != NULL) + { + ret = g_queue_pop_tail (&queue->queue); + empty = FALSE; + } + else if (!queue->read_empty) + { + do + rval = read (queue->fd, &val, sizeof (val)); + while (G_UNLIKELY (rval == -1 && errno == EINTR)); + queue->read_empty = TRUE; + } + g_mutex_unlock (&queue->mutex); + + *out_data = ret; + return !empty; +} + +void +ot_waitable_queue_ref (OtWaitableQueue *queue) +{ + g_atomic_int_inc (&queue->refcount); +} + +void +ot_waitable_queue_unref (OtWaitableQueue *queue) +{ + if (!g_atomic_int_dec_and_test (&queue->refcount)) + return; + g_mutex_clear (&queue->mutex); + g_queue_clear (&queue->queue); + (void) close (queue->fd); + g_free (queue); +} + +GSource * +ot_waitable_queue_create_source (OtWaitableQueue *queue) +{ + return g_unix_fd_source_new (queue->fd, G_IO_IN); +} diff --git a/src/libotutil/ot-worker-queue.h b/src/libotutil/ot-waitable-queue.h similarity index 58% rename from src/libotutil/ot-worker-queue.h rename to src/libotutil/ot-waitable-queue.h index cfd7a923..0fcd7de8 100644 --- a/src/libotutil/ot-worker-queue.h +++ b/src/libotutil/ot-waitable-queue.h @@ -20,30 +20,27 @@ * Author: Colin Walters */ -#ifndef __OSTREE_WORKER_QUEUE_H__ -#define __OSTREE_WORKER_QUEUE_H__ +#ifndef __OSTREE_WAITABLE_QUEUE_H__ +#define __OSTREE_WAITABLE_QUEUE_H__ #include G_BEGIN_DECLS -typedef struct OtWorkerQueue OtWorkerQueue; +typedef struct OtWaitableQueue OtWaitableQueue; -typedef void (*OtWorkerQueueFunc) (gpointer data, - gpointer user_data); +OtWaitableQueue *ot_waitable_queue_new (void); -OtWorkerQueue *ot_worker_queue_new (const char *thread_name, - OtWorkerQueueFunc func, - gpointer data); +void ot_waitable_queue_push (OtWaitableQueue *queue, + gpointer data); -void ot_worker_queue_start (OtWorkerQueue *queue); +GSource *ot_waitable_queue_create_source (OtWaitableQueue *queue); -gboolean ot_worker_queue_is_idle (OtWorkerQueue *queue); +gboolean ot_waitable_queue_pop (OtWaitableQueue *queue, + gpointer *out_val); -void ot_worker_queue_push (OtWorkerQueue *queue, - gpointer data); - -void ot_worker_queue_unref (OtWorkerQueue *queue); +void ot_waitable_queue_ref (OtWaitableQueue *queue); +void ot_waitable_queue_unref (OtWaitableQueue *queue); G_END_DECLS diff --git a/src/libotutil/ot-worker-queue.c b/src/libotutil/ot-worker-queue.c deleted file mode 100644 index 48656625..00000000 --- a/src/libotutil/ot-worker-queue.c +++ /dev/null @@ -1,140 +0,0 @@ -/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*- - * - * Copyright (C) 2012 Colin Walters - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - * - * Author: Colin Walters - */ - -#include "config.h" - -#include "otutil.h" - -#include - -struct OtWorkerQueue { - GMutex mutex; - GCond cond; - GQueue queue; - - char *thread_name; - - gboolean complete; - gboolean is_idle; - gboolean destroyed; - - GThread *worker; - - OtWorkerQueueFunc work_func; - OtWorkerQueueFunc work_data; -}; - -static gpointer -ot_worker_queue_thread_main (gpointer user_data); - -OtWorkerQueue * -ot_worker_queue_new (const char *thread_name, - OtWorkerQueueFunc func, - gpointer data) -{ - OtWorkerQueue *queue = g_new0 (OtWorkerQueue, 1); - g_mutex_init (&queue->mutex); - g_cond_init (&queue->cond); - g_queue_init (&queue->queue); - - queue->is_idle = TRUE; - - queue->thread_name = g_strdup (thread_name); - queue->work_func = func; - queue->work_data = data; - - return queue; -} - -void -ot_worker_queue_start (OtWorkerQueue *queue) -{ - queue->worker = g_thread_new (queue->thread_name, ot_worker_queue_thread_main, queue); -} - -void -ot_worker_queue_push (OtWorkerQueue *queue, - gpointer data) -{ - g_mutex_lock (&queue->mutex); - g_queue_push_head (&queue->queue, data); - queue->is_idle = FALSE; - g_cond_signal (&queue->cond); - g_mutex_unlock (&queue->mutex); -} - -static gpointer -ot_worker_queue_thread_main (gpointer user_data) -{ - OtWorkerQueue *queue = user_data; - - while (TRUE) - { - gpointer item; - - g_mutex_lock (&queue->mutex); - - while (!g_queue_peek_tail_link (&queue->queue)) - { - queue->is_idle = TRUE; - g_cond_wait (&queue->cond, &queue->mutex); - } - - item = g_queue_pop_tail (&queue->queue); - - g_mutex_unlock (&queue->mutex); - - if (!item) - break; - - queue->work_func (item, queue->work_data); - } - - return NULL; -} - -gboolean -ot_worker_queue_is_idle (OtWorkerQueue *queue) -{ - gboolean ret; - g_mutex_lock (&queue->mutex); - ret = queue->is_idle; - g_mutex_unlock (&queue->mutex); - return ret; -} - -void -ot_worker_queue_unref (OtWorkerQueue *queue) -{ - if (queue->worker) - { - ot_worker_queue_push (queue, NULL); - g_thread_join (queue->worker); - } - - g_free (queue->thread_name); - - g_mutex_clear (&queue->mutex); - g_cond_clear (&queue->cond); - g_queue_clear (&queue->queue); - g_free (queue); -} diff --git a/src/libotutil/otutil.h b/src/libotutil/otutil.h index cb3346bd..53071bc7 100644 --- a/src/libotutil/otutil.h +++ b/src/libotutil/otutil.h @@ -37,7 +37,7 @@ } \ } G_STMT_END; -#include +#include #include #include #include diff --git a/src/ostree/ostree-pull.c b/src/ostree/ostree-pull.c index 0953cc2d..bdf80350 100644 --- a/src/ostree/ostree-pull.c +++ b/src/ostree/ostree-pull.c @@ -83,6 +83,20 @@ static GOptionEntry options[] = { { NULL }, }; +typedef struct { + enum { + PULL_MSG_SCAN_IDLE, + PULL_MSG_MAIN_IDLE, + PULL_MSG_FETCH, + PULL_MSG_SCAN, + PULL_MSG_QUIT + } t; + union { + guint idle_serial; + GVariant *item; + } d; +} PullWorkerMessage; + typedef struct { OstreeRepo *repo; char *remote_name; @@ -93,21 +107,27 @@ typedef struct { GMainLoop *loop; GCancellable *cancellable; - gboolean metadata_scan_active; volatile gint n_scanned_metadata; - volatile gint n_requested_metadata; - volatile gint n_requested_content; - guint n_fetched_metadata; guint outstanding_uri_requests; - OtWorkerQueue *metadata_objects_to_scan; - GHashTable *scanned_metadata; /* Maps object name to itself */ - GHashTable *requested_content; /* Maps object name to itself */ - guint n_outstanding_metadata_fetches; - - guint n_fetched_content; - guint outstanding_filecontent_requests; - guint outstanding_content_stage_requests; + GThread *metadata_thread; + GMainContext *metadata_thread_context; + GMainLoop *metadata_thread_loop; + OtWaitableQueue *metadata_objects_to_scan; + OtWaitableQueue *metadata_objects_to_fetch; + GHashTable *scanned_metadata; /* Maps object name to itself */ + GHashTable *requested_metadata; /* Maps object name to itself */ + GHashTable *requested_content; /* Maps object name to itself */ + guint metadata_scan_idle : 1; /* TRUE if we passed through an idle message */ + guint idle_serial; /* Incremented when we get a SCAN_IDLE message */ + guint n_outstanding_metadata_fetches; + guint n_outstanding_metadata_stage_requests; + guint n_outstanding_content_fetches; + guint n_outstanding_content_stage_requests; + gint n_requested_metadata; + gint n_requested_content; + guint n_fetched_metadata; + guint n_fetched_content; gboolean have_previous_bytes; guint64 previous_bytes_sec; @@ -118,15 +138,10 @@ typedef struct { } OtPullData; typedef struct { - OtPullData *pull_data; - - gboolean fetching_content; - - GFile *meta_path; - GFile *content_path; - - char *checksum; -} OtFetchOneContentItemData; + OtPullData *pull_data; + GVariant *object; + GFile *temp_path; +} FetchObjectData; static SoupURI * suburi_new (SoupURI *base, @@ -182,46 +197,56 @@ static gboolean uri_fetch_update_status (gpointer user_data) { OtPullData *pull_data = user_data; - ot_lfree char *fetcher_status; + ot_lfree char *fetcher_status = NULL; GString *status; guint64 current_bytes_transferred; guint64 current_delta_bytes_transferred; guint64 delta_bytes_transferred; + guint outstanding_stages; + guint outstanding_fetches; status = g_string_new (""); - if (pull_data->metadata_scan_active) + if (!pull_data->metadata_scan_idle) g_string_append_printf (status, "scan: %u metadata; ", g_atomic_int_get (&pull_data->n_scanned_metadata)); - g_string_append_printf (status, "fetch: %u/%u metadata %u/%u content; ", - g_atomic_int_get (&pull_data->n_fetched_metadata), - g_atomic_int_get (&pull_data->n_requested_metadata), - pull_data->n_fetched_content, - g_atomic_int_get (&pull_data->n_requested_content)); + outstanding_stages = pull_data->n_outstanding_content_stage_requests + pull_data->n_outstanding_metadata_stage_requests; + if (outstanding_stages > 0) + g_string_append_printf (status, "writing: %u objects; ", outstanding_stages); - current_bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher); - current_delta_bytes_transferred = current_bytes_transferred - pull_data->previous_total_downloaded; - - if (pull_data->have_previous_bytes) - delta_bytes_transferred = (guint64)(0.5 * current_delta_bytes_transferred + 0.5 * pull_data->previous_bytes_sec); - else + outstanding_fetches = pull_data->n_outstanding_content_fetches + pull_data->n_outstanding_metadata_fetches; + if (outstanding_fetches) { - pull_data->have_previous_bytes = TRUE; - delta_bytes_transferred = current_delta_bytes_transferred; + g_string_append_printf (status, "fetch: %u/%u metadata %u/%u content; ", + pull_data->n_fetched_metadata, + pull_data->n_requested_metadata, + pull_data->n_fetched_content, + pull_data->n_requested_content); + + current_bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher); + current_delta_bytes_transferred = current_bytes_transferred - pull_data->previous_total_downloaded; + + if (pull_data->have_previous_bytes) + delta_bytes_transferred = (guint64)(0.5 * current_delta_bytes_transferred + 0.5 * pull_data->previous_bytes_sec); + else + { + pull_data->have_previous_bytes = TRUE; + delta_bytes_transferred = current_delta_bytes_transferred; + } + pull_data->previous_bytes_sec = delta_bytes_transferred; + pull_data->previous_total_downloaded = current_bytes_transferred; + + if (delta_bytes_transferred < 1024) + g_string_append_printf (status, "%u B/s; ", + (guint)delta_bytes_transferred); + else + g_string_append_printf (status, "%.1f KiB/s; ", + (double)delta_bytes_transferred / 1024); + + fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher); + g_string_append (status, fetcher_status); } - pull_data->previous_bytes_sec = delta_bytes_transferred; - pull_data->previous_total_downloaded = current_bytes_transferred; - - if (delta_bytes_transferred < 1024) - g_string_append_printf (status, "%u B/s; ", - (guint)delta_bytes_transferred); - else - g_string_append_printf (status, "%.1f KiB/s; ", - (double)delta_bytes_transferred / 1024); - - fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher); - g_string_append (status, fetcher_status); gs_console_begin_status_line (gs_console_get (), status->str, NULL, NULL); @@ -230,6 +255,27 @@ uri_fetch_update_status (gpointer user_data) return TRUE; } +static PullWorkerMessage * +pull_worker_message_new (int msgtype, gpointer data) +{ + PullWorkerMessage *msg = g_new (PullWorkerMessage, 1); + msg->t = msgtype; + switch (msgtype) + { + case PULL_MSG_SCAN_IDLE: + case PULL_MSG_MAIN_IDLE: + msg->d.idle_serial = GPOINTER_TO_UINT (data); + break; + case PULL_MSG_SCAN: + case PULL_MSG_FETCH: + msg->d.item = data; + break; + case PULL_MSG_QUIT: + break; + } + return msg; +} + static void throw_async_error (OtPullData *pull_data, GError *error) @@ -253,12 +299,26 @@ static void check_outstanding_requests_handle_error (OtPullData *pull_data, GError *error) { - if ((!pull_data->metadata_objects_to_scan || ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan)) && - pull_data->outstanding_uri_requests == 0 && - pull_data->outstanding_filecontent_requests == 0 && - pull_data->n_outstanding_metadata_fetches == 0 && - pull_data->outstanding_content_stage_requests == 0) - g_main_loop_quit (pull_data->loop); + gboolean current_fetch_idle = (pull_data->n_outstanding_metadata_fetches == 0 && + pull_data->n_outstanding_content_fetches == 0); + gboolean current_stage_idle = (pull_data->n_outstanding_metadata_stage_requests == 0 && + pull_data->n_outstanding_content_stage_requests == 0); + + g_debug ("pull: scan: %u fetching: %u staging: %u", + !pull_data->metadata_scan_idle, !current_fetch_idle, !current_stage_idle); + + /* This is true in the phase when we're fetching refs */ + if (pull_data->metadata_objects_to_scan == NULL) + { + if (pull_data->outstanding_uri_requests == 0) + g_main_loop_quit (pull_data->loop); + return; + } + else if (pull_data->metadata_scan_idle && current_fetch_idle && current_stage_idle) + { + g_main_loop_quit (pull_data->loop); + } + throw_async_error (pull_data, error); } @@ -287,6 +347,7 @@ run_mainloop_monitor_fetcher (OtPullData *pull_data) g_source_unref (update_timeout); } + g_idle_add (idle_check_outstanding_requests, pull_data); g_main_loop_run (pull_data->loop); if (console) @@ -337,7 +398,6 @@ fetch_uri (OtPullData *pull_data, fetch_data.pull_data = pull_data; uri_string = soup_uri_to_string (uri, FALSE); - g_print ("Fetching %s\n", uri_string); pull_data->outstanding_uri_requests++; ostree_fetcher_request_uri_async (pull_data->fetcher, uri, cancellable, @@ -385,9 +445,6 @@ fetch_uri_contents_utf8 (OtPullData *pull_data, return ret; } -static gboolean -idle_queue_content_request (gpointer user_data); - static gboolean scan_dirtree_object (OtPullData *pull_data, const char *checksum, @@ -422,7 +479,6 @@ scan_dirtree_object (OtPullData *pull_data, { const char *filename; gboolean file_is_stored; - OtFetchOneContentItemData *idle_fetch_data; ot_lvariant GVariant *csum = NULL; ot_lfree char *file_checksum; @@ -436,21 +492,15 @@ scan_dirtree_object (OtPullData *pull_data, if (!ostree_repo_has_object (pull_data->repo, OSTREE_OBJECT_TYPE_FILE, file_checksum, &file_is_stored, cancellable, error)) goto out; - + if (!file_is_stored && !g_hash_table_lookup (pull_data->requested_content, file_checksum)) { - char *duped_checksum; - - idle_fetch_data = g_new0 (OtFetchOneContentItemData, 1); - idle_fetch_data->pull_data = pull_data; - idle_fetch_data->checksum = file_checksum; - file_checksum = NULL; /* Transfer ownership */ - - duped_checksum = g_strdup (idle_fetch_data->checksum); - g_hash_table_insert (pull_data->requested_content, duped_checksum, duped_checksum); - - g_atomic_int_inc (&pull_data->n_requested_content); - g_main_context_invoke (NULL, idle_queue_content_request, idle_fetch_data); + g_hash_table_insert (pull_data->requested_content, file_checksum, file_checksum); + + ot_waitable_queue_push (pull_data->metadata_objects_to_fetch, + pull_worker_message_new (PULL_MSG_FETCH, + ostree_object_name_serialize (file_checksum, OSTREE_OBJECT_TYPE_FILE))); + file_checksum = NULL; /* Transfer ownership to hash */ } } @@ -513,27 +563,17 @@ fetch_ref_contents (OtPullData *pull_data, return ret; } -static void -destroy_fetch_one_content_item_data (OtFetchOneContentItemData *data) -{ - if (data->meta_path) - (void) gs_file_unlink (data->meta_path, NULL, NULL); - g_clear_object (&data->meta_path); - if (data->content_path) - (void) gs_file_unlink (data->content_path, NULL, NULL); - g_clear_object (&data->content_path); - g_free (data->checksum); - g_free (data); -} - static void content_fetch_on_stage_complete (GObject *object, GAsyncResult *result, gpointer user_data) { - OtFetchOneContentItemData *data = user_data; + FetchObjectData *fetch_data = user_data; + OtPullData *pull_data = fetch_data->pull_data; GError *local_error = NULL; GError **error = &local_error; + OstreeObjectType objtype; + const char *expected_checksum; ot_lfree guchar *csum = NULL; ot_lfree char *checksum = NULL; @@ -543,27 +583,30 @@ content_fetch_on_stage_complete (GObject *object, checksum = ostree_checksum_from_bytes (csum); - g_assert (strcmp (checksum, data->checksum) == 0); + ostree_object_name_deserialize (fetch_data->object, &expected_checksum, &objtype); + g_assert (objtype == OSTREE_OBJECT_TYPE_FILE); - data->pull_data->n_fetched_content++; + g_debug ("stage of %s complete", ostree_object_to_string (checksum, objtype)); + + g_assert (strcmp (checksum, expected_checksum) == 0); + + pull_data->n_fetched_content++; out: - data->pull_data->outstanding_content_stage_requests--; - check_outstanding_requests_handle_error (data->pull_data, local_error); - destroy_fetch_one_content_item_data (data); + pull_data->n_outstanding_content_stage_requests--; + check_outstanding_requests_handle_error (pull_data, local_error); + (void) gs_file_unlink (fetch_data->temp_path, NULL, NULL); + g_object_unref (fetch_data->temp_path); + g_variant_unref (fetch_data->object); + g_free (fetch_data); } -static void -content_fetch_on_complete (GObject *object, - GAsyncResult *result, - gpointer user_data); - - static void content_fetch_on_complete (GObject *object, GAsyncResult *result, gpointer user_data) { - OtFetchOneContentItemData *data = user_data; + FetchObjectData *fetch_data = user_data; + OtPullData *pull_data = fetch_data->pull_data; GError *local_error = NULL; GError **error = &local_error; GCancellable *cancellable = NULL; @@ -575,13 +618,19 @@ content_fetch_on_complete (GObject *object, ot_lvariant GVariant *xattrs = NULL; ot_lobj GInputStream *file_in = NULL; ot_lobj GInputStream *object_input = NULL; + const char *checksum; + OstreeObjectType objtype; - data->content_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error); - if (!data->content_path) + fetch_data->temp_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error); + if (!fetch_data->temp_path) goto out; - g_assert (data->content_path != NULL); - if (!ostree_content_file_parse (TRUE, data->content_path, FALSE, + ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype); + g_assert (objtype == OSTREE_OBJECT_TYPE_FILE); + + g_debug ("fetch of %s complete", ostree_object_to_string (checksum, objtype)); + + if (!ostree_content_file_parse (TRUE, fetch_data->temp_path, FALSE, &file_in, &file_info, &xattrs, cancellable, error)) goto out; @@ -591,62 +640,56 @@ content_fetch_on_complete (GObject *object, cancellable, error)) goto out; - data->pull_data->outstanding_content_stage_requests++; - ostree_repo_stage_content_async (data->pull_data->repo, data->checksum, + pull_data->n_outstanding_content_stage_requests++; + ostree_repo_stage_content_async (pull_data->repo, checksum, object_input, length, cancellable, - content_fetch_on_stage_complete, data); + content_fetch_on_stage_complete, fetch_data); out: - data->pull_data->outstanding_filecontent_requests--; - check_outstanding_requests_handle_error (data->pull_data, local_error); + pull_data->n_outstanding_content_fetches--; + check_outstanding_requests_handle_error (pull_data, local_error); } -static gboolean -idle_queue_content_request (gpointer user_data) -{ - OtFetchOneContentItemData *data = user_data; - OtPullData *pull_data = data->pull_data; - const char *checksum = data->checksum; - ot_lfree char *objpath = NULL; - SoupURI *obj_uri = NULL; - - objpath = ostree_get_relative_object_path (checksum, OSTREE_OBJECT_TYPE_FILE, TRUE); - obj_uri = suburi_new (pull_data->base_uri, objpath, NULL); - - ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable, - content_fetch_on_complete, data); - soup_uri_free (obj_uri); - - pull_data->outstanding_filecontent_requests++; - - return FALSE; -} - -typedef struct { - OtPullData *pull_data; - GVariant *object; - GFile *temp_path; -} IdleFetchMetadataObjectData; - static void on_metadata_staged (GObject *object, GAsyncResult *result, gpointer user_data) { - IdleFetchMetadataObjectData *fetch_data = user_data; + FetchObjectData *fetch_data = user_data; OtPullData *pull_data = fetch_data->pull_data; + GError *local_error = NULL; + GError **error = &local_error; + const char *expected_checksum; + OstreeObjectType objtype; + gs_free char *checksum = NULL; + gs_free guchar *csum = NULL; - pull_data->n_fetched_metadata++; - pull_data->n_outstanding_metadata_fetches--; + if (!ostree_repo_stage_metadata_finish ((OstreeRepo*)object, result, + &csum, error)) + goto out; - ot_worker_queue_push (pull_data->metadata_objects_to_scan, - g_variant_ref (fetch_data->object)); + checksum = ostree_checksum_from_bytes (csum); + ostree_object_name_deserialize (fetch_data->object, &expected_checksum, &objtype); + g_assert (OSTREE_OBJECT_TYPE_IS_META (objtype)); + + g_debug ("stage of %s complete", ostree_object_to_string (checksum, objtype)); + + g_assert (strcmp (checksum, expected_checksum) == 0); + + pull_data->metadata_scan_idle = FALSE; + ot_waitable_queue_push (pull_data->metadata_objects_to_scan, + pull_worker_message_new (PULL_MSG_SCAN, + g_variant_ref (fetch_data->object))); + out: + pull_data->n_outstanding_metadata_stage_requests--; (void) gs_file_unlink (fetch_data->temp_path, NULL, NULL); g_object_unref (fetch_data->temp_path); g_variant_unref (fetch_data->object); g_free (fetch_data); + + check_outstanding_requests_handle_error (pull_data, local_error); } static void @@ -654,7 +697,7 @@ meta_fetch_on_complete (GObject *object, GAsyncResult *result, gpointer user_data) { - IdleFetchMetadataObjectData *fetch_data = user_data; + FetchObjectData *fetch_data = user_data; OtPullData *pull_data = fetch_data->pull_data; ot_lvariant GVariant *metadata = NULL; const char *checksum; @@ -668,6 +711,8 @@ meta_fetch_on_complete (GObject *object, ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype); + g_debug ("fetch of %s complete", ostree_object_to_string (checksum, objtype)); + if (!ot_util_variant_map (fetch_data->temp_path, ostree_metadata_variant_type (objtype), FALSE, &metadata, error)) goto out; @@ -676,7 +721,10 @@ meta_fetch_on_complete (GObject *object, pull_data->cancellable, on_metadata_staged, fetch_data); + pull_data->n_outstanding_metadata_stage_requests++; out: + pull_data->n_outstanding_metadata_fetches--; + pull_data->n_fetched_metadata++; throw_async_error (pull_data, local_error); if (local_error) { @@ -685,44 +733,6 @@ meta_fetch_on_complete (GObject *object, } } -static gboolean -idle_fetch_metadata_object (gpointer data) -{ - IdleFetchMetadataObjectData *fetch_data = data; - OtPullData *pull_data = fetch_data->pull_data; - ot_lfree char *objpath = NULL; - const char *checksum; - OstreeObjectType objtype; - SoupURI *obj_uri = NULL; - - ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype); - - objpath = ostree_get_relative_object_path (checksum, objtype, TRUE); - obj_uri = suburi_new (pull_data->base_uri, objpath, NULL); - - pull_data->n_outstanding_metadata_fetches++; - ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable, - meta_fetch_on_complete, fetch_data); - soup_uri_free (obj_uri); - - return FALSE; -} - -/** - * queue_metadata_object_fetch: - * - * Pass a request to the main thread to fetch a metadata object. - */ -static void -queue_metadata_object_fetch (OtPullData *pull_data, - GVariant *object) -{ - IdleFetchMetadataObjectData *fetch_data = g_new (IdleFetchMetadataObjectData, 1); - fetch_data->pull_data = pull_data; - fetch_data->object = g_variant_ref (object); - g_idle_add (idle_fetch_metadata_object, fetch_data); -} - static gboolean scan_commit_object (OtPullData *pull_data, const char *checksum, @@ -798,6 +808,7 @@ scan_one_metadata_object (OtPullData *pull_data, gboolean ret = FALSE; ot_lvariant GVariant *object = NULL; ot_lfree char *tmp_checksum = NULL; + gboolean is_requested; gboolean is_stored; tmp_checksum = ostree_checksum_from_bytes (csum); @@ -806,16 +817,21 @@ scan_one_metadata_object (OtPullData *pull_data, if (g_hash_table_lookup (pull_data->scanned_metadata, object)) return TRUE; + is_requested = g_hash_table_lookup (pull_data->requested_metadata, tmp_checksum) != NULL; if (!ostree_repo_has_object (pull_data->repo, objtype, tmp_checksum, &is_stored, cancellable, error)) goto out; - - if (!is_stored) + + if (!is_stored && !is_requested) { - g_atomic_int_inc (&pull_data->n_requested_metadata); - queue_metadata_object_fetch (pull_data, object); + char *duped_checksum = g_strdup (tmp_checksum); + g_hash_table_insert (pull_data->requested_metadata, duped_checksum, duped_checksum); + + ot_waitable_queue_push (pull_data->metadata_objects_to_fetch, + pull_worker_message_new (PULL_MSG_FETCH, + g_variant_ref (object))); } - else + else if (is_stored) { switch (objtype) { @@ -837,8 +853,6 @@ scan_one_metadata_object (OtPullData *pull_data, } g_hash_table_insert (pull_data->scanned_metadata, g_variant_ref (object), object); g_atomic_int_inc (&pull_data->n_scanned_metadata); - - g_idle_add (idle_check_outstanding_requests, pull_data); } ret = TRUE; @@ -879,28 +893,48 @@ idle_throw_error (gpointer user_data) return FALSE; } -/** - * scan_one_metadata_object_dispatch: - * - * Called from the metadatascan worker thread. If we're missing an - * object from one of them, we queue a request to the main thread to - * fetch it. When it's fetched, we get passed the object back and - * scan it. - */ -static void -scan_one_metadata_object_dispatch (gpointer item, - gpointer user_data) +static gboolean +on_metadata_objects_to_scan_ready (gint fd, + GIOCondition condition, + gpointer user_data) { OtPullData *pull_data = user_data; + PullWorkerMessage *msg; + PullWorkerMessage *last_idle_msg = NULL; GError *local_error = NULL; GError **error = &local_error; - ot_lvariant GVariant *v_item = NULL; - v_item = item; - - if (!scan_one_metadata_object_v_name (pull_data, v_item, - pull_data->cancellable, error)) - goto out; + while (ot_waitable_queue_pop (pull_data->metadata_objects_to_scan, (gpointer*)&msg)) + { + if (msg->t == PULL_MSG_SCAN) + { + if (!scan_one_metadata_object_v_name (pull_data, msg->d.item, + pull_data->cancellable, error)) + goto out; + g_variant_unref (msg->d.item); + g_free (msg); + } + else if (msg->t == PULL_MSG_MAIN_IDLE) + { + g_free (last_idle_msg); + last_idle_msg = msg; + } + else if (msg->t == PULL_MSG_QUIT) + { + g_free (msg); + g_main_loop_quit (pull_data->metadata_thread_loop); + } + else + g_assert_not_reached (); + } + + if (last_idle_msg) + ot_waitable_queue_push (pull_data->metadata_objects_to_fetch, + last_idle_msg); + + /* When we have no queue to process, notify the main thread */ + ot_waitable_queue_push (pull_data->metadata_objects_to_fetch, + pull_worker_message_new (PULL_MSG_SCAN_IDLE, GUINT_TO_POINTER (0))); out: if (local_error) @@ -910,17 +944,107 @@ scan_one_metadata_object_dispatch (gpointer item, throwdata->error = local_error; g_main_context_invoke (NULL, idle_throw_error, throwdata); } + return TRUE; } - -static gboolean -idle_start_worker (gpointer user_data) +/** + * metadata_thread_main: + * + * Called from the metadatascan worker thread. If we're missing an + * object from one of them, we queue a request to the main thread to + * fetch it. When it's fetched, we get passed the object back and + * scan it. + */ +static gpointer +metadata_thread_main (gpointer user_data) { OtPullData *pull_data = user_data; + GSource *src; - ot_worker_queue_start (pull_data->metadata_objects_to_scan); + pull_data->metadata_thread_context = g_main_context_new (); + pull_data->metadata_thread_loop = g_main_loop_new (pull_data->metadata_thread_context, TRUE); - return FALSE; + src = ot_waitable_queue_create_source (pull_data->metadata_objects_to_scan); + g_source_set_callback (src, (GSourceFunc)on_metadata_objects_to_scan_ready, pull_data, NULL); + g_source_attach (src, pull_data->metadata_thread_context); + g_source_unref (src); + + g_main_loop_run (pull_data->metadata_thread_loop); + return NULL; +} + +static gboolean +on_metadata_objects_to_fetch_ready (gint fd, + GIOCondition condition, + gpointer user_data) +{ + OtPullData *pull_data = user_data; + PullWorkerMessage *msg; + + if (!ot_waitable_queue_pop (pull_data->metadata_objects_to_fetch, (gpointer*)&msg)) + goto out; + + if (msg->t == PULL_MSG_MAIN_IDLE) + { + if (msg->d.idle_serial == pull_data->idle_serial) + { + g_assert (!pull_data->metadata_scan_idle); + pull_data->metadata_scan_idle = TRUE; + g_debug ("pull: metadata scan is idle"); + } + } + else if (msg->t == PULL_MSG_SCAN_IDLE) + { + if (!pull_data->metadata_scan_idle) + { + g_debug ("pull: queue MAIN_IDLE"); + pull_data->idle_serial++; + ot_waitable_queue_push (pull_data->metadata_objects_to_scan, + pull_worker_message_new (PULL_MSG_MAIN_IDLE, GUINT_TO_POINTER (pull_data->idle_serial))); + } + } + else if (msg->t == PULL_MSG_FETCH) + { + const char *checksum; + gs_free char *objpath = NULL; + OstreeObjectType objtype; + SoupURI *obj_uri = NULL; + gboolean is_meta; + FetchObjectData *fetch_data; + + ostree_object_name_deserialize (msg->d.item, &checksum, &objtype); + objpath = ostree_get_relative_object_path (checksum, objtype, TRUE); + obj_uri = suburi_new (pull_data->base_uri, objpath, NULL); + + is_meta = OSTREE_OBJECT_TYPE_IS_META (objtype); + if (is_meta) + { + pull_data->n_outstanding_metadata_fetches++; + pull_data->n_requested_metadata++; + } + else + { + pull_data->n_outstanding_content_fetches++; + pull_data->n_requested_content++; + } + fetch_data = g_new (FetchObjectData, 1); + fetch_data->pull_data = pull_data; + fetch_data->object = g_variant_ref (msg->d.item); + ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable, + is_meta ? meta_fetch_on_complete : content_fetch_on_complete, fetch_data); + soup_uri_free (obj_uri); + g_variant_unref (msg->d.item); + } + else + { + g_assert_not_reached (); + } + g_free (msg); + + out: + check_outstanding_requests_handle_error (pull_data, NULL); + + return TRUE; } static gboolean @@ -1097,6 +1221,8 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) (GDestroyNotify)g_variant_unref, NULL); pull_data->requested_content = g_hash_table_new_full (g_str_hash, g_str_equal, (GDestroyNotify)g_free, NULL); + pull_data->requested_metadata = g_hash_table_new_full (g_str_hash, g_str_equal, + (GDestroyNotify)g_free, NULL); if (argc < 2) { @@ -1223,19 +1349,18 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) if (!ostree_repo_prepare_transaction (pull_data->repo, FALSE, NULL, error)) goto out; - pull_data->metadata_scan_active = TRUE; - - pull_data->metadata_objects_to_scan = ot_worker_queue_new ("metadatascan", - scan_one_metadata_object_dispatch, - pull_data); + pull_data->metadata_objects_to_fetch = ot_waitable_queue_new (); + pull_data->metadata_objects_to_scan = ot_waitable_queue_new (); + pull_data->metadata_thread = g_thread_new ("metadatascan", metadata_thread_main, pull_data); g_hash_table_iter_init (&hash_iter, commits_to_fetch); while (g_hash_table_iter_next (&hash_iter, &key, &value)) { const char *commit = value; - ot_worker_queue_push (pull_data->metadata_objects_to_scan, - ostree_object_name_serialize (commit, OSTREE_OBJECT_TYPE_COMMIT)); + ot_waitable_queue_push (pull_data->metadata_objects_to_scan, + pull_worker_message_new (PULL_MSG_SCAN, + ostree_object_name_serialize (commit, OSTREE_OBJECT_TYPE_COMMIT))); } g_hash_table_iter_init (&hash_iter, requested_refs_to_fetch); @@ -1259,23 +1384,28 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) } else { - ot_worker_queue_push (pull_data->metadata_objects_to_scan, - ostree_object_name_serialize (sha256, OSTREE_OBJECT_TYPE_COMMIT)); + ot_waitable_queue_push (pull_data->metadata_objects_to_scan, + pull_worker_message_new (PULL_MSG_SCAN, + ostree_object_name_serialize (sha256, OSTREE_OBJECT_TYPE_COMMIT))); g_hash_table_insert (updated_refs, g_strdup (ref), g_strdup (sha256)); } } - /* Start metadata thread, which kicks off further metadata requests - * as well as content fetches. - */ - if (!ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan)) - { - g_idle_add (idle_start_worker, pull_data); + { + GSource *src = ot_waitable_queue_create_source (pull_data->metadata_objects_to_fetch); + g_source_set_callback (src, (GSourceFunc)on_metadata_objects_to_fetch_ready, pull_data, NULL); + g_source_attach (src, NULL); + g_source_unref (src); + } - /* Now await work completion */ - if (!run_mainloop_monitor_fetcher (pull_data)) - goto out; - } + /* Prime the message queue */ + pull_data->idle_serial++; + ot_waitable_queue_push (pull_data->metadata_objects_to_scan, + pull_worker_message_new (PULL_MSG_MAIN_IDLE, GUINT_TO_POINTER (pull_data->idle_serial))); + + /* Now await work completion */ + if (!run_mainloop_monitor_fetcher (pull_data)) + goto out; if (!ostree_repo_commit_transaction (pull_data->repo, cancellable, error)) goto out; @@ -1323,9 +1453,17 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error) g_free (pull_data->remote_name); if (pull_data->base_uri) soup_uri_free (pull_data->base_uri); - g_clear_pointer (&pull_data->metadata_objects_to_scan, (GDestroyNotify) ot_worker_queue_unref); + if (pull_data->metadata_thread) + { + ot_waitable_queue_push (pull_data->metadata_objects_to_scan, + pull_worker_message_new (PULL_MSG_QUIT, NULL)); + g_thread_join (pull_data->metadata_thread); + } + g_clear_pointer (&pull_data->metadata_objects_to_scan, (GDestroyNotify) ot_waitable_queue_unref); + g_clear_pointer (&pull_data->metadata_objects_to_fetch, (GDestroyNotify) ot_waitable_queue_unref); g_clear_pointer (&pull_data->scanned_metadata, (GDestroyNotify) g_hash_table_unref); g_clear_pointer (&pull_data->requested_content, (GDestroyNotify) g_hash_table_unref); + g_clear_pointer (&pull_data->requested_metadata, (GDestroyNotify) g_hash_table_unref); g_clear_pointer (&remote_config, (GDestroyNotify) g_key_file_unref); if (summary_uri) soup_uri_free (summary_uri);