* - stream_write_emulator()
* - stream_write_emulator_done()
* - stream_continue()
+ *
+ * Depending on the contents of the stream, there are likely to be several
+ * parallel tasks being managed. check_all_finished() is used to join all
+ * tasks in both success and error cases.
*/
/* Success/error/cleanup handling. */
libxl__stream_read_state *stream, int rc);
static void stream_done(libxl__egc *egc,
libxl__stream_read_state *stream);
+static void conversion_done(libxl__egc *egc,
+ libxl__conversion_helper_state *chs, int rc);
+static void check_all_finished(libxl__egc *egc,
+ libxl__stream_read_state *stream, int rc);
/* Event chain for first iteration, from _start(). */
static void stream_header_done(libxl__egc *egc,
{
stream->rc = 0;
stream->running = false;
+ libxl__conversion_helper_init(&stream->chs);
FILLZERO(stream->dc);
FILLZERO(stream->hdr);
LIBXL_STAILQ_INIT(&stream->record_queue);
libxl__stream_read_state *stream)
{
libxl__datacopier_state *dc = &stream->dc;
+ STATE_AO_GC(stream->ao);
int rc = 0;
libxl__stream_read_init(stream);
stream->running = true;
stream->phase = SRS_PHASE_NORMAL;
+ if (stream->legacy) {
+ /* Convert the legacy stream. */
+ libxl__conversion_helper_state *chs = &stream->chs;
+
+ chs->ao = stream->ao;
+ chs->legacy_fd = stream->fd;
+ chs->hvm =
+ (stream->dcs->guest_config->b_info.type == LIBXL_DOMAIN_TYPE_HVM);
+ chs->completion_callback = conversion_done;
+
+ rc = libxl__convert_legacy_stream(egc, &stream->chs);
+
+ if (rc) {
+ LOG(ERROR, "Failed to start the legacy stream conversion helper");
+ goto err;
+ }
+
+ assert(stream->chs.v2_carefd);
+ stream->fd = libxl__carefd_fd(stream->chs.v2_carefd);
+ stream->dcs->libxc_fd = stream->fd;
+ }
+ /* stream->fd is now a v2 stream. */
+
dc->ao = stream->ao;
dc->readfd = stream->fd;
dc->writefd = -1;
if (stream->emu_carefd)
libxl__carefd_close(stream->emu_carefd);
+ /* If we started a conversion helper, we took ownership of its carefd. */
+ if (stream->chs.v2_carefd)
+ libxl__carefd_close(stream->chs.v2_carefd);
+
/* The record queue had better be empty if the stream believes
* itself to have been successful. */
assert(LIBXL_STAILQ_EMPTY(&stream->record_queue) || stream->rc);
LIBXL_STAILQ_FOREACH_SAFE(rec, &stream->record_queue, entry, trec)
free_record(rec);
+ check_all_finished(egc, stream, stream->rc);
+}
+
+static void conversion_done(libxl__egc *egc,
+ libxl__conversion_helper_state *chs, int rc)
+{
+ libxl__stream_read_state *stream = CONTAINER_OF(chs, *stream, chs);
+
+ check_all_finished(egc, stream, rc);
+}
+
+static void check_all_finished(libxl__egc *egc,
+ libxl__stream_read_state *stream, int rc)
+{
+ STATE_AO_GC(stream->ao);
+
+ if (!stream->rc && rc) {
+ /* First reported failure. Tear everything down. */
+ stream->rc = rc;
+
+ libxl__stream_read_abort(egc, stream, rc);
+ libxl__conversion_helper_abort(egc, &stream->chs, rc);
+ }
+
+ /* Don't fire the callback until all our parallel tasks have stopped. */
+ if (libxl__stream_read_inuse(stream) ||
+ libxl__conversion_helper_inuse(&stream->chs))
+ return;
+
stream->completion_callback(egc, stream, stream->rc);
}