fetcher: Split lowlevel API into file/membuf variants

The previous commit introduced a single low level API - however,
we can do things in a more optimal way for the curl backend if
we drop the "streaming API" variant.  Currently, we only use
it to synchronously splice into a memory buffer...which is pretty
silly when we could just do that in the backend.

The only tweak here is that we have an "add NUL character" flag that is
(possibly) needed when fetching into a membuf.

The code here ends up being better I think, since we avoid the double return
value for the `_finish()` invocation, and now most of the fetcher code (in the
soup case) writes to a `GOutputStream` consistently.

This will again make things easier for a curl backend.

Closes: #636
Approved by: jlebon
This commit is contained in:
Colin Walters 2016-12-28 14:43:28 -05:00 committed by Atomic Bot
parent 9d0d0a26db
commit 21aca3fa83
4 changed files with 194 additions and 134 deletions

View File

@ -28,7 +28,7 @@
typedef struct typedef struct
{ {
GInputStream *result_stream; GBytes *result_buf;
gboolean done; gboolean done;
GError **error; GError **error;
} }
@ -41,9 +41,9 @@ fetch_uri_sync_on_complete (GObject *object,
{ {
FetchUriSyncData *data = user_data; FetchUriSyncData *data = user_data;
(void)_ostree_fetcher_request_finish ((OstreeFetcher*)object, (void)_ostree_fetcher_request_to_membuf_finish ((OstreeFetcher*)object,
result, NULL, &data->result_stream, result, &data->result_buf,
data->error); data->error);
data->done = TRUE; data->done = TRUE;
} }
@ -59,13 +59,11 @@ _ostree_fetcher_mirrored_request_to_membuf (OstreeFetcher *fetcher,
GError **error) GError **error)
{ {
gboolean ret = FALSE; gboolean ret = FALSE;
const guint8 nulchar = 0;
g_autoptr(GMemoryOutputStream) buf = NULL;
g_autoptr(GMainContext) mainctx = NULL; g_autoptr(GMainContext) mainctx = NULL;
FetchUriSyncData data; FetchUriSyncData data;
g_assert (error != NULL); g_assert (error != NULL);
data.result_stream = NULL; memset (&data, 0, sizeof (data));
if (g_cancellable_set_error_if_cancelled (cancellable, error)) if (g_cancellable_set_error_if_cancelled (cancellable, error))
return FALSE; return FALSE;
@ -76,13 +74,14 @@ _ostree_fetcher_mirrored_request_to_membuf (OstreeFetcher *fetcher,
data.done = FALSE; data.done = FALSE;
data.error = error; data.error = error;
_ostree_fetcher_request_async (fetcher, mirrorlist, filename, 0, max_size, _ostree_fetcher_request_to_membuf (fetcher, mirrorlist, filename,
OSTREE_FETCHER_DEFAULT_PRIORITY, cancellable, add_nul ? OSTREE_FETCHER_REQUEST_NUL_TERMINATION : 0,
fetch_uri_sync_on_complete, &data); max_size, OSTREE_FETCHER_DEFAULT_PRIORITY,
cancellable, fetch_uri_sync_on_complete, &data);
while (!data.done) while (!data.done)
g_main_context_iteration (mainctx, TRUE); g_main_context_iteration (mainctx, TRUE);
if (!data.result_stream) if (!data.result_buf)
{ {
if (allow_noent) if (allow_noent)
{ {
@ -96,27 +95,12 @@ _ostree_fetcher_mirrored_request_to_membuf (OstreeFetcher *fetcher,
goto out; goto out;
} }
buf = (GMemoryOutputStream*)g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
if (g_output_stream_splice ((GOutputStream*)buf, data.result_stream,
G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
cancellable, error) < 0)
goto out;
if (add_nul)
{
if (!g_output_stream_write ((GOutputStream*)buf, &nulchar, 1, cancellable, error))
goto out;
}
if (!g_output_stream_close ((GOutputStream*)buf, cancellable, error))
goto out;
ret = TRUE; ret = TRUE;
*out_contents = g_memory_output_stream_steal_as_bytes (buf); *out_contents = g_steal_pointer (&data.result_buf);
out: out:
if (mainctx) if (mainctx)
g_main_context_pop_thread_default (mainctx); g_main_context_pop_thread_default (mainctx);
g_clear_object (&(data.result_stream)); g_clear_pointer (&data.result_buf, (GDestroyNotify)g_bytes_unref);
return ret; return ret;
} }

View File

@ -22,6 +22,7 @@
#include "config.h" #include "config.h"
#include <gio/gio.h>
#include <gio/gfiledescriptorbased.h> #include <gio/gfiledescriptorbased.h>
#include <gio/gunixoutputstream.h> #include <gio/gunixoutputstream.h>
#define LIBSOUP_USE_UNSTABLE_REQUEST_API #define LIBSOUP_USE_UNSTABLE_REQUEST_API
@ -90,7 +91,8 @@ typedef struct {
SoupRequest *request; SoupRequest *request;
gboolean is_stream; gboolean is_membuf;
OstreeFetcherRequestFlags flags;
GInputStream *request_body; GInputStream *request_body;
char *out_tmpfile; char *out_tmpfile;
GOutputStream *out_stream; GOutputStream *out_stream;
@ -468,7 +470,7 @@ session_thread_request_uri (ThreadClosure *thread_closure,
soup_message_headers_append (msg->request_headers, key, value); soup_message_headers_append (msg->request_headers, key, value);
} }
if (pending->is_stream) if (pending->is_membuf)
{ {
soup_request_send_async (pending->request, soup_request_send_async (pending->request,
cancellable, cancellable,
@ -855,6 +857,16 @@ finish_stream (OstreeFetcherPendingURI *pending,
*/ */
if (pending->out_stream) if (pending->out_stream)
{ {
if ((pending->flags & OSTREE_FETCHER_REQUEST_NUL_TERMINATION) > 0)
{
const guint8 nulchar = 0;
gsize bytes_written;
if (!g_output_stream_write_all (pending->out_stream, &nulchar, 1, &bytes_written,
cancellable, error))
goto out;
}
if (!g_output_stream_close (pending->out_stream, cancellable, error)) if (!g_output_stream_close (pending->out_stream, cancellable, error))
goto out; goto out;
@ -864,30 +876,37 @@ finish_stream (OstreeFetcherPendingURI *pending,
g_mutex_unlock (&pending->thread_closure->output_stream_set_lock); g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
} }
pending->state = OSTREE_FETCHER_STATE_COMPLETE; if (!pending->is_membuf)
if (fstatat (pending->thread_closure->tmpdir_dfd,
pending->out_tmpfile,
&stbuf, AT_SYMLINK_NOFOLLOW) != 0)
{ {
glnx_set_error_from_errno (error); if (fstatat (pending->thread_closure->tmpdir_dfd,
goto out; pending->out_tmpfile,
&stbuf, AT_SYMLINK_NOFOLLOW) != 0)
{
glnx_set_error_from_errno (error);
goto out;
}
} }
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
/* Now that we've finished downloading, continue with other queued /* Now that we've finished downloading, continue with other queued
* requests. * requests.
*/ */
session_thread_process_pending_queue (pending->thread_closure); session_thread_process_pending_queue (pending->thread_closure);
if (stbuf.st_size < pending->content_length) if (!pending->is_membuf)
{ {
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Download incomplete"); if (stbuf.st_size < pending->content_length)
goto out; {
} g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Download incomplete");
else goto out;
{ }
g_mutex_lock (&pending->thread_closure->output_stream_set_lock); else
pending->thread_closure->total_downloaded += stbuf.st_size; {
g_mutex_unlock (&pending->thread_closure->output_stream_set_lock); g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
pending->thread_closure->total_downloaded += stbuf.st_size;
g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
}
} }
ret = TRUE; ret = TRUE;
@ -973,9 +992,18 @@ on_stream_read (GObject *object,
{ {
if (!finish_stream (pending, cancellable, &local_error)) if (!finish_stream (pending, cancellable, &local_error))
goto out; goto out;
g_task_return_pointer (task, if (pending->is_membuf)
g_strdup (pending->out_tmpfile), {
(GDestroyNotify) g_free); g_task_return_pointer (task,
g_memory_output_stream_steal_as_bytes ((GMemoryOutputStream*)pending->out_stream),
(GDestroyNotify) g_bytes_unref);
}
else
{
g_task_return_pointer (task,
g_strdup (pending->out_tmpfile),
(GDestroyNotify) g_free);
}
remove_pending_rerun_queue (pending); remove_pending_rerun_queue (pending);
} }
else else
@ -1045,23 +1073,15 @@ on_request_sent (GObject *object,
if (SOUP_IS_REQUEST_HTTP (object)) if (SOUP_IS_REQUEST_HTTP (object))
{ {
msg = soup_request_http_get_message ((SoupRequestHTTP*) object); msg = soup_request_http_get_message ((SoupRequestHTTP*) object);
if (msg->status_code == SOUP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE) if (!pending->is_membuf &&
msg->status_code == SOUP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE)
{ {
// We already have the whole file, so just use it. // We already have the whole file, so just use it.
pending->state = OSTREE_FETCHER_STATE_COMPLETE; pending->state = OSTREE_FETCHER_STATE_COMPLETE;
(void) g_input_stream_close (pending->request_body, NULL, NULL); (void) g_input_stream_close (pending->request_body, NULL, NULL);
if (pending->is_stream) g_task_return_pointer (task,
{ g_strdup (pending->out_tmpfile),
g_task_return_pointer (task, (GDestroyNotify) g_free);
g_object_ref (pending->request_body),
(GDestroyNotify) g_object_unref);
}
else
{
g_task_return_pointer (task,
g_strdup (pending->out_tmpfile),
(GDestroyNotify) g_free);
}
remove_pending_rerun_queue (pending); remove_pending_rerun_queue (pending);
goto out; goto out;
} }
@ -1126,7 +1146,7 @@ on_request_sent (GObject *object,
pending->content_length = soup_request_get_content_length (pending->request); pending->content_length = soup_request_get_content_length (pending->request);
if (!pending->is_stream) if (!pending->is_membuf)
{ {
int oflags = O_CREAT | O_WRONLY | O_CLOEXEC; int oflags = O_CREAT | O_WRONLY | O_CLOEXEC;
int fd; int fd;
@ -1147,26 +1167,23 @@ on_request_sent (GObject *object,
goto out; goto out;
} }
pending->out_stream = g_unix_output_stream_new (fd, TRUE); pending->out_stream = g_unix_output_stream_new (fd, TRUE);
g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
g_hash_table_add (pending->thread_closure->output_stream_set,
g_object_ref (pending->out_stream));
g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
g_input_stream_read_bytes_async (pending->request_body,
8192, G_PRIORITY_DEFAULT,
cancellable,
on_stream_read,
g_object_ref (task));
} }
else else
{ {
g_task_return_pointer (task, pending->out_stream = g_memory_output_stream_new_resizable ();
g_object_ref (pending->request_body),
(GDestroyNotify) g_object_unref);
remove_pending_rerun_queue (pending);
} }
g_mutex_lock (&pending->thread_closure->output_stream_set_lock);
g_hash_table_add (pending->thread_closure->output_stream_set,
g_object_ref (pending->out_stream));
g_mutex_unlock (&pending->thread_closure->output_stream_set_lock);
g_input_stream_read_bytes_async (pending->request_body,
8192, G_PRIORITY_DEFAULT,
cancellable,
on_stream_read,
g_object_ref (task));
out: out:
if (local_error) if (local_error)
{ {
@ -1179,11 +1196,12 @@ on_request_sent (GObject *object,
g_object_unref (task); g_object_unref (task);
} }
void static void
_ostree_fetcher_request_async (OstreeFetcher *self, _ostree_fetcher_request_async (OstreeFetcher *self,
GPtrArray *mirrorlist, GPtrArray *mirrorlist,
const char *filename, const char *filename,
OstreeFetcherRequestFlags flags, OstreeFetcherRequestFlags flags,
gboolean is_membuf,
guint64 max_size, guint64 max_size,
int priority, int priority,
GCancellable *cancellable, GCancellable *cancellable,
@ -1203,8 +1221,9 @@ _ostree_fetcher_request_async (OstreeFetcher *self,
pending->thread_closure = thread_closure_ref (self->thread_closure); pending->thread_closure = thread_closure_ref (self->thread_closure);
pending->mirrorlist = g_ptr_array_ref (mirrorlist); pending->mirrorlist = g_ptr_array_ref (mirrorlist);
pending->filename = g_strdup (filename); pending->filename = g_strdup (filename);
pending->flags = flags;
pending->max_size = max_size; pending->max_size = max_size;
pending->is_stream = (flags & OSTREE_FETCHER_REQUEST_FLAG_ENABLE_PARTIAL) == 0; pending->is_membuf = is_membuf;
task = g_task_new (self, cancellable, callback, user_data); task = g_task_new (self, cancellable, callback, user_data);
g_task_set_source_tag (task, _ostree_fetcher_request_async); g_task_set_source_tag (task, _ostree_fetcher_request_async);
@ -1219,12 +1238,26 @@ _ostree_fetcher_request_async (OstreeFetcher *self,
(GDestroyNotify) g_object_unref); (GDestroyNotify) g_object_unref);
} }
void
_ostree_fetcher_request_to_tmpfile (OstreeFetcher *self,
GPtrArray *mirrorlist,
const char *filename,
guint64 max_size,
int priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
_ostree_fetcher_request_async (self, mirrorlist, filename, 0, FALSE,
max_size, priority, cancellable,
callback, user_data);
}
gboolean gboolean
_ostree_fetcher_request_finish (OstreeFetcher *self, _ostree_fetcher_request_to_tmpfile_finish (OstreeFetcher *self,
GAsyncResult *result, GAsyncResult *result,
char **out_filename, char **out_filename,
GInputStream **out_stream, GError **error)
GError **error)
{ {
GTask *task; GTask *task;
OstreeFetcherPendingURI *pending; OstreeFetcherPendingURI *pending;
@ -1233,12 +1266,6 @@ _ostree_fetcher_request_finish (OstreeFetcher *self,
g_return_val_if_fail (g_task_is_valid (result, self), FALSE); g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
g_return_val_if_fail (g_async_result_is_tagged (result, _ostree_fetcher_request_async), FALSE); g_return_val_if_fail (g_async_result_is_tagged (result, _ostree_fetcher_request_async), FALSE);
/* Special dance to implement
enum FetchResult {
Filename(String path),
Membuf(uint8[])
} in Rust terms
*/
task = (GTask*)result; task = (GTask*)result;
pending = g_task_get_task_data (task); pending = g_task_get_task_data (task);
@ -1246,20 +1273,57 @@ _ostree_fetcher_request_finish (OstreeFetcher *self,
if (!ret) if (!ret)
return FALSE; return FALSE;
if (pending->is_stream) g_assert (!pending->is_membuf);
{ g_assert (out_filename);
g_assert (out_stream); *out_filename = ret;
*out_stream = ret;
}
else
{
g_assert (out_filename);
*out_filename = ret;
}
return TRUE; return TRUE;
} }
void
_ostree_fetcher_request_to_membuf (OstreeFetcher *self,
GPtrArray *mirrorlist,
const char *filename,
OstreeFetcherRequestFlags flags,
guint64 max_size,
int priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
_ostree_fetcher_request_async (self, mirrorlist, filename, flags, TRUE,
max_size, priority, cancellable,
callback, user_data);
}
gboolean
_ostree_fetcher_request_to_membuf_finish (OstreeFetcher *self,
GAsyncResult *result,
GBytes **out_buf,
GError **error)
{
GTask *task;
OstreeFetcherPendingURI *pending;
gpointer ret;
g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
g_return_val_if_fail (g_async_result_is_tagged (result, _ostree_fetcher_request_async), FALSE);
task = (GTask*)result;
pending = g_task_get_task_data (task);
ret = g_task_propagate_pointer (task, error);
if (!ret)
return FALSE;
g_assert (pending->is_membuf);
g_assert (out_buf);
*out_buf = ret;
return TRUE;
}
guint64 guint64
_ostree_fetcher_bytes_transferred (OstreeFetcher *self) _ostree_fetcher_bytes_transferred (OstreeFetcher *self)
{ {

View File

@ -103,25 +103,39 @@ void _ostree_fetcher_set_extra_headers (OstreeFetcher *self,
guint64 _ostree_fetcher_bytes_transferred (OstreeFetcher *self); guint64 _ostree_fetcher_bytes_transferred (OstreeFetcher *self);
void _ostree_fetcher_request_to_tmpfile (OstreeFetcher *self,
GPtrArray *mirrorlist,
const char *filename,
guint64 max_size,
int priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);
gboolean _ostree_fetcher_request_to_tmpfile_finish (OstreeFetcher *self,
GAsyncResult *result,
char **out_filename,
GError **error);
typedef enum { typedef enum {
OSTREE_FETCHER_REQUEST_FLAG_ENABLE_PARTIAL = (1 << 0) OSTREE_FETCHER_REQUEST_NUL_TERMINATION = (1 << 0)
} OstreeFetcherRequestFlags; } OstreeFetcherRequestFlags;
void _ostree_fetcher_request_async (OstreeFetcher *self, void _ostree_fetcher_request_to_membuf (OstreeFetcher *self,
GPtrArray *mirrorlist, GPtrArray *mirrorlist,
const char *filename, const char *filename,
OstreeFetcherRequestFlags flags, OstreeFetcherRequestFlags flags,
guint64 max_size, guint64 max_size,
int priority, int priority,
GCancellable *cancellable, GCancellable *cancellable,
GAsyncReadyCallback callback, GAsyncReadyCallback callback,
gpointer user_data); gpointer user_data);
gboolean _ostree_fetcher_request_to_membuf_finish (OstreeFetcher *self,
GAsyncResult *result,
GBytes **out_buf,
GError **error);
gboolean _ostree_fetcher_request_finish (OstreeFetcher *self,
GAsyncResult *result,
char **out_filename,
GInputStream **out_stream,
GError **error);
G_END_DECLS G_END_DECLS

View File

@ -708,7 +708,7 @@ content_fetch_on_complete (GObject *object,
OstreeObjectType objtype; OstreeObjectType objtype;
gboolean free_fetch_data = TRUE; gboolean free_fetch_data = TRUE;
if (!_ostree_fetcher_request_finish (fetcher, result, &temp_path, NULL, error)) if (!_ostree_fetcher_request_to_tmpfile_finish (fetcher, result, &temp_path, error))
goto out; goto out;
ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype); ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype);
@ -841,7 +841,7 @@ meta_fetch_on_complete (GObject *object,
g_debug ("fetch of %s%s complete", checksum_obj, g_debug ("fetch of %s%s complete", checksum_obj,
fetch_data->is_detached_meta ? " (detached)" : ""); fetch_data->is_detached_meta ? " (detached)" : "");
if (!_ostree_fetcher_request_finish (fetcher, result, &temp_path, NULL, error)) if (!_ostree_fetcher_request_to_tmpfile_finish (fetcher, result, &temp_path, error))
{ {
if (g_error_matches (local_error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND)) if (g_error_matches (local_error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND))
{ {
@ -981,7 +981,7 @@ static_deltapart_fetch_on_complete (GObject *object,
g_debug ("fetch static delta part %s complete", fetch_data->expected_checksum); g_debug ("fetch static delta part %s complete", fetch_data->expected_checksum);
if (!_ostree_fetcher_request_finish (fetcher, result, &temp_path, NULL, error)) if (!_ostree_fetcher_request_to_tmpfile_finish (fetcher, result, &temp_path, error))
goto out; goto out;
fd = openat (_ostree_fetcher_get_dfd (fetcher), temp_path, O_RDONLY | O_CLOEXEC); fd = openat (_ostree_fetcher_get_dfd (fetcher), temp_path, O_RDONLY | O_CLOEXEC);
@ -1380,13 +1380,12 @@ enqueue_one_object_request (OtPullData *pull_data,
else else
expected_max_size = 0; expected_max_size = 0;
_ostree_fetcher_request_async (pull_data->fetcher, mirrorlist, _ostree_fetcher_request_to_tmpfile (pull_data->fetcher, mirrorlist,
obj_subpath, OSTREE_FETCHER_REQUEST_FLAG_ENABLE_PARTIAL, obj_subpath, expected_max_size,
expected_max_size, is_meta ? OSTREE_REPO_PULL_METADATA_PRIORITY
is_meta ? OSTREE_REPO_PULL_METADATA_PRIORITY : OSTREE_REPO_PULL_CONTENT_PRIORITY,
: OSTREE_REPO_PULL_CONTENT_PRIORITY, pull_data->cancellable,
pull_data->cancellable, is_meta ? meta_fetch_on_complete : content_fetch_on_complete, fetch_data);
is_meta ? meta_fetch_on_complete : content_fetch_on_complete, fetch_data);
} }
static gboolean static gboolean
@ -1734,14 +1733,13 @@ process_one_static_delta (OtPullData *pull_data,
} }
else else
{ {
_ostree_fetcher_request_async (pull_data->fetcher, _ostree_fetcher_request_to_tmpfile (pull_data->fetcher,
pull_data->content_mirrorlist, pull_data->content_mirrorlist,
deltapart_path, OSTREE_FETCHER_REQUEST_FLAG_ENABLE_PARTIAL, deltapart_path, size,
size, OSTREE_FETCHER_DEFAULT_PRIORITY,
OSTREE_FETCHER_DEFAULT_PRIORITY, pull_data->cancellable,
pull_data->cancellable, static_deltapart_fetch_on_complete,
static_deltapart_fetch_on_complete, fetch_data);
fetch_data);
pull_data->n_outstanding_deltapart_fetches++; pull_data->n_outstanding_deltapart_fetches++;
} }
} }