deltas: Drop async content writes

This caused deadlocks and/or EMFILE due to the interaction between
threads and fds.  What we really want here is a better pull-based
model for parsing content objects.

Another idea would be to change static deltas so that content objects
have a special opcode that includes their metadata first, and then do
rollsums etc. only over actual content.
This commit is contained in:
Colin Walters 2015-01-20 23:21:26 -05:00
parent d49fc876bb
commit 6d1de23f87
1 changed files with 28 additions and 88 deletions

View File

@ -25,6 +25,7 @@
#include <glib-unix.h>
#include <gio/gunixinputstream.h>
#include <gio/gunixoutputstream.h>
#include <gio/gfiledescriptorbased.h>
#include "ostree-repo-private.h"
#include "ostree-repo-static-delta-private.h"
@ -45,8 +46,6 @@ typedef struct {
guint oplen;
gboolean object_start;
guint outstanding_content_writes;
GMainContext *content_writing_context;
gboolean caught_error;
GError **async_error;
@ -93,11 +92,6 @@ static OstreeStaticDeltaOperation op_dispatch_table[] = {
{ NULL }
};
static void
on_content_written (GObject *src,
GAsyncResult *result,
gpointer user_data);
static gboolean
read_varuint64 (StaticDeltaExecutionState *state,
guint64 *out_value,
@ -146,43 +140,10 @@ open_output_target (StaticDeltaExecutionState *state,
if (!read_varuint64 (state, &object_size, error))
goto out;
if (OSTREE_OBJECT_TYPE_IS_META (state->output_objtype))
{
if (!gs_file_open_in_tmpdir_at (state->repo->tmp_dir_fd, 0644,
&state->output_tmp_path, &state->output_tmp_stream,
cancellable, error))
goto out;
}
else
{
int pipefds[2];
if (!g_unix_open_pipe (pipefds, FD_CLOEXEC, error))
goto out;
content_in_stream = g_unix_input_stream_new (pipefds[0], TRUE);
state->output_tmp_stream = g_unix_output_stream_new (pipefds[1], TRUE);
if (!state->content_writing_context)
state->content_writing_context = g_main_context_new();
g_main_context_push_thread_default (state->content_writing_context);
{
StaticDeltaContentWrite *writedata = g_new0 (StaticDeltaContentWrite, 1);
writedata->state = state;
memcpy (writedata->checksum, checksum, sizeof (writedata->checksum));
ostree_repo_write_content_async (state->repo, checksum,
content_in_stream,
object_size,
cancellable,
on_content_written,
writedata);
}
state->outstanding_content_writes++;
g_main_context_pop_thread_default (state->content_writing_context);
}
ret = TRUE;
out:
@ -265,11 +226,6 @@ _ostree_static_delta_part_execute_raw (OstreeRepo *repo,
guint8 opcode;
OstreeStaticDeltaOperation *op;
/* Limit the number of outstanding writes to 1 to prevent too many open files
at the same time. */
while (state->outstanding_content_writes > 1)
g_main_context_iteration (state->content_writing_context, TRUE);
if (state->object_start)
{
if (!open_output_target (state, cancellable, error))
@ -294,15 +250,11 @@ _ostree_static_delta_part_execute_raw (OstreeRepo *repo,
n_executed++;
}
while (state->outstanding_content_writes > 0)
g_main_context_iteration (state->content_writing_context, TRUE);
if (state->caught_error)
goto out;
ret = TRUE;
out:
g_clear_pointer (&state->content_writing_context, g_main_context_unref);
g_clear_pointer (&state->output_tmp_path, g_free);
g_clear_object (&state->output_tmp_stream);
return ret;
@ -496,38 +448,6 @@ validate_ofs (StaticDeltaExecutionState *state,
return TRUE;
}
static void
on_content_written (GObject *src,
GAsyncResult *result,
gpointer user_data)
{
StaticDeltaContentWrite *writedata = user_data;
StaticDeltaExecutionState *state = writedata->state;
GError *local_error = NULL;
GError **error = &local_error;
if (!ostree_repo_write_content_finish ((OstreeRepo*)src, result, NULL, error))
goto out;
out:
state->outstanding_content_writes--;
if (state->outstanding_content_writes == 0)
g_main_context_wakeup (state->content_writing_context);
if (local_error)
{
if (!state->caught_error)
{
state->caught_error = TRUE;
g_main_context_wakeup (state->content_writing_context);
g_propagate_error (state->async_error, local_error);
}
else
{
g_error_free (local_error);
}
}
}
static gboolean
dispatch_write (OstreeRepo *repo,
StaticDeltaExecutionState *state,
@ -663,9 +583,29 @@ dispatch_close (OstreeRepo *repo,
}
else
{
/* We already have an async write going, the close() above will
* ensure it completes.
*/
gs_unref_object GInputStream *instream = NULL;
int fd;
struct stat stbuf;
if (!ot_openat_read_stream (state->repo->tmp_dir_fd,
state->output_tmp_path, FALSE,
&instream, cancellable, error))
goto out;
fd = g_file_descriptor_based_get_fd (G_FILE_DESCRIPTOR_BASED (instream));
if (fstat (fd, &stbuf) == -1)
{
gs_set_error_from_errno (error, errno);
goto out;
}
/* Now get rid of the temporary */
(void) unlinkat (state->repo->tmp_dir_fd, state->output_tmp_path, 0);
if (!ostree_repo_write_content (repo, tmp_checksum,
instream, stbuf.st_size,
NULL, cancellable, error))
goto out;
}
state->output_target = NULL;