From de0e015908d50ffa64c50ef5aaf1e3e2df61e763 Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Wed, 25 Oct 2017 22:32:02 +0200 Subject: [PATCH] static deltas: Process each part as soon as its done Directly when we allocate a new part we finish the old one, writing the compressed data to a temporary file and generating the delta header for it. When all these are done we loop over them and collect the headers, sizes and either copy the tempfile data into the inlined superblock or link the tempfiles to disk with the proper names. Closes: #1309 Approved by: cgwalters --- .../ostree-repo-static-delta-compilation.c | 313 ++++++++++-------- 1 file changed, 180 insertions(+), 133 deletions(-) diff --git a/src/libostree/ostree-repo-static-delta-compilation.c b/src/libostree/ostree-repo-static-delta-compilation.c index 70a18c36..41cd347e 100644 --- a/src/libostree/ostree-repo-static-delta-compilation.c +++ b/src/libostree/ostree-repo-static-delta-compilation.c @@ -44,6 +44,7 @@ typedef enum { } DeltaOpts; typedef struct { + guint64 compressed_size; guint64 uncompressed_size; GPtrArray *objects; GString *payload; @@ -52,6 +53,8 @@ typedef struct { GPtrArray *modes; GHashTable *xattr_set; /* GVariant(ayay) -> offset */ GPtrArray *xattrs; + GLnxTmpfile part_tmpf; + GVariant *header; } OstreeStaticDeltaPartBuilder; typedef struct { @@ -66,6 +69,8 @@ typedef struct { guint n_bsdiff; guint n_fallback; gboolean swap_endian; + int parts_dfd; + DeltaOpts delta_opts; } OstreeStaticDeltaBuilder; /* Get an input stream for a GVariant */ @@ -119,6 +124,9 @@ ostree_static_delta_part_builder_unref (OstreeStaticDeltaPartBuilder *part_build g_ptr_array_unref (part_builder->modes); g_hash_table_unref (part_builder->xattr_set); g_ptr_array_unref (part_builder->xattrs); + glnx_tmpfile_clear (&part_builder->part_tmpf); + if (part_builder->header) + g_variant_unref (part_builder->header); g_free (part_builder); } @@ -200,10 +208,123 @@ xattr_chunk_equals (const void *one, const void *two) return memcmp (g_variant_get_data (v1), g_variant_get_data (v2), l1) == 0; } +static gboolean +finish_part (OstreeStaticDeltaBuilder *builder, GError **error) +{ + OstreeStaticDeltaPartBuilder *part_builder = builder->parts->pdata[builder->parts->len - 1]; + g_autofree guchar *part_checksum = NULL; + g_autoptr(GBytes) objtype_checksum_array = NULL; + g_autoptr(GBytes) checksum_bytes = NULL; + g_autoptr(GOutputStream) part_temp_outstream = NULL; + g_autoptr(GInputStream) part_in = NULL; + g_autoptr(GInputStream) part_payload_in = NULL; + g_autoptr(GMemoryOutputStream) part_payload_out = NULL; + g_autoptr(GConverterOutputStream) part_payload_compressor = NULL; + g_autoptr(GConverter) compressor = NULL; + g_autoptr(GVariant) delta_part_content = NULL; + g_autoptr(GVariant) delta_part = NULL; + g_autoptr(GVariant) delta_part_header = NULL; + g_auto(GVariantBuilder) mode_builder = OT_VARIANT_BUILDER_INITIALIZER; + g_auto(GVariantBuilder) xattr_builder = OT_VARIANT_BUILDER_INITIALIZER; + guint8 compression_type_char; + + g_variant_builder_init (&mode_builder, G_VARIANT_TYPE ("a(uuu)")); + g_variant_builder_init (&xattr_builder, G_VARIANT_TYPE ("aa(ayay)")); + guint j; + + for (j = 0; j < part_builder->modes->len; j++) + g_variant_builder_add_value (&mode_builder, part_builder->modes->pdata[j]); + + for (j = 0; j < part_builder->xattrs->len; j++) + g_variant_builder_add_value (&xattr_builder, part_builder->xattrs->pdata[j]); + + { + g_autoptr(GBytes) payload_b; + g_autoptr(GBytes) operations_b; + + payload_b = g_string_free_to_bytes (part_builder->payload); + part_builder->payload = NULL; + + operations_b = g_string_free_to_bytes (part_builder->operations); + part_builder->operations = NULL; + + delta_part_content = g_variant_new ("(a(uuu)aa(ayay)@ay@ay)", + &mode_builder, &xattr_builder, + ot_gvariant_new_ay_bytes (payload_b), + ot_gvariant_new_ay_bytes (operations_b)); + g_variant_ref_sink (delta_part_content); + } + + /* Hardcode xz for now */ + compressor = (GConverter*)_ostree_lzma_compressor_new (NULL); + compression_type_char = 'x'; + part_payload_in = variant_to_inputstream (delta_part_content); + part_payload_out = (GMemoryOutputStream*)g_memory_output_stream_new (NULL, 0, g_realloc, g_free); + part_payload_compressor = (GConverterOutputStream*)g_converter_output_stream_new ((GOutputStream*)part_payload_out, compressor); + + { + gssize n_bytes_written = g_output_stream_splice ((GOutputStream*)part_payload_compressor, part_payload_in, + G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET | G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE, + NULL, error); + if (n_bytes_written < 0) + return FALSE; + } + + g_clear_pointer (&delta_part_content, g_variant_unref); + + { g_autoptr(GBytes) payload = g_memory_output_stream_steal_as_bytes (part_payload_out); + delta_part = g_variant_ref_sink (g_variant_new ("(y@ay)", + compression_type_char, + ot_gvariant_new_ay_bytes (payload))); + } + + if (!glnx_open_tmpfile_linkable_at (builder->parts_dfd, ".", O_RDWR | O_CLOEXEC, + &part_builder->part_tmpf, error)) + return FALSE; + + part_temp_outstream = g_unix_output_stream_new (part_builder->part_tmpf.fd, FALSE); + + part_in = variant_to_inputstream (delta_part); + if (!ot_gio_splice_get_checksum (part_temp_outstream, part_in, + &part_checksum, + NULL, error)) + return FALSE; + + checksum_bytes = g_bytes_new (part_checksum, OSTREE_SHA256_DIGEST_LEN); + objtype_checksum_array = objtype_checksum_array_new (part_builder->objects); + delta_part_header = g_variant_new ("(u@aytt@ay)", + maybe_swap_endian_u32 (builder->swap_endian, OSTREE_DELTAPART_VERSION), + ot_gvariant_new_ay_bytes (checksum_bytes), + maybe_swap_endian_u64 (builder->swap_endian, (guint64) g_variant_get_size (delta_part)), + maybe_swap_endian_u64 (builder->swap_endian, part_builder->uncompressed_size), + ot_gvariant_new_ay_bytes (objtype_checksum_array)); + g_variant_ref_sink (delta_part_header); + + part_builder->header = g_variant_ref (delta_part_header); + part_builder->compressed_size = g_variant_get_size (delta_part); + + if (builder->delta_opts & DELTAOPT_FLAG_VERBOSE) + { + g_printerr ("part %u n:%u compressed:%" G_GUINT64_FORMAT " uncompressed:%" G_GUINT64_FORMAT "\n", + builder->parts->len, part_builder->objects->len, + part_builder->compressed_size, + part_builder->uncompressed_size); + } + + return TRUE; +} + static OstreeStaticDeltaPartBuilder * -allocate_part (OstreeStaticDeltaBuilder *builder) +allocate_part (OstreeStaticDeltaBuilder *builder, GError **error) { OstreeStaticDeltaPartBuilder *part = g_new0 (OstreeStaticDeltaPartBuilder, 1); + + if (builder->parts->len > 0) + { + if (!finish_part (builder, error)) + return NULL; + } + part->objects = g_ptr_array_new_with_free_func ((GDestroyNotify)g_variant_unref); part->payload = g_string_new (NULL); part->operations = g_string_new (NULL); @@ -351,7 +472,10 @@ process_one_object (OstreeRepo *repo, if (current_part->objects->len > 0 && current_part->payload->len + content_size > builder->max_chunk_size_bytes) { - *current_part_val = current_part = allocate_part (builder); + current_part = allocate_part (builder, error); + if (current_part == NULL) + return FALSE; + *current_part_val = current_part; } guint64 compressed_size; @@ -590,7 +714,10 @@ process_one_rollsum (OstreeRepo *repo, if (current_part->objects->len > 0 && current_part->payload->len > builder->max_chunk_size_bytes) { - *current_part_val = current_part = allocate_part (builder); + current_part = allocate_part (builder, error); + if (current_part == NULL) + return FALSE; + *current_part_val = current_part; } g_autoptr(GBytes) tmp_to = NULL; @@ -705,7 +832,10 @@ process_one_bsdiff (OstreeRepo *repo, if (current_part->objects->len > 0 && current_part->payload->len > builder->max_chunk_size_bytes) { - *current_part_val = current_part = allocate_part (builder); + current_part = allocate_part (builder, error); + if (current_part == NULL) + return FALSE; + *current_part_val = current_part; } g_autoptr(GBytes) tmp_from = NULL; @@ -977,7 +1107,9 @@ generate_delta_lowlatency (OstreeRepo *repo, g_hash_table_size (modified_regfile_content)); } - current_part = allocate_part (builder); + current_part = allocate_part (builder, error); + if (current_part == NULL) + return FALSE; /* Pack the metadata first */ g_hash_table_iter_init (&hashiter, new_reachable_metadata); @@ -1093,6 +1225,9 @@ generate_delta_lowlatency (OstreeRepo *repo, return FALSE; } + if (!finish_part (builder, error)) + return FALSE; + return TRUE; } @@ -1258,6 +1393,26 @@ ostree_repo_static_delta_generate (OstreeRepo *self, &to_commit, error)) goto out; + if (opt_filename) + { + g_autofree char *dnbuf = g_strdup (opt_filename); + const char *dn = dirname (dnbuf); + if (!glnx_opendirat (AT_FDCWD, dn, TRUE, &tmp_dfd, error)) + goto out; + } + else + { + tmp_dfd = fcntl (self->tmp_dir_fd, F_DUPFD_CLOEXEC, 3); + if (tmp_dfd < 0) + { + glnx_set_error_from_errno (error); + goto out; + } + } + + builder.parts_dfd = tmp_dfd; + builder.delta_opts = delta_opts; + /* Ignore optimization flags */ if (!generate_delta_lowlatency (self, from, to, delta_opts, &builder, cancellable, error)) @@ -1331,153 +1486,45 @@ ostree_repo_static_delta_generate (OstreeRepo *self, goto out; } - if (opt_filename) - { - g_autofree char *dnbuf = g_strdup (opt_filename); - const char *dn = dirname (dnbuf); - if (!glnx_opendirat (AT_FDCWD, dn, TRUE, &tmp_dfd, error)) - goto out; - } - else - { - tmp_dfd = fcntl (self->tmp_dir_fd, F_DUPFD_CLOEXEC, 3); - if (tmp_dfd < 0) - { - glnx_set_error_from_errno (error); - goto out; - } - } - part_headers = g_variant_builder_new (G_VARIANT_TYPE ("a" OSTREE_STATIC_DELTA_META_ENTRY_FORMAT)); part_temp_paths = g_ptr_array_new_with_free_func ((GDestroyNotify)glnx_tmpfile_clear); for (i = 0; i < builder.parts->len; i++) { OstreeStaticDeltaPartBuilder *part_builder = builder.parts->pdata[i]; - g_autoptr(GBytes) payload_b; - g_autoptr(GBytes) operations_b; - g_autofree guchar *part_checksum = NULL; - g_autoptr(GBytes) objtype_checksum_array = NULL; - g_autoptr(GBytes) checksum_bytes = NULL; - g_autoptr(GOutputStream) part_temp_outstream = NULL; - g_autoptr(GInputStream) part_in = NULL; - g_autoptr(GInputStream) part_payload_in = NULL; - g_autoptr(GMemoryOutputStream) part_payload_out = NULL; - g_autoptr(GConverterOutputStream) part_payload_compressor = NULL; - g_autoptr(GConverter) compressor = NULL; - g_autoptr(GVariant) delta_part_content = NULL; - g_autoptr(GVariant) delta_part = NULL; - g_autoptr(GVariant) delta_part_header = NULL; - g_auto(GVariantBuilder) mode_builder = OT_VARIANT_BUILDER_INITIALIZER; - g_auto(GVariantBuilder) xattr_builder = OT_VARIANT_BUILDER_INITIALIZER; - guint8 compression_type_char; - - g_variant_builder_init (&mode_builder, G_VARIANT_TYPE ("a(uuu)")); - g_variant_builder_init (&xattr_builder, G_VARIANT_TYPE ("aa(ayay)")); - { guint j; - for (j = 0; j < part_builder->modes->len; j++) - g_variant_builder_add_value (&mode_builder, part_builder->modes->pdata[j]); - - for (j = 0; j < part_builder->xattrs->len; j++) - g_variant_builder_add_value (&xattr_builder, part_builder->xattrs->pdata[j]); - } - - payload_b = g_string_free_to_bytes (part_builder->payload); - part_builder->payload = NULL; - - operations_b = g_string_free_to_bytes (part_builder->operations); - part_builder->operations = NULL; - /* FIXME - avoid duplicating memory here */ - delta_part_content = g_variant_new ("(a(uuu)aa(ayay)@ay@ay)", - &mode_builder, &xattr_builder, - ot_gvariant_new_ay_bytes (payload_b), - ot_gvariant_new_ay_bytes (operations_b)); - g_variant_ref_sink (delta_part_content); - - /* Hardcode xz for now */ - compressor = (GConverter*)_ostree_lzma_compressor_new (NULL); - compression_type_char = 'x'; - part_payload_in = variant_to_inputstream (delta_part_content); - part_payload_out = (GMemoryOutputStream*)g_memory_output_stream_new (NULL, 0, g_realloc, g_free); - part_payload_compressor = (GConverterOutputStream*)g_converter_output_stream_new ((GOutputStream*)part_payload_out, compressor); - - { - gssize n_bytes_written = g_output_stream_splice ((GOutputStream*)part_payload_compressor, part_payload_in, - G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET | G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE, - cancellable, error); - if (n_bytes_written < 0) - goto out; - } - - /* FIXME - avoid duplicating memory here */ - { g_autoptr(GBytes) payload = g_memory_output_stream_steal_as_bytes (part_payload_out); - delta_part = g_variant_ref_sink (g_variant_new ("(y@ay)", - compression_type_char, - ot_gvariant_new_ay_bytes (payload))); - } if (inline_parts) { g_autofree char *part_relpath = _ostree_get_relative_static_delta_part_path (from, to, i); - if (!ot_variant_builder_add (descriptor_builder, error, "{sv}", part_relpath, delta_part)) + + lseek (part_builder->part_tmpf.fd, 0, SEEK_SET); + + if (!ot_variant_builder_open (descriptor_builder, G_VARIANT_TYPE ("{sv}"), error) || + !ot_variant_builder_add (descriptor_builder, error, "s", part_relpath) || + !ot_variant_builder_open (descriptor_builder, G_VARIANT_TYPE ("v"), error) || + !ot_variant_builder_add_from_fd (descriptor_builder, G_VARIANT_TYPE ("(yay)"), part_builder->part_tmpf.fd, part_builder->compressed_size, error) || + !ot_variant_builder_close (descriptor_builder, error) || + !ot_variant_builder_close (descriptor_builder, error)) goto out; } else { - GLnxTmpfile *part_tmpf = g_new0 (GLnxTmpfile, 1); + g_autofree char *partstr = g_strdup_printf ("%u", i); - if (!glnx_open_tmpfile_linkable_at (tmp_dfd, ".", O_WRONLY | O_CLOEXEC, - part_tmpf, error)) + if (fchmod (part_builder->part_tmpf.fd, 0644) < 0) + { + glnx_set_error_from_errno (error); + goto out; + } + + if (!glnx_link_tmpfile_at (&part_builder->part_tmpf, GLNX_LINK_TMPFILE_REPLACE, + descriptor_dfd, partstr, error)) goto out; - - /* Transfer tempfile ownership */ - part_temp_outstream = g_unix_output_stream_new (part_tmpf->fd, FALSE); - g_ptr_array_add (part_temp_paths, g_steal_pointer (&part_tmpf)); } - part_in = variant_to_inputstream (delta_part); - if (!ot_gio_splice_get_checksum (part_temp_outstream, part_in, - &part_checksum, - cancellable, error)) - goto out; + g_variant_builder_add_value (part_headers, g_variant_ref (part_builder->header)); - checksum_bytes = g_bytes_new (part_checksum, OSTREE_SHA256_DIGEST_LEN); - objtype_checksum_array = objtype_checksum_array_new (part_builder->objects); - delta_part_header = g_variant_new ("(u@aytt@ay)", - maybe_swap_endian_u32 (builder.swap_endian, OSTREE_DELTAPART_VERSION), - ot_gvariant_new_ay_bytes (checksum_bytes), - maybe_swap_endian_u64 (builder.swap_endian, (guint64) g_variant_get_size (delta_part)), - maybe_swap_endian_u64 (builder.swap_endian, part_builder->uncompressed_size), - ot_gvariant_new_ay_bytes (objtype_checksum_array)); - - g_variant_builder_add_value (part_headers, g_variant_ref (delta_part_header)); - - total_compressed_size += g_variant_get_size (delta_part); + total_compressed_size += part_builder->compressed_size; total_uncompressed_size += part_builder->uncompressed_size; - - if (delta_opts & DELTAOPT_FLAG_VERBOSE) - { - g_printerr ("part %u n:%u compressed:%" G_GUINT64_FORMAT " uncompressed:%" G_GUINT64_FORMAT "\n", - i, part_builder->objects->len, - (guint64)g_variant_get_size (delta_part), - part_builder->uncompressed_size); - } - } - for (i = 0; i < part_temp_paths->len; i++) - { - g_autofree char *partstr = g_strdup_printf ("%u", i); - /* Take ownership of the path/fd here */ - g_auto(GLnxTmpfile) tmpf = *((GLnxTmpfile*)part_temp_paths->pdata[i]); - g_clear_pointer (&(part_temp_paths->pdata[i]), g_free); - - if (fchmod (tmpf.fd, 0644) < 0) - { - glnx_set_error_from_errno (error); - goto out; - } - - if (!glnx_link_tmpfile_at (&tmpf, GLNX_LINK_TMPFILE_REPLACE, - descriptor_dfd, partstr, error)) - goto out; } if (!get_fallback_headers (self, &builder, &fallback_headers,