pull-local: Make multithreaded
We were blocking for easily 1/10 or 1/5 of a second in fdatasync(), which drastically slows down the whole process. This threading isn't quite as good as the ostree-pull command, but it lets us avoid the dependency on libsoup everywhere, and it's simpler.
This commit is contained in:
parent
de1ce843f1
commit
05e7b6d596
|
|
@ -56,3 +56,25 @@ ot_spawn_sync_checked (const char *cwd,
|
|||
out:
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* ot_thread_pool_new_nproc:
|
||||
*
|
||||
* Like g_thread_pool_new (), but choose number of threads appropriate
|
||||
* for CPU bound workers automatically. Also aborts on error.
|
||||
*/
|
||||
GThreadPool *
|
||||
ot_thread_pool_new_nproc (GFunc func,
|
||||
gpointer user_data)
|
||||
{
|
||||
long nproc_onln;
|
||||
GThreadPool *ret;
|
||||
GError *local_error = NULL;
|
||||
|
||||
nproc_onln = sysconf (_SC_NPROCESSORS_ONLN);
|
||||
if (G_UNLIKELY (nproc_onln == -1 && errno == EINVAL))
|
||||
nproc_onln = 2;
|
||||
ret = g_thread_pool_new (func, user_data, (int)nproc_onln, FALSE, &local_error);
|
||||
g_assert_no_error (local_error);
|
||||
return ret;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,6 +37,10 @@ gboolean ot_spawn_sync_checked (const char *cwd,
|
|||
char **stderr_data,
|
||||
GError **error);
|
||||
|
||||
GThreadPool * ot_thread_pool_new_nproc (GFunc func,
|
||||
gpointer user_data);
|
||||
|
||||
|
||||
G_END_DECLS
|
||||
|
||||
#endif
|
||||
|
|
|
|||
|
|
@ -35,6 +35,12 @@ static GOptionEntry options[] = {
|
|||
typedef struct {
|
||||
OstreeRepo *src_repo;
|
||||
OstreeRepo *dest_repo;
|
||||
GThreadPool *threadpool;
|
||||
GMainLoop *loop;
|
||||
gboolean stdout_is_tty;
|
||||
int n_objects_to_check;
|
||||
volatile int n_objects_checked;
|
||||
volatile int n_objects_copied;
|
||||
} OtLocalCloneData;
|
||||
|
||||
static gboolean
|
||||
|
|
@ -84,11 +90,61 @@ import_one_object (OtLocalCloneData *data,
|
|||
goto out;
|
||||
}
|
||||
|
||||
g_atomic_int_inc (&data->n_objects_copied);
|
||||
|
||||
ret = TRUE;
|
||||
out:
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void
|
||||
import_one_object_thread (gpointer object,
|
||||
gpointer user_data)
|
||||
{
|
||||
OtLocalCloneData *data = user_data;
|
||||
ot_lvariant GVariant *serialized_key = object;
|
||||
GError *local_error = NULL;
|
||||
GError **error = &local_error;
|
||||
const char *checksum;
|
||||
OstreeObjectType objtype;
|
||||
gboolean has_object;
|
||||
GCancellable *cancellable = NULL;
|
||||
|
||||
ostree_object_name_deserialize (serialized_key, &checksum, &objtype);
|
||||
|
||||
if (!ostree_repo_has_object (data->dest_repo, objtype, checksum, &has_object,
|
||||
cancellable, error))
|
||||
goto out;
|
||||
|
||||
if (!has_object)
|
||||
{
|
||||
if (!import_one_object (data, checksum, objtype, cancellable, error))
|
||||
goto out;
|
||||
}
|
||||
|
||||
out:
|
||||
if (g_atomic_int_add (&data->n_objects_checked, 1) == data->n_objects_to_check - 1)
|
||||
g_main_loop_quit (data->loop);
|
||||
if (local_error != NULL)
|
||||
{
|
||||
g_printerr ("%s\n", local_error->message);
|
||||
exit (1);
|
||||
}
|
||||
}
|
||||
|
||||
static gboolean
|
||||
idle_print_status (gpointer user_data)
|
||||
{
|
||||
OtLocalCloneData *data = user_data;
|
||||
|
||||
g_print ("%c8pull: %d/%d scanned, %d objects copied", 0x1B,
|
||||
g_atomic_int_get (&data->n_objects_checked),
|
||||
data->n_objects_to_check,
|
||||
g_atomic_int_get (&data->n_objects_copied));
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
gboolean
|
||||
ostree_builtin_pull_local (int argc, char **argv, GFile *repo_path, GError **error)
|
||||
{
|
||||
|
|
@ -108,18 +164,19 @@ ostree_builtin_pull_local (int argc, char **argv, GFile *repo_path, GError **err
|
|||
ot_lhash GHashTable *refs_to_clone = NULL;
|
||||
ot_lhash GHashTable *source_objects = NULL;
|
||||
ot_lhash GHashTable *objects_to_copy = NULL;
|
||||
OtLocalCloneData data;
|
||||
OtLocalCloneData datav;
|
||||
OtLocalCloneData *data = &datav;
|
||||
|
||||
context = g_option_context_new ("SRC_REPO [REFS...] - Copy data from SRC_REPO");
|
||||
g_option_context_add_main_entries (context, options, NULL);
|
||||
|
||||
memset (&data, 0, sizeof (data));
|
||||
memset (&datav, 0, sizeof (datav));
|
||||
|
||||
if (!g_option_context_parse (context, &argc, &argv, error))
|
||||
goto out;
|
||||
|
||||
data.dest_repo = ostree_repo_new (repo_path);
|
||||
if (!ostree_repo_check (data.dest_repo, error))
|
||||
data->dest_repo = ostree_repo_new (repo_path);
|
||||
if (!ostree_repo_check (data->dest_repo, error))
|
||||
goto out;
|
||||
|
||||
if (argc < 2)
|
||||
|
|
@ -135,16 +192,20 @@ ostree_builtin_pull_local (int argc, char **argv, GFile *repo_path, GError **err
|
|||
src_repo_path = argv[1];
|
||||
src_f = g_file_new_for_path (src_repo_path);
|
||||
|
||||
data.src_repo = ostree_repo_new (src_f);
|
||||
if (!ostree_repo_check (data.src_repo, error))
|
||||
data->src_repo = ostree_repo_new (src_f);
|
||||
if (!ostree_repo_check (data->src_repo, error))
|
||||
goto out;
|
||||
|
||||
src_repo_dir = g_object_ref (ostree_repo_get_path (data.src_repo));
|
||||
dest_repo_dir = g_object_ref (ostree_repo_get_path (data.dest_repo));
|
||||
data->threadpool = ot_thread_pool_new_nproc (import_one_object_thread, data);
|
||||
data->loop = g_main_loop_new (NULL, TRUE);
|
||||
data->stdout_is_tty = isatty (1);
|
||||
|
||||
src_repo_dir = g_object_ref (ostree_repo_get_path (data->src_repo));
|
||||
dest_repo_dir = g_object_ref (ostree_repo_get_path (data->dest_repo));
|
||||
|
||||
if (argc == 2)
|
||||
{
|
||||
if (!ostree_repo_list_all_refs (data.src_repo, &refs_to_clone, cancellable, error))
|
||||
if (!ostree_repo_list_all_refs (data->src_repo, &refs_to_clone, cancellable, error))
|
||||
goto out;
|
||||
}
|
||||
else
|
||||
|
|
@ -155,7 +216,7 @@ ostree_builtin_pull_local (int argc, char **argv, GFile *repo_path, GError **err
|
|||
const char *ref = argv[i];
|
||||
char *rev;
|
||||
|
||||
if (!ostree_repo_resolve_rev (data.src_repo, ref, FALSE, &rev, error))
|
||||
if (!ostree_repo_resolve_rev (data->src_repo, ref, FALSE, &rev, error))
|
||||
goto out;
|
||||
|
||||
/* Transfer ownership of rev */
|
||||
|
|
@ -172,47 +233,39 @@ ostree_builtin_pull_local (int argc, char **argv, GFile *repo_path, GError **err
|
|||
{
|
||||
const char *checksum = value;
|
||||
|
||||
if (!ostree_traverse_commit (data.src_repo, checksum, 0, source_objects, cancellable, error))
|
||||
if (!ostree_traverse_commit (data->src_repo, checksum, 0, source_objects, cancellable, error))
|
||||
goto out;
|
||||
}
|
||||
|
||||
objects_to_copy = ostree_traverse_new_reachable ();
|
||||
if (!ostree_repo_prepare_transaction (data->dest_repo, FALSE, cancellable, error))
|
||||
goto out;
|
||||
|
||||
g_hash_table_iter_init (&hash_iter, source_objects);
|
||||
while (g_hash_table_iter_next (&hash_iter, &key, &value))
|
||||
{
|
||||
GVariant *serialized_key = key;
|
||||
gboolean has_object;
|
||||
const char *checksum;
|
||||
OstreeObjectType objtype;
|
||||
|
||||
ostree_object_name_deserialize (serialized_key, &checksum, &objtype);
|
||||
|
||||
if (!ostree_repo_has_object (data.dest_repo, objtype, checksum, &has_object,
|
||||
cancellable, error))
|
||||
goto out;
|
||||
if (!has_object)
|
||||
g_hash_table_insert (objects_to_copy, g_variant_ref (serialized_key), serialized_key);
|
||||
data->n_objects_to_check++;
|
||||
g_thread_pool_push (data->threadpool, g_variant_ref (serialized_key), NULL);
|
||||
}
|
||||
|
||||
g_print ("%u objects to copy\n", g_hash_table_size (objects_to_copy));
|
||||
|
||||
if (!ostree_repo_prepare_transaction (data.dest_repo, FALSE, cancellable, error))
|
||||
goto out;
|
||||
|
||||
g_hash_table_iter_init (&hash_iter, objects_to_copy);
|
||||
while (g_hash_table_iter_next (&hash_iter, &key, &value))
|
||||
if (data->n_objects_to_check > 0)
|
||||
{
|
||||
GVariant *serialized_key = key;
|
||||
const char *checksum;
|
||||
OstreeObjectType objtype;
|
||||
|
||||
ostree_object_name_deserialize (serialized_key, &checksum, &objtype);
|
||||
|
||||
if (!import_one_object (&data, checksum, objtype, cancellable, error))
|
||||
goto out;
|
||||
if (data->stdout_is_tty)
|
||||
{
|
||||
g_print ("%c7", 0x1B);
|
||||
g_timeout_add_seconds (1, idle_print_status, data);
|
||||
idle_print_status (data);
|
||||
}
|
||||
|
||||
if (!ostree_repo_commit_transaction (data.dest_repo, NULL, error))
|
||||
g_main_loop_run (data->loop);
|
||||
|
||||
idle_print_status (data);
|
||||
|
||||
if (data->stdout_is_tty)
|
||||
g_print ("\n");
|
||||
}
|
||||
|
||||
if (!ostree_repo_commit_transaction (data->dest_repo, NULL, error))
|
||||
goto out;
|
||||
|
||||
g_print ("Writing %u refs\n", g_hash_table_size (refs_to_clone));
|
||||
|
|
@ -223,16 +276,18 @@ ostree_builtin_pull_local (int argc, char **argv, GFile *repo_path, GError **err
|
|||
const char *name = key;
|
||||
const char *checksum = value;
|
||||
|
||||
if (!ostree_repo_write_ref (data.dest_repo, NULL, name, checksum, error))
|
||||
if (!ostree_repo_write_ref (data->dest_repo, NULL, name, checksum, error))
|
||||
goto out;
|
||||
}
|
||||
|
||||
ret = TRUE;
|
||||
out:
|
||||
if (data.src_repo)
|
||||
g_object_unref (data.src_repo);
|
||||
if (data.dest_repo)
|
||||
g_object_unref (data.dest_repo);
|
||||
g_clear_pointer (&data->threadpool, (GDestroyNotify) g_thread_pool_free);
|
||||
g_clear_pointer (&data->loop, (GDestroyNotify) g_main_loop_unref);
|
||||
if (data->src_repo)
|
||||
g_object_unref (data->src_repo);
|
||||
if (data->dest_repo)
|
||||
g_object_unref (data->dest_repo);
|
||||
if (context)
|
||||
g_option_context_free (context);
|
||||
return ret;
|
||||
|
|
|
|||
Loading…
Reference in New Issue