pull: Rework threading communication model

Previously, I've observed bugs where we either:
1) Exit too early, leaving undownloaded objects
2) Hang while downloading

This rewrite hopefully fixes both.
This commit is contained in:
Colin Walters 2013-03-29 17:16:03 -04:00
parent a268b53dc9
commit bac4d7a0d2
6 changed files with 499 additions and 384 deletions

View File

@ -35,8 +35,8 @@ libotutil_la_SOURCES = \
src/libotutil/ot-spawn-utils.h \ src/libotutil/ot-spawn-utils.h \
src/libotutil/ot-variant-utils.c \ src/libotutil/ot-variant-utils.c \
src/libotutil/ot-variant-utils.h \ src/libotutil/ot-variant-utils.h \
src/libotutil/ot-worker-queue.c \ src/libotutil/ot-waitable-queue.c \
src/libotutil/ot-worker-queue.h \ src/libotutil/ot-waitable-queue.h \
src/libotutil/ot-gio-utils.c \ src/libotutil/ot-gio-utils.c \
src/libotutil/ot-gio-utils.h \ src/libotutil/ot-gio-utils.h \
src/libotutil/otutil.c \ src/libotutil/otutil.c \

View File

@ -0,0 +1,120 @@
/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*-
*
* Copyright (C) 2012 Colin Walters <walters@verbum.org>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*
* Author: Colin Walters <walters@verbum.org>
*/
#include "config.h"
#include "otutil.h"
#include <glib-unix.h>
#include <string.h>
#include <sys/eventfd.h>
#include <errno.h>
#include <unistd.h>
struct OtWaitableQueue {
volatile gint refcount;
GMutex mutex;
int fd;
gboolean read_empty;
GQueue queue;
};
OtWaitableQueue *
ot_waitable_queue_new (void)
{
OtWaitableQueue *queue = g_new0 (OtWaitableQueue, 1);
queue->refcount = 1;
g_mutex_init (&queue->mutex);
g_queue_init (&queue->queue);
queue->fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK);
g_assert (queue->fd >= 0);
return queue;
}
void
ot_waitable_queue_push (OtWaitableQueue *queue,
gpointer data)
{
const guint64 val = 1;
int rval;
g_mutex_lock (&queue->mutex);
g_queue_push_head (&queue->queue, data);
do
rval = write (queue->fd, &val, sizeof (val));
while (G_UNLIKELY (rval == -1 && errno == EINTR));
queue->read_empty = FALSE;
g_mutex_unlock (&queue->mutex);
}
gboolean
ot_waitable_queue_pop (OtWaitableQueue *queue,
gpointer *out_data)
{
gpointer ret = NULL;
gboolean empty = TRUE;
int rval;
guint64 val;
g_mutex_lock (&queue->mutex);
if (g_queue_peek_tail_link (&queue->queue) != NULL)
{
ret = g_queue_pop_tail (&queue->queue);
empty = FALSE;
}
else if (!queue->read_empty)
{
do
rval = read (queue->fd, &val, sizeof (val));
while (G_UNLIKELY (rval == -1 && errno == EINTR));
queue->read_empty = TRUE;
}
g_mutex_unlock (&queue->mutex);
*out_data = ret;
return !empty;
}
void
ot_waitable_queue_ref (OtWaitableQueue *queue)
{
g_atomic_int_inc (&queue->refcount);
}
void
ot_waitable_queue_unref (OtWaitableQueue *queue)
{
if (!g_atomic_int_dec_and_test (&queue->refcount))
return;
g_mutex_clear (&queue->mutex);
g_queue_clear (&queue->queue);
(void) close (queue->fd);
g_free (queue);
}
GSource *
ot_waitable_queue_create_source (OtWaitableQueue *queue)
{
return g_unix_fd_source_new (queue->fd, G_IO_IN);
}

View File

@ -20,30 +20,27 @@
* Author: Colin Walters <walters@verbum.org> * Author: Colin Walters <walters@verbum.org>
*/ */
#ifndef __OSTREE_WORKER_QUEUE_H__ #ifndef __OSTREE_WAITABLE_QUEUE_H__
#define __OSTREE_WORKER_QUEUE_H__ #define __OSTREE_WAITABLE_QUEUE_H__
#include <gio/gio.h> #include <gio/gio.h>
G_BEGIN_DECLS G_BEGIN_DECLS
typedef struct OtWorkerQueue OtWorkerQueue; typedef struct OtWaitableQueue OtWaitableQueue;
typedef void (*OtWorkerQueueFunc) (gpointer data, OtWaitableQueue *ot_waitable_queue_new (void);
gpointer user_data);
OtWorkerQueue *ot_worker_queue_new (const char *thread_name, void ot_waitable_queue_push (OtWaitableQueue *queue,
OtWorkerQueueFunc func, gpointer data);
gpointer data);
void ot_worker_queue_start (OtWorkerQueue *queue); GSource *ot_waitable_queue_create_source (OtWaitableQueue *queue);
gboolean ot_worker_queue_is_idle (OtWorkerQueue *queue); gboolean ot_waitable_queue_pop (OtWaitableQueue *queue,
gpointer *out_val);
void ot_worker_queue_push (OtWorkerQueue *queue, void ot_waitable_queue_ref (OtWaitableQueue *queue);
gpointer data); void ot_waitable_queue_unref (OtWaitableQueue *queue);
void ot_worker_queue_unref (OtWorkerQueue *queue);
G_END_DECLS G_END_DECLS

View File

