libxl_device_model_version_to_string(b_info->device_model_version));
}
+/*----- remus asynchronous checkpoint callback -----*/
+
+static void remus_checkpoint_stream_done(
+ libxl__egc *egc, libxl__stream_read_state *srs, int rc);
+
+static void libxl__remus_domain_checkpoint_callback(void *data)
+{
+ libxl__save_helper_state *shs = data;
+ libxl__domain_create_state *dcs = shs->caller_state;
+ libxl__egc *egc = shs->egc;
+ STATE_AO_GC(dcs->ao);
+
+ libxl__stream_read_start_checkpoint(egc, &dcs->srs);
+}
+
+static void remus_checkpoint_stream_done(
+ libxl__egc *egc, libxl__stream_read_state *stream, int rc)
+{
+ libxl__xc_domain_saverestore_async_callback_done(egc, &stream->shs, rc);
+}
+
/*----- main domain creation -----*/
/* We have a linear control flow; only one event callback is
libxl_domain_config *const d_config = dcs->guest_config;
const int restore_fd = dcs->restore_fd;
libxl__domain_build_state *const state = &dcs->build_state;
+ libxl__srm_restore_autogen_callbacks *const callbacks =
+ &dcs->srs.shs.callbacks.restore.a;
if (rc) {
domcreate_rebuild_done(egc, dcs, rc);
}
/* Restore */
+ callbacks->checkpoint = libxl__remus_domain_checkpoint_callback;
rc = libxl__build_pre(gc, domid, d_config, state);
if (rc)
dcs->srs.fd = restore_fd;
dcs->srs.legacy = (dcs->restore_params.stream_version == 1);
dcs->srs.completion_callback = domcreate_stream_done;
+ dcs->srs.checkpoint_callback = remus_checkpoint_stream_done;
libxl__stream_read_start(egc, &dcs->srs);
return;
* Undefined undef undef undef undef undef
* Idle false undef false 0 0
* Active true NORMAL false 0/1 0/partial
+ * Active true BUFFERING true any 0/partial
+ * Active true UNBUFFERING true any 0
*
* While reading data from the stream, 'dc' is active and a callback
* is expected. Most actions in process_record() start a callback of
* Records are read one at time and immediately processed. (The
* record queue will not contain more than a single record.)
*
+ * PHASE_BUFFERING:
+ * This phase is used in checkpointed streams, when libxc signals
+ * the presence of a checkpoint in the stream. Records are read and
+ * buffered until a CHECKPOINT_END record has been read.
+ *
+ * PHASE_UNBUFFERING:
+ * Once a CHECKPOINT_END record has been read, all buffered records
+ * are processed.
+ *
* Note:
* Record buffers are not allocated from a GC; they are allocated
* and tracked manually. This is to avoid OOM with Remus where the
* - Initialises state. Must be called once before _start()
* - libxl__stream_read_start()
* - Starts reading records from the stream, and acting on them.
+ * - libxl__stream_read_start_checkpoint()
+ * - Starts buffering records at a checkpoint. Must be called on
+ * a running stream.
*
* There are several chains of event:
*
/* Success/error/cleanup handling. */
static void stream_complete(libxl__egc *egc,
libxl__stream_read_state *stream, int rc);
+static void checkpoint_done(libxl__egc *egc,
+ libxl__stream_read_state *stream, int rc);
static void stream_done(libxl__egc *egc,
libxl__stream_read_state *stream);
static void conversion_done(libxl__egc *egc,
{
stream->rc = 0;
stream->running = false;
+ stream->in_checkpoint = false;
libxl__save_helper_init(&stream->shs);
libxl__conversion_helper_init(&stream->chs);
FILLZERO(stream->dc);
stream_complete(egc, stream, rc);
}
+void libxl__stream_read_start_checkpoint(libxl__egc *egc,
+ libxl__stream_read_state *stream)
+{
+ assert(stream->running);
+ assert(!stream->in_checkpoint);
+
+ stream->in_checkpoint = true;
+ stream->phase = SRS_PHASE_BUFFERING;
+
+ /*
+ * Libxc has handed control of the fd to us. Start reading some
+ * libxl records out of it.
+ */
+ stream_continue(egc, stream);
+}
+
void libxl__stream_read_abort(libxl__egc *egc,
libxl__stream_read_state *stream, int rc)
{
}
break;
+ case SRS_PHASE_BUFFERING: {
+ /*
+ * Buffering phase (checkpointed streams only):
+ *
+ * logically:
+ * do { read_record(); } while ( not CHECKPOINT_END );
+ *
+ * Read and buffer all records from the stream until a
+ * CHECKPOINT_END record is encountered. We need to peek at
+ * the tail to spot the CHECKPOINT_END record, and switch to
+ * the unbuffering phase.
+ */
+ libxl__sr_record_buf *rec = LIBXL_STAILQ_LAST(
+ &stream->record_queue, libxl__sr_record_buf, entry);
+
+ assert(stream->in_checkpoint);
+
+ if (!rec || (rec->hdr.type != REC_TYPE_CHECKPOINT_END)) {
+ setup_read_record(egc, stream);
+ break;
+ }
+
+ /*
+ * There are now some number of buffered records, with a
+ * CHECKPOINT_END at the end. Start processing them all.
+ */
+ stream->phase = SRS_PHASE_UNBUFFERING;
+ }
+ /* FALLTHROUGH */
+ case SRS_PHASE_UNBUFFERING:
+ /*
+ * Unbuffering phase (checkpointed streams only):
+ *
+ * logically:
+ * do { process_record(); } while ( not CHECKPOINT_END );
+ *
+ * Process all records collected during the buffering phase.
+ */
+ assert(stream->in_checkpoint);
+
+ while (process_record(egc, stream))
+ ; /*
+ * Nothing! process_record() helpfully tells us if no specific
+ * futher actions have been set up, in which case we want to go
+ * ahead and process the next record.
+ */
+ break;
+
default:
abort();
}
write_emulator_blob(egc, stream, rec);
break;
+ case REC_TYPE_CHECKPOINT_END:
+ if (!stream->in_checkpoint) {
+ LOG(ERROR, "Unexpected CHECKPOINT_END record in stream");
+ rc = ERROR_FAIL;
+ goto err;
+ }
+ checkpoint_done(egc, stream, 0);
+ break;
+
default:
LOG(ERROR, "Unrecognised record 0x%08x", rec->hdr.type);
rc = ERROR_FAIL;
{
assert(stream->running);
+ if (stream->in_checkpoint) {
+ assert(rc);
+
+ /*
+ * If an error is encountered while in a checkpoint, pass it
+ * back to libxc. The failure will come back around to us via
+ * libxl__xc_domain_restore_done()
+ */
+ checkpoint_done(egc, stream, rc);
+ return;
+ }
+
if (!stream->rc)
stream->rc = rc;
stream_done(egc, stream);
}
+static void checkpoint_done(libxl__egc *egc,
+ libxl__stream_read_state *stream, int rc)
+{
+ int ret;
+
+ assert(stream->in_checkpoint);
+
+ if (rc == 0)
+ ret = XGR_CHECKPOINT_SUCCESS;
+ else if (stream->phase == SRS_PHASE_BUFFERING)
+ ret = XGR_CHECKPOINT_FAILOVER;
+ else
+ ret = XGR_CHECKPOINT_ERROR;
+
+ stream->checkpoint_callback(egc, stream, ret);
+
+ stream->in_checkpoint = false;
+ stream->phase = SRS_PHASE_NORMAL;
+}
+
static void stream_done(libxl__egc *egc,
libxl__stream_read_state *stream)
{
libxl__sr_record_buf *rec, *trec;
assert(stream->running);
+ assert(!stream->in_checkpoint);
stream->running = false;
if (stream->incoming_record)