@ -1,140 +0,0 @@
/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*-
*
* Copyright (C) 2012 Colin Walters <walters@verbum.org>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*
* Author: Colin Walters <walters@verbum.org>
*/
#include "config.h"
#include "otutil.h"
#include <string.h>
struct OtWorkerQueue {
GMutex mutex;
GCond cond;
GQueue queue;
char *thread_name;
gboolean complete;
gboolean is_idle;
gboolean destroyed;
GThread *worker;
OtWorkerQueueFunc work_func;
OtWorkerQueueFunc work_data;
};
static gpointer
ot_worker_queue_thread_main (gpointer user_data);
OtWorkerQueue *
ot_worker_queue_new (const char *thread_name,
OtWorkerQueueFunc func,
gpointer data)
{
OtWorkerQueue *queue = g_new0 (OtWorkerQueue, 1);
g_mutex_init (&queue->mutex);
g_cond_init (&queue->cond);
g_queue_init (&queue->queue);
queue->is_idle = TRUE;
queue->thread_name = g_strdup (thread_name);
queue->work_func = func;
queue->work_data = data;
return queue;
}
void
ot_worker_queue_start (OtWorkerQueue *queue)
{
queue->worker = g_thread_new (queue->thread_name, ot_worker_queue_thread_main, queue);
}
void
ot_worker_queue_push (OtWorkerQueue *queue,
gpointer data)
{
g_mutex_lock (&queue->mutex);
g_queue_push_head (&queue->queue, data);
queue->is_idle = FALSE;
g_cond_signal (&queue->cond);
g_mutex_unlock (&queue->mutex);
}
static gpointer
ot_worker_queue_thread_main (gpointer user_data)
{
OtWorkerQueue *queue = user_data;
while (TRUE)
{
gpointer item;
g_mutex_lock (&queue->mutex);
while (!g_queue_peek_tail_link (&queue->queue))
{
queue->is_idle = TRUE;
g_cond_wait (&queue->cond, &queue->mutex);
}
item = g_queue_pop_tail (&queue->queue);
g_mutex_unlock (&queue->mutex);
if (!item)
break;
queue->work_func (item, queue->work_data);
}
return NULL;
}
gboolean
ot_worker_queue_is_idle (OtWorkerQueue *queue)
{
gboolean ret;
g_mutex_lock (&queue->mutex);
ret = queue->is_idle;
g_mutex_unlock (&queue->mutex);
return ret;
}
void
ot_worker_queue_unref (OtWorkerQueue *queue)
{
if (queue->worker)
{
ot_worker_queue_push (queue, NULL);
g_thread_join (queue->worker);
}
g_free (queue->thread_name);
g_mutex_clear (&queue->mutex);
g_cond_clear (&queue->cond);
g_queue_clear (&queue->queue);
g_free (queue);
}

View File

@ -37,7 +37,7 @@
} \ } \
} G_STMT_END; } G_STMT_END;
#include <ot-worker-queue.h> #include <ot-waitable-queue.h>
#include <ot-local-alloc.h> #include <ot-local-alloc.h>
#include <ot-keyfile-utils.h> #include <ot-keyfile-utils.h>
#include <ot-gio-utils.h> #include <ot-gio-utils.h>

View File

@ -83,6 +83,20 @@ static GOptionEntry options[] = {
{ NULL }, { NULL },
}; };
typedef struct {
enum {
PULL_MSG_SCAN_IDLE,
PULL_MSG_MAIN_IDLE,
PULL_MSG_FETCH,
PULL_MSG_SCAN,
PULL_MSG_QUIT
} t;
union {
guint idle_serial;
GVariant *item;
} d;
} PullWorkerMessage;
typedef struct { typedef struct {
OstreeRepo *repo; OstreeRepo *repo;
char *remote_name; char *remote_name;
@ -93,21 +107,27 @@ typedef struct {
GMainLoop *loop; GMainLoop *loop;
GCancellable *cancellable; GCancellable *cancellable;
gboolean metadata_scan_active;
volatile gint n_scanned_metadata; volatile gint n_scanned_metadata;
volatile gint n_requested_metadata;
volatile gint n_requested_content;
guint n_fetched_metadata;
guint outstanding_uri_requests; guint outstanding_uri_requests;
OtWorkerQueue *metadata_objects_to_scan; GThread *metadata_thread;
GHashTable *scanned_metadata; /* Maps object name to itself */ GMainContext *metadata_thread_context;
GHashTable *requested_content; /* Maps object name to itself */ GMainLoop *metadata_thread_loop;
guint n_outstanding_metadata_fetches; OtWaitableQueue *metadata_objects_to_scan;
OtWaitableQueue *metadata_objects_to_fetch;
guint n_fetched_content; GHashTable *scanned_metadata; /* Maps object name to itself */
guint outstanding_filecontent_requests; GHashTable *requested_metadata; /* Maps object name to itself */
guint outstanding_content_stage_requests; GHashTable *requested_content; /* Maps object name to itself */
guint metadata_scan_idle : 1; /* TRUE if we passed through an idle message */
guint idle_serial; /* Incremented when we get a SCAN_IDLE message */
guint n_outstanding_metadata_fetches;
guint n_outstanding_metadata_stage_requests;
guint n_outstanding_content_fetches;
guint n_outstanding_content_stage_requests;
gint n_requested_metadata;
gint n_requested_content;
guint n_fetched_metadata;
guint n_fetched_content;
gboolean have_previous_bytes; gboolean have_previous_bytes;
guint64 previous_bytes_sec; guint64 previous_bytes_sec;
@ -118,15 +138,10 @@ typedef struct {
} OtPullData; } OtPullData;
typedef struct { typedef struct {
OtPullData *pull_data; OtPullData *pull_data;
GVariant *object;
gboolean fetching_content; GFile *temp_path;
} FetchObjectData;
GFile *meta_path;
GFile *content_path;
char *checksum;
} OtFetchOneContentItemData;
static SoupURI * static SoupURI *
suburi_new (SoupURI *base, suburi_new (SoupURI *base,
@ -182,46 +197,56 @@ static gboolean
uri_fetch_update_status (gpointer user_data) uri_fetch_update_status (gpointer user_data)
{ {
OtPullData *pull_data = user_data; OtPullData *pull_data = user_data;
ot_lfree char *fetcher_status; ot_lfree char *fetcher_status = NULL;
GString *status; GString *status;
guint64 current_bytes_transferred; guint64 current_bytes_transferred;
guint64 current_delta_bytes_transferred; guint64 current_delta_bytes_transferred;
guint64 delta_bytes_transferred; guint64 delta_bytes_transferred;
guint outstanding_stages;
guint outstanding_fetches;
status = g_string_new (""); status = g_string_new ("");
if (pull_data->metadata_scan_active) if (!pull_data->metadata_scan_idle)
g_string_append_printf (status, "scan: %u metadata; ", g_string_append_printf (status, "scan: %u metadata; ",
g_atomic_int_get (&pull_data->n_scanned_metadata)); g_atomic_int_get (&pull_data->n_scanned_metadata));
g_string_append_printf (status, "fetch: %u/%u metadata %u/%u content; ", outstanding_stages = pull_data->n_outstanding_content_stage_requests + pull_data->n_outstanding_metadata_stage_requests;
g_atomic_int_get (&pull_data->n_fetched_metadata), if (outstanding_stages > 0)
g_atomic_int_get (&pull_data->n_requested_metadata), g_string_append_printf (status, "writing: %u objects; ", outstanding_stages);
pull_data->n_fetched_content,
g_atomic_int_get (&pull_data->n_requested_content));
current_bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher); outstanding_fetches = pull_data->n_outstanding_content_fetches + pull_data->n_outstanding_metadata_fetches;
current_delta_bytes_transferred = current_bytes_transferred - pull_data->previous_total_downloaded; if (outstanding_fetches)
if (pull_data->have_previous_bytes)
delta_bytes_transferred = (guint64)(0.5 * current_delta_bytes_transferred + 0.5 * pull_data->previous_bytes_sec);
else
{ {
pull_data->have_previous_bytes = TRUE; g_string_append_printf (status, "fetch: %u/%u metadata %u/%u content; ",
delta_bytes_transferred = current_delta_bytes_transferred; pull_data->n_fetched_metadata,
pull_data->n_requested_metadata,
pull_data->n_fetched_content,
pull_data->n_requested_content);
current_bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher);
current_delta_bytes_transferred = current_bytes_transferred - pull_data->previous_total_downloaded;
if (pull_data->have_previous_bytes)
delta_bytes_transferred = (guint64)(0.5 * current_delta_bytes_transferred + 0.5 * pull_data->previous_bytes_sec);
else
{
pull_data->have_previous_bytes = TRUE;
delta_bytes_transferred = current_delta_bytes_transferred;
}
pull_data->previous_bytes_sec = delta_bytes_transferred;
pull_data->previous_total_downloaded = current_bytes_transferred;
if (delta_bytes_transferred < 1024)
g_string_append_printf (status, "%u B/s; ",
(guint)delta_bytes_transferred);
else
g_string_append_printf (status, "%.1f KiB/s; ",
(double)delta_bytes_transferred / 1024);
fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher);
g_string_append (status, fetcher_status);
} }
pull_data->previous_bytes_sec = delta_bytes_transferred;
pull_data->previous_total_downloaded = current_bytes_transferred;
if (delta_bytes_transferred < 1024)
g_string_append_printf (status, "%u B/s; ",
(guint)delta_bytes_transferred);
else
g_string_append_printf (status, "%.1f KiB/s; ",
(double)delta_bytes_transferred / 1024);
fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher);
g_string_append (status, fetcher_status);
gs_console_begin_status_line (gs_console_get (), status->str, NULL, NULL); gs_console_begin_status_line (gs_console_get (), status->str, NULL, NULL);
@ -230,6 +255,27 @@ uri_fetch_update_status (gpointer user_data)
return TRUE; return TRUE;
} }
static PullWorkerMessage *
pull_worker_message_new (int msgtype, gpointer data)
{
PullWorkerMessage *msg = g_new (PullWorkerMessage, 1);
msg->t = msgtype;
switch (msgtype)
{
case PULL_MSG_SCAN_IDLE:
case PULL_MSG_MAIN_IDLE:
msg->d.idle_serial = GPOINTER_TO_UINT (data);
break;
case PULL_MSG_SCAN:
case PULL_MSG_FETCH:
msg->d.item = data;
break;
case PULL_MSG_QUIT:
break;
}
return msg;
}
static void static void
throw_async_error (OtPullData *pull_data, throw_async_error (OtPullData *pull_data,
GError *error) GError *error)
@ -253,12 +299,26 @@ static void
check_outstanding_requests_handle_error (OtPullData *pull_data, check_outstanding_requests_handle_error (OtPullData *pull_data,
GError *error) GError *error)
{ {
if ((!pull_data->metadata_objects_to_scan || ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan)) && gboolean current_fetch_idle = (pull_data->n_outstanding_metadata_fetches == 0 &&
pull_data->outstanding_uri_requests == 0 && pull_data->n_outstanding_content_fetches == 0);
pull_data->outstanding_filecontent_requests == 0 && gboolean current_stage_idle = (pull_data->n_outstanding_metadata_stage_requests == 0 &&
pull_data->n_outstanding_metadata_fetches == 0 && pull_data->n_outstanding_content_stage_requests == 0);
pull_data->outstanding_content_stage_requests == 0)
g_main_loop_quit (pull_data->loop); g_debug ("pull: scan: %u fetching: %u staging: %u",
!pull_data->metadata_scan_idle, !current_fetch_idle, !current_stage_idle);
/* This is true in the phase when we're fetching refs */
if (pull_data->metadata_objects_to_scan == NULL)
{
if (pull_data->outstanding_uri_requests == 0)
g_main_loop_quit (pull_data->loop);
return;
}
else if (pull_data->metadata_scan_idle && current_fetch_idle && current_stage_idle)
{
g_main_loop_quit (pull_data->loop);
}
throw_async_error (pull_data, error); throw_async_error (pull_data, error);
} }
@ -287,6 +347,7 @@ run_mainloop_monitor_fetcher (OtPullData *pull_data)
g_source_unref (update_timeout); g_source_unref (update_timeout);
} }
g_idle_add (idle_check_outstanding_requests, pull_data);
g_main_loop_run (pull_data->loop); g_main_loop_run (pull_data->loop);
if (console) if (console)
@ -337,7 +398,6 @@ fetch_uri (OtPullData *pull_data,
fetch_data.pull_data = pull_data; fetch_data.pull_data = pull_data;
uri_string = soup_uri_to_string (uri, FALSE); uri_string = soup_uri_to_string (uri, FALSE);
g_print ("Fetching %s\n", uri_string);
pull_data->outstanding_uri_requests++; pull_data->outstanding_uri_requests++;
ostree_fetcher_request_uri_async (pull_data->fetcher, uri, cancellable, ostree_fetcher_request_uri_async (pull_data->fetcher, uri, cancellable,
@ -385,9 +445,6 @@ fetch_uri_contents_utf8 (OtPullData *pull_data,
return ret; return ret;
} }
static gboolean
idle_queue_content_request (gpointer user_data);
static gboolean static gboolean
scan_dirtree_object (OtPullData *pull_data, scan_dirtree_object (OtPullData *pull_data,
const char *checksum, const char *checksum,
@ -422,7 +479,6 @@ scan_dirtree_object (OtPullData *pull_data,
{ {
const char *filename; const char *filename;
gboolean file_is_stored; gboolean file_is_stored;
OtFetchOneContentItemData *idle_fetch_data;
ot_lvariant GVariant *csum = NULL; ot_lvariant GVariant *csum = NULL;
ot_lfree char *file_checksum; ot_lfree char *file_checksum;
@ -439,18 +495,12 @@ scan_dirtree_object (OtPullData *pull_data,
if (!file_is_stored && !g_hash_table_lookup (pull_data->requested_content, file_checksum)) if (!file_is_stored && !g_hash_table_lookup (pull_data->requested_content, file_checksum))
{ {
char *duped_checksum; g_hash_table_insert (pull_data->requested_content, file_checksum, file_checksum);
idle_fetch_data = g_new0 (OtFetchOneContentItemData, 1); ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
idle_fetch_data->pull_data = pull_data; pull_worker_message_new (PULL_MSG_FETCH,
idle_fetch_data->checksum = file_checksum; ostree_object_name_serialize (file_checksum, OSTREE_OBJECT_TYPE_FILE)));
file_checksum = NULL; /* Transfer ownership */ file_checksum = NULL; /* Transfer ownership to hash */
duped_checksum = g_strdup (idle_fetch_data->checksum);
g_hash_table_insert (pull_data->requested_content, duped_checksum, duped_checksum);
g_atomic_int_inc (&pull_data->n_requested_content);
g_main_context_invoke (NULL, idle_queue_content_request, idle_fetch_data);
} }
} }
@ -513,27 +563,17 @@ fetch_ref_contents (OtPullData *pull_data,
return ret; return ret;
} }
static void
destroy_fetch_one_content_item_data (OtFetchOneContentItemData *data)
{
if (data->meta_path)
(void) gs_file_unlink (data->meta_path, NULL, NULL);
g_clear_object (&data->meta_path);
if (data->content_path)
(void) gs_file_unlink (data->content_path, NULL, NULL);
g_clear_object (&data->content_path);
g_free (data->checksum);
g_free (data);
}
static void static void
content_fetch_on_stage_complete (GObject *object, content_fetch_on_stage_complete (GObject *object,
GAsyncResult *result, GAsyncResult *result,
gpointer user_data) gpointer user_data)
{ {
OtFetchOneContentItemData *data = user_data; FetchObjectData *fetch_data = user_data;
OtPullData *pull_data = fetch_data->pull_data;
GError *local_error = NULL; GError *local_error = NULL;
GError **error = &local_error; GError **error = &local_error;
OstreeObjectType objtype;
const char *expected_checksum;
ot_lfree guchar *csum = NULL; ot_lfree guchar *csum = NULL;
ot_lfree char *checksum = NULL; ot_lfree char *checksum = NULL;
@ -543,27 +583,30 @@ content_fetch_on_stage_complete (GObject *object,
checksum = ostree_checksum_from_bytes (csum); checksum = ostree_checksum_from_bytes (csum);
g_assert (strcmp (checksum, data->checksum) == 0); ostree_object_name_deserialize (fetch_data->object, &expected_checksum, &objtype);
g_assert (objtype == OSTREE_OBJECT_TYPE_FILE);
data->pull_data->n_fetched_content++; g_debug ("stage of %s complete", ostree_object_to_string (checksum, objtype));
g_assert (strcmp (checksum, expected_checksum) == 0);
pull_data->n_fetched_content++;
out: out:
data->pull_data->outstanding_content_stage_requests--; pull_data->n_outstanding_content_stage_requests--;
check_outstanding_requests_handle_error (data->pull_data, local_error); check_outstanding_requests_handle_error (pull_data, local_error);
destroy_fetch_one_content_item_data (data); (void) gs_file_unlink (fetch_data->temp_path, NULL, NULL);
g_object_unref (fetch_data->temp_path);
g_variant_unref (fetch_data->object);
g_free (fetch_data);
} }
static void
content_fetch_on_complete (GObject *object,
GAsyncResult *result,
gpointer user_data);
static void static void
content_fetch_on_complete (GObject *object, content_fetch_on_complete (GObject *object,
GAsyncResult *result, GAsyncResult *result,
gpointer user_data) gpointer user_data)
{ {
OtFetchOneContentItemData *data = user_data; FetchObjectData *fetch_data = user_data;
OtPullData *pull_data = fetch_data->pull_data;
GError *local_error = NULL; GError *local_error = NULL;
GError **error = &local_error; GError **error = &local_error;
GCancellable *cancellable = NULL; GCancellable *cancellable = NULL;
@ -575,13 +618,19 @@ content_fetch_on_complete (GObject *object,
ot_lvariant GVariant *xattrs = NULL; ot_lvariant GVariant *xattrs = NULL;
ot_lobj GInputStream *file_in = NULL; ot_lobj GInputStream *file_in = NULL;
ot_lobj GInputStream *object_input = NULL; ot_lobj GInputStream *object_input = NULL;
const char *checksum;
OstreeObjectType objtype;
data->content_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error); fetch_data->temp_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error);
if (!data->content_path) if (!fetch_data->temp_path)
goto out; goto out;
g_assert (data->content_path != NULL); ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype);
if (!ostree_content_file_parse (TRUE, data->content_path, FALSE, g_assert (objtype == OSTREE_OBJECT_TYPE_FILE);
g_debug ("fetch of %s complete", ostree_object_to_string (checksum, objtype));
if (!ostree_content_file_parse (TRUE, fetch_data->temp_path, FALSE,
&file_in, &file_info, &xattrs, &file_in, &file_info, &xattrs,
cancellable, error)) cancellable, error))
goto out; goto out;
@ -591,62 +640,56 @@ content_fetch_on_complete (GObject *object,
cancellable, error)) cancellable, error))
goto out; goto out;
data->pull_data->outstanding_content_stage_requests++; pull_data->n_outstanding_content_stage_requests++;
ostree_repo_stage_content_async (data->pull_data->repo, data->checksum, ostree_repo_stage_content_async (pull_data->repo, checksum,
object_input, length, object_input, length,
cancellable, cancellable,
content_fetch_on_stage_complete, data); content_fetch_on_stage_complete, fetch_data);
out: out:
data->pull_data->outstanding_filecontent_requests--; pull_data->n_outstanding_content_fetches--;
check_outstanding_requests_handle_error (data->pull_data, local_error); check_outstanding_requests_handle_error (pull_data, local_error);
} }
static gboolean
idle_queue_content_request (gpointer user_data)
{
OtFetchOneContentItemData *data = user_data;
OtPullData *pull_data = data->pull_data;
const char *checksum = data->checksum;
ot_lfree char *objpath = NULL;
SoupURI *obj_uri = NULL;
objpath = ostree_get_relative_object_path (checksum, OSTREE_OBJECT_TYPE_FILE, TRUE);
obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable,
content_fetch_on_complete, data);
soup_uri_free (obj_uri);
pull_data->outstanding_filecontent_requests++;
return FALSE;
}
typedef struct {
OtPullData *pull_data;
GVariant *object;
GFile *temp_path;
} IdleFetchMetadataObjectData;
static void static void
on_metadata_staged (GObject *object, on_metadata_staged (GObject *object,
GAsyncResult *result, GAsyncResult *result,
gpointer user_data) gpointer user_data)
{ {
IdleFetchMetadataObjectData *fetch_data = user_data; FetchObjectData *fetch_data = user_data;
OtPullData *pull_data = fetch_data->pull_data; OtPullData *pull_data = fetch_data->pull_data;
GError *local_error = NULL;
GError **error = &local_error;
const char *expected_checksum;
OstreeObjectType objtype;
gs_free char *checksum = NULL;
gs_free guchar *csum = NULL;
pull_data->n_fetched_metadata++; if (!ostree_repo_stage_metadata_finish ((OstreeRepo*)object, result,
pull_data->n_outstanding_metadata_fetches--; &csum, error))
goto out;
ot_worker_queue_push (pull_data->metadata_objects_to_scan, checksum = ostree_checksum_from_bytes (csum);
g_variant_ref (fetch_data->object));
ostree_object_name_deserialize (fetch_data->object, &expected_checksum, &objtype);
g_assert (OSTREE_OBJECT_TYPE_IS_META (objtype));
g_debug ("stage of %s complete", ostree_object_to_string (checksum, objtype));
g_assert (strcmp (checksum, expected_checksum) == 0);
pull_data->metadata_scan_idle = FALSE;
ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
pull_worker_message_new (PULL_MSG_SCAN,
g_variant_ref (fetch_data->object)));
out:
pull_data->n_outstanding_metadata_stage_requests--;
(void) gs_file_unlink (fetch_data->temp_path, NULL, NULL); (void) gs_file_unlink (fetch_data->temp_path, NULL, NULL);
g_object_unref (fetch_data->temp_path); g_object_unref (fetch_data->temp_path);
g_variant_unref (fetch_data->object); g_variant_unref (fetch_data->object);
g_free (fetch_data); g_free (fetch_data);
check_outstanding_requests_handle_error (pull_data, local_error);
} }
static void static void
@ -654,7 +697,7 @@ meta_fetch_on_complete (GObject *object,
GAsyncResult *result, GAsyncResult *result,
gpointer user_data) gpointer user_data)
{ {
IdleFetchMetadataObjectData *fetch_data = user_data; FetchObjectData *fetch_data = user_data;
OtPullData *pull_data = fetch_data->pull_data; OtPullData *pull_data = fetch_data->pull_data;
ot_lvariant GVariant *metadata = NULL; ot_lvariant GVariant *metadata = NULL;
const char *checksum; const char *checksum;
@ -668,6 +711,8 @@ meta_fetch_on_complete (GObject *object,
ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype); ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype);
g_debug ("fetch of %s complete", ostree_object_to_string (checksum, objtype));
if (!ot_util_variant_map (fetch_data->temp_path, ostree_metadata_variant_type (objtype), if (!ot_util_variant_map (fetch_data->temp_path, ostree_metadata_variant_type (objtype),
FALSE, &metadata, error)) FALSE, &metadata, error))
goto out; goto out;
@ -676,7 +721,10 @@ meta_fetch_on_complete (GObject *object,
pull_data->cancellable, pull_data->cancellable,
on_metadata_staged, fetch_data); on_metadata_staged, fetch_data);
pull_data->n_outstanding_metadata_stage_requests++;
out: out:
pull_data->n_outstanding_metadata_fetches--;
pull_data->n_fetched_metadata++;
throw_async_error (pull_data, local_error); throw_async_error (pull_data, local_error);
if (local_error) if (local_error)
{ {
@ -685,44 +733,6 @@ meta_fetch_on_complete (GObject *object,
} }
} }
static gboolean
idle_fetch_metadata_object (gpointer data)
{
IdleFetchMetadataObjectData *fetch_data = data;
OtPullData *pull_data = fetch_data->pull_data;
ot_lfree char *objpath = NULL;
const char *checksum;
OstreeObjectType objtype;
SoupURI *obj_uri = NULL;
ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype);
objpath = ostree_get_relative_object_path (checksum, objtype, TRUE);
obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
pull_data->n_outstanding_metadata_fetches++;
ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable,
meta_fetch_on_complete, fetch_data);
soup_uri_free (obj_uri);
return FALSE;
}
/**
* queue_metadata_object_fetch:
*
* Pass a request to the main thread to fetch a metadata object.
*/
static void
queue_metadata_object_fetch (OtPullData *pull_data,
GVariant *object)
{
IdleFetchMetadataObjectData *fetch_data = g_new (IdleFetchMetadataObjectData, 1);
fetch_data->pull_data = pull_data;
fetch_data->object = g_variant_ref (object);
g_idle_add (idle_fetch_metadata_object, fetch_data);
}
static gboolean static gboolean
scan_commit_object (OtPullData *pull_data, scan_commit_object (OtPullData *pull_data,
const char *checksum, const char *checksum,
@ -798,6 +808,7 @@ scan_one_metadata_object (OtPullData *pull_data,
gboolean ret = FALSE; gboolean ret = FALSE;
ot_lvariant GVariant *object = NULL; ot_lvariant GVariant *object = NULL;
ot_lfree char *tmp_checksum = NULL; ot_lfree char *tmp_checksum = NULL;
gboolean is_requested;
gboolean is_stored; gboolean is_stored;
tmp_checksum = ostree_checksum_from_bytes (csum); tmp_checksum = ostree_checksum_from_bytes (csum);
@ -806,16 +817,21 @@ scan_one_metadata_object (OtPullData *pull_data,
if (g_hash_table_lookup (pull_data->scanned_metadata, object)) if (g_hash_table_lookup (pull_data->scanned_metadata, object))
return TRUE; return TRUE;
is_requested = g_hash_table_lookup (pull_data->requested_metadata, tmp_checksum) != NULL;
if (!ostree_repo_has_object (pull_data->repo, objtype, tmp_checksum, &is_stored, if (!ostree_repo_has_object (pull_data->repo, objtype, tmp_checksum, &is_stored,
cancellable, error)) cancellable, error))
goto out; goto out;
if (!is_stored) if (!is_stored && !is_requested)
{ {
g_atomic_int_inc (&pull_data->n_requested_metadata); char *duped_checksum = g_strdup (tmp_checksum);
queue_metadata_object_fetch (pull_data, object); g_hash_table_insert (pull_data->requested_metadata, duped_checksum, duped_checksum);
ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
pull_worker_message_new (PULL_MSG_FETCH,
g_variant_ref (object)));
} }
else else if (is_stored)
{ {
switch (objtype) switch (objtype)
{ {
@ -837,8 +853,6 @@ scan_one_metadata_object (OtPullData *pull_data,
} }
g_hash_table_insert (pull_data->scanned_metadata, g_variant_ref (object), object); g_hash_table_insert (pull_data->scanned_metadata, g_variant_ref (object), object);
g_atomic_int_inc (&pull_data->n_scanned_metadata); g_atomic_int_inc (&pull_data->n_scanned_metadata);
g_idle_add (idle_check_outstanding_requests, pull_data);
} }
ret = TRUE; ret = TRUE;
@ -879,28 +893,48 @@ idle_throw_error (gpointer user_data)
return FALSE; return FALSE;
} }
/** static gboolean
* scan_one_metadata_object_dispatch: on_metadata_objects_to_scan_ready (gint fd,
* GIOCondition condition,
* Called from the metadatascan worker thread. If we're missing an gpointer user_data)
* object from one of them, we queue a request to the main thread to
* fetch it. When it's fetched, we get passed the object back and
* scan it.
*/
static void
scan_one_metadata_object_dispatch (gpointer item,
gpointer user_data)
{ {
OtPullData *pull_data = user_data; OtPullData *pull_data = user_data;
PullWorkerMessage *msg;
PullWorkerMessage *last_idle_msg = NULL;
GError *local_error = NULL; GError *local_error = NULL;
GError **error = &local_error; GError **error = &local_error;
ot_lvariant GVariant *v_item = NULL;
v_item = item; while (ot_waitable_queue_pop (pull_data->metadata_objects_to_scan, (gpointer*)&msg))
{
if (msg->t == PULL_MSG_SCAN)
{
if (!scan_one_metadata_object_v_name (pull_data, msg->d.item,
pull_data->cancellable, error))
goto out;
g_variant_unref (msg->d.item);
g_free (msg);
}
else if (msg->t == PULL_MSG_MAIN_IDLE)
{
g_free (last_idle_msg);
last_idle_msg = msg;
}
else if (msg->t == PULL_MSG_QUIT)
{
g_free (msg);
g_main_loop_quit (pull_data->metadata_thread_loop);
}
else
g_assert_not_reached ();
}
if (!scan_one_metadata_object_v_name (pull_data, v_item, if (last_idle_msg)
pull_data->cancellable, error)) ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
goto out; last_idle_msg);
/* When we have no queue to process, notify the main thread */
ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
pull_worker_message_new (PULL_MSG_SCAN_IDLE, GUINT_TO_POINTER (0)));
out: out:
if (local_error) if (local_error)
@ -910,17 +944,107 @@ scan_one_metadata_object_dispatch (gpointer item,
throwdata->error = local_error; throwdata->error = local_error;
g_main_context_invoke (NULL, idle_throw_error, throwdata); g_main_context_invoke (NULL, idle_throw_error, throwdata);
} }
return TRUE;
} }
/**
static gboolean * metadata_thread_main:
idle_start_worker (gpointer user_data) *
* Called from the metadatascan worker thread. If we're missing an
* object from one of them, we queue a request to the main thread to
* fetch it. When it's fetched, we get passed the object back and
* scan it.
*/
static gpointer
metadata_thread_main (gpointer user_data)
{ {
OtPullData *pull_data = user_data; OtPullData *pull_data = user_data;
GSource *src;
ot_worker_queue_start (pull_data->metadata_objects_to_scan); pull_data->metadata_thread_context = g_main_context_new ();
pull_data->metadata_thread_loop = g_main_loop_new (pull_data->metadata_thread_context, TRUE);
return FALSE; src = ot_waitable_queue_create_source (pull_data->metadata_objects_to_scan);
g_source_set_callback (src, (GSourceFunc)on_metadata_objects_to_scan_ready, pull_data, NULL);
g_source_attach (src, pull_data->metadata_thread_context);
g_source_unref (src);
g_main_loop_run (pull_data->metadata_thread_loop);
return NULL;
}
static gboolean
on_metadata_objects_to_fetch_ready (gint fd,
GIOCondition condition,
gpointer user_data)
{
OtPullData *pull_data = user_data;
PullWorkerMessage *msg;
if (!ot_waitable_queue_pop (pull_data->metadata_objects_to_fetch, (gpointer*)&msg))
goto out;
if (msg->t == PULL_MSG_MAIN_IDLE)
{
if (msg->d.idle_serial == pull_data->idle_serial)
{
g_assert (!pull_data->metadata_scan_idle);
pull_data->metadata_scan_idle = TRUE;
g_debug ("pull: metadata scan is idle");
}
}
else if (msg->t == PULL_MSG_SCAN_IDLE)
{
if (!pull_data->metadata_scan_idle)
{
g_debug ("pull: queue MAIN_IDLE");
pull_data->idle_serial++;
ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
pull_worker_message_new (PULL_MSG_MAIN_IDLE, GUINT_TO_POINTER (pull_data->idle_serial)));
}
}
else if (msg->t == PULL_MSG_FETCH)
{
const char *checksum;
gs_free char *objpath = NULL;
OstreeObjectType objtype;
SoupURI *obj_uri = NULL;
gboolean is_meta;
FetchObjectData *fetch_data;
ostree_object_name_deserialize (msg->d.item, &checksum, &objtype);
objpath = ostree_get_relative_object_path (checksum, objtype, TRUE);
obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
is_meta = OSTREE_OBJECT_TYPE_IS_META (objtype);
if (is_meta)
{
pull_data->n_outstanding_metadata_fetches++;
pull_data->n_requested_metadata++;
}
else
{
pull_data->n_outstanding_content_fetches++;
pull_data->n_requested_content++;
}
fetch_data = g_new (FetchObjectData, 1);
fetch_data->pull_data = pull_data;
fetch_data->object = g_variant_ref (msg->d.item);
ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable,
is_meta ? meta_fetch_on_complete : content_fetch_on_complete, fetch_data);
soup_uri_free (obj_uri);
g_variant_unref (msg->d.item);
}
else
{
g_assert_not_reached ();
}
g_free (msg);
out:
check_outstanding_requests_handle_error (pull_data, NULL);
return TRUE;
} }
static gboolean static gboolean
@ -1097,6 +1221,8 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
(GDestroyNotify)g_variant_unref, NULL); (GDestroyNotify)g_variant_unref, NULL);
pull_data->requested_content = g_hash_table_new_full (g_str_hash, g_str_equal, pull_data->requested_content = g_hash_table_new_full (g_str_hash, g_str_equal,
(GDestroyNotify)g_free, NULL); (GDestroyNotify)g_free, NULL);
pull_data->requested_metadata = g_hash_table_new_full (g_str_hash, g_str_equal,
(GDestroyNotify)g_free, NULL);
if (argc < 2) if (argc < 2)
{ {
@ -1223,19 +1349,18 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
if (!ostree_repo_prepare_transaction (pull_data->repo, FALSE, NULL, error)) if (!ostree_repo_prepare_transaction (pull_data->repo, FALSE, NULL, error))
goto out; goto out;
pull_data->metadata_scan_active = TRUE; pull_data->metadata_objects_to_fetch = ot_waitable_queue_new ();
pull_data->metadata_objects_to_scan = ot_waitable_queue_new ();
pull_data->metadata_objects_to_scan = ot_worker_queue_new ("metadatascan", pull_data->metadata_thread = g_thread_new ("metadatascan", metadata_thread_main, pull_data);
scan_one_metadata_object_dispatch,
pull_data);
g_hash_table_iter_init (&hash_iter, commits_to_fetch); g_hash_table_iter_init (&hash_iter, commits_to_fetch);
while (g_hash_table_iter_next (&hash_iter, &key, &value)) while (g_hash_table_iter_next (&hash_iter, &key, &value))
{ {
const char *commit = value; const char *commit = value;
ot_worker_queue_push (pull_data->metadata_objects_to_scan, ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
ostree_object_name_serialize (commit, OSTREE_OBJECT_TYPE_COMMIT)); pull_worker_message_new (PULL_MSG_SCAN,
ostree_object_name_serialize (commit, OSTREE_OBJECT_TYPE_COMMIT)));
} }
g_hash_table_iter_init (&hash_iter, requested_refs_to_fetch); g_hash_table_iter_init (&hash_iter, requested_refs_to_fetch);
@ -1259,23 +1384,28 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
} }
else else
{ {
ot_worker_queue_push (pull_data->metadata_objects_to_scan, ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
ostree_object_name_serialize (sha256, OSTREE_OBJECT_TYPE_COMMIT)); pull_worker_message_new (PULL_MSG_SCAN,
ostree_object_name_serialize (sha256, OSTREE_OBJECT_TYPE_COMMIT)));
g_hash_table_insert (updated_refs, g_strdup (ref), g_strdup (sha256)); g_hash_table_insert (updated_refs, g_strdup (ref), g_strdup (sha256));
} }
} }
/* Start metadata thread, which kicks off further metadata requests {
* as well as content fetches. GSource *src = ot_waitable_queue_create_source (pull_data->metadata_objects_to_fetch);
*/ g_source_set_callback (src, (GSourceFunc)on_metadata_objects_to_fetch_ready, pull_data, NULL);
if (!ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan)) g_source_attach (src, NULL);
{ g_source_unref (src);
g_idle_add (idle_start_worker, pull_data); }
/* Now await work completion */ /* Prime the message queue */
if (!run_mainloop_monitor_fetcher (pull_data)) pull_data->idle_serial++;
goto out; ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
} pull_worker_message_new (PULL_MSG_MAIN_IDLE, GUINT_TO_POINTER (pull_data->idle_serial)));
/* Now await work completion */
if (!run_mainloop_monitor_fetcher (pull_data))
goto out;
if (!ostree_repo_commit_transaction (pull_data->repo, cancellable, error)) if (!ostree_repo_commit_transaction (pull_data->repo, cancellable, error))
goto out; goto out;
@ -1323,9 +1453,17 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
g_free (pull_data->remote_name); g_free (pull_data->remote_name);
if (pull_data->base_uri) if (pull_data->base_uri)
soup_uri_free (pull_data->base_uri); soup_uri_free (pull_data->base_uri);
g_clear_pointer (&pull_data->metadata_objects_to_scan, (GDestroyNotify) ot_worker_queue_unref); if (pull_data->metadata_thread)
{
ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
pull_worker_message_new (PULL_MSG_QUIT, NULL));
g_thread_join (pull_data->metadata_thread);
}
g_clear_pointer (&pull_data->metadata_objects_to_scan, (GDestroyNotify) ot_waitable_queue_unref);
g_clear_pointer (&pull_data->metadata_objects_to_fetch, (GDestroyNotify) ot_waitable_queue_unref);
g_clear_pointer (&pull_data->scanned_metadata, (GDestroyNotify) g_hash_table_unref); g_clear_pointer (&pull_data->scanned_metadata, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->requested_content, (GDestroyNotify) g_hash_table_unref); g_clear_pointer (&pull_data->requested_content, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&pull_data->requested_metadata, (GDestroyNotify) g_hash_table_unref);
g_clear_pointer (&remote_config, (GDestroyNotify) g_key_file_unref); g_clear_pointer (&remote_config, (GDestroyNotify) g_key_file_unref);
if (summary_uri) if (summary_uri)
soup_uri_free (summary_uri); soup_uri_free (summary_uri);