ia64/xen-unstable

view tools/xfrd/xfrd.c @ 3498:1d24a5b0b338

bitkeeper revision 1.1159.223.25 (41f2cb9aEKMZkZbvqBE0eXhpljlV4Q)

Description: fix path to python
There is no python2 in debian. Instead, use python.

From: Adam Heath <doogie@brainfood.com>
Signed-off-by: ian.pratt@cl.cam.ac.uk
author iap10@labyrinth.cl.cam.ac.uk
date Sat Jan 22 21:54:34 2005 +0000 (2005-01-22)
parents f65b65977b19
children 0a4b76b6b5a0 6b2ff04c12cf 0dc3b8b8c298
line source
1 /** @file
2 * XFRD - Domain Transfer Daemon for Xen.
3 *
4 * The xfrd is forked by xend to transfer a vm to a remote system.
5 *
6 * The vm is suspended, then its state and memory are transferred to the remote system.
7 * The remote system attempts to create a vm and copy the transferred state and memory into it,
8 * finally resuming the vm. If all is OK the vm ends up running on the remote
9 * system and is removed from the originating system. If the transfer does not complete
10 * successfully the originating system attempts to resume the vm.
11 * The children exit when the transfer completes.
12 *
13 * @author Mike Wray <mike.wray@hpl.hp.com>
14 */
16 #include <stdlib.h>
17 #include <unistd.h>
18 #include <stdio.h>
19 #include <getopt.h>
20 #include <errno.h>
21 #include <sys/types.h>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <time.h>
25 #include <sys/socket.h>
26 #include <netinet/in.h>
27 #include <arpa/inet.h>
28 #include <string.h>
30 #include <signal.h>
31 #include <sys/wait.h>
32 #include <sys/select.h>
34 #include "allocate.h"
35 #include "file_stream.h"
36 #include "string_stream.h"
37 #include "lzi_stream.h"
38 #include "gzip_stream.h"
39 #include "sys_net.h"
40 #include "sys_string.h"
42 //#include "xdr.h"
43 #include "enum.h"
44 #include "xfrd.h"
46 #include "xen_domain.h"
48 #include "connection.h"
49 #include "select.h"
51 #define MODULE_NAME "XFRD"
52 #define DEBUG 1
53 #undef DEBUG
54 #include "debug.h"
56 /*
57 sender:
58 xend connects to xfrd and writes migrate message
59 xend writes domain config to xfrd
61 xfrd forks
63 xfrd connects to peer
64 xfrd sends hello, reads response
65 xfrd sends domain
66 xfrd reads response
67 reports progress/status to xend
69 xend reads xfrd for progress/status, disconnects
70 If ok, destroys domain.
71 If not ok, unpauses domain.
73 receiver:
74 xfrd accepts connection on inbound port
75 xfrd forks and accepts connection
76 xfrd receives hello, writes response
77 xfrd receives domain
78 xfrd connects to xend, configures new domain
79 xfrd writes status back to peer, child exits
82 (xfr.hello <major> <minor>)
83 (xfr.err <code> <reason>)
85 xend->xfrd (xfr.migrate <domain> <vmconfig> <host> <port> <live> <resource>)
86 (xfr.save <domain> <vmconfig> <file>)
87 xfrd->xend (xfr.vm.suspend <domain>)
88 xfrd->xend (xfr.progress <percent> <rate: kb/s>)
89 xfrd->xend (xfr.err <code>) | (xfr.migrate.ok <domain>) | (xfr.save.ok) | (xfr.restore.ok <domain>)
90 xfrd->xfrd (xfr.xfr <domain>)
91 xfrd->xfrd (xfr.err <code>) | (xfr.xfr.ok <domain>)
93 xfrd->xend (xfr.configure <domain> <vmconfig>)
94 */
96 Sxpr oxfr_configure; // (xfr.configure <vmid> <vmconfig>)
97 Sxpr oxfr_err; // (xfr.err <code>)
98 Sxpr oxfr_hello; // (xfr.hello <major> <minor>)
99 Sxpr oxfr_migrate; // (xfr.migrate <vmid> <vmconfig> <host> <port> <live> <resource>)
100 Sxpr oxfr_migrate_ok;// (xfr.migrate.ok <value>)
101 Sxpr oxfr_progress; // (xfr.progress <percent> <rate: kb/s>)
102 Sxpr oxfr_restore; // (xfr.restore <file>)
103 Sxpr oxfr_restore_ok;// (xfr.restore.ok <vmid>)
104 Sxpr oxfr_save; // (xfr.save <vmid> <vmconfig> <file>)
105 Sxpr oxfr_save_ok; // (xfr.save.ok)
106 Sxpr oxfr_vm_destroy;// (xfr.vm.destroy <vmid>)
107 Sxpr oxfr_vm_suspend;// (xfr.vm.suspend <vmid>)
108 Sxpr oxfr_xfr; // (xfr.xfr <vmid>)
109 Sxpr oxfr_xfr_ok; // (xfr.xfr.ok <vmid>)
111 void xfr_init(void){
112 oxfr_configure = intern("xfr.configure");
113 oxfr_err = intern("xfr.err");
114 oxfr_hello = intern("xfr.hello");
115 oxfr_migrate = intern("xfr.migrate");
116 oxfr_migrate_ok = intern("xfr.migrate.ok");
117 oxfr_progress = intern("xfr.progress");
118 oxfr_restore = intern("xfr.restore");
119 oxfr_restore_ok = intern("xfr.restore.ok");
120 oxfr_save = intern("xfr.save");
121 oxfr_save_ok = intern("xfr.save.ok");
122 oxfr_vm_destroy = intern("xfr.vm.destroy");
123 oxfr_vm_suspend = intern("xfr.vm.suspend");
124 oxfr_xfr = intern("xfr.xfr");
125 oxfr_xfr_ok = intern("xfr.xfr.ok");
126 }
/* Boolean constants, unless something already supplied them. */
#ifndef TRUE
#define TRUE 1
#endif

#ifndef FALSE
#define FALSE 0
#endif

/* Program name shown in the usage message. */
#define PROGRAM "xfrd"

/*
 * Command-line options. For each option:
 *   OPT_x is the short option character,
 *   KEY_x is the long option name,
 *   DOC_x is the help text printed by usage().
 */
#define OPT_PORT     'P'
#define KEY_PORT     "port"
#define DOC_PORT     "<port>\n\txfr port (as a number or service name)"

#define OPT_COMPRESS 'Z'
#define KEY_COMPRESS "compress"
#define DOC_COMPRESS "\n\tuse compression for migration"

#define OPT_HELP     'h'
#define KEY_HELP     "help"
#define DOC_HELP     "\n\tprint help"

#define OPT_VERSION  'v'
#define KEY_VERSION  "version"
#define DOC_VERSION  "\n\tprint version"

#define OPT_VERBOSE  'V'
#define KEY_VERBOSE  "verbose"
#define DOC_VERBOSE  "\n\tverbose flag"
158 /** Print a usage message.
159 * Prints to stdout if err is zero, and exits with 0.
160 * Prints to stderr if err is non-zero, and exits with 1.
161 */
162 void usage(int err){
163 FILE *out = (err ? stderr : stdout);
165 fprintf(out, "Usage: %s [options]\n", PROGRAM);
166 fprintf(out, "-%c, --%s %s\n", OPT_PORT, KEY_PORT, DOC_PORT);
167 fprintf(out, "-%c, --%s %s\n", OPT_COMPRESS, KEY_COMPRESS, DOC_COMPRESS);
168 fprintf(out, "-%c, --%s %s\n", OPT_VERBOSE, KEY_VERBOSE, DOC_VERBOSE);
169 fprintf(out, "-%c, --%s %s\n", OPT_VERSION, KEY_VERSION, DOC_VERSION);
170 fprintf(out, "-%c, --%s %s\n", OPT_HELP, KEY_HELP, DOC_HELP);
171 exit(err ? 1 : 0);
172 }
/** Program arguments, filled in by set_defaults() and the option parser. */
typedef struct Args {
    /** Buffer size in bytes (set_defaults() uses 128 KiB). */
    int bufsize;
    /** Listen port, stored the way it is copied into sin_port (network order). */
    unsigned long port;
    /** Non-zero if the verbose option was given. */
    int verbose;
    /** Non-zero to compress the outgoing migration stream. */
    int compress;
} Args;
/** Transfer states.
 * XFR_MAX is not a state: it is the number of states, and sizes the
 * per-state error array in XfrState.
 */
enum {
    XFR_INIT  = 0,
    XFR_HELLO = 1,
    XFR_STATE = 2,
    XFR_RUN   = 3,
    XFR_FAIL  = 4,
    XFR_DONE  = 5,
    XFR_MAX   = 6
};
192 #ifndef SXPR_PARSER_MAIN
193 /** Short options. Options followed by ':' take an argument. */
194 static char *short_opts = (char[]){
195 OPT_PORT, ':',
196 OPT_COMPRESS,
197 OPT_HELP,
198 OPT_VERSION,
199 OPT_VERBOSE,
200 0 };
202 /** Long options. */
203 static struct option const long_opts[] = {
204 { KEY_PORT, required_argument, NULL, OPT_PORT },
205 { KEY_COMPRESS, no_argument, NULL, OPT_COMPRESS },
206 { KEY_HELP, no_argument, NULL, OPT_HELP },
207 { KEY_VERSION, no_argument, NULL, OPT_VERSION },
208 { KEY_VERBOSE, no_argument, NULL, OPT_VERBOSE },
209 { NULL, 0, NULL, 0 }
210 };
212 /** Xfrd arguments. */
213 static Args _args = {};
215 /** Xfrd arguments. */
216 static Args *args = &_args;
217 #endif
219 /** Initialize an array element for a constant to its string name. */
220 #define VALDEF(val) { val, #val }
222 /** Names for the transfer states. */
223 static EnumDef xfr_states[] = {
224 VALDEF(XFR_INIT),
225 VALDEF(XFR_HELLO),
226 VALDEF(XFR_STATE),
227 VALDEF(XFR_RUN),
228 VALDEF(XFR_FAIL),
229 VALDEF(XFR_DONE),
230 { 0, NULL }
231 };
234 /** State machine for transfer. */
235 typedef struct XfrState {
236 /** Current state. */
237 int state;
238 /** Error codes for the states. */
239 int state_err[XFR_MAX];
240 /** First error. */
241 int err;
242 /** State when first error happened. */
243 int err_state;
245 uint32_t vmid;
246 char* vmconfig;
247 int vmconfig_n;
248 unsigned long xfr_port;
249 char *xfr_host;
250 uint32_t vmid_new;
251 int live;
252 int resource;
253 } XfrState;
255 /** Get the name of a transfer state.
256 *
257 * @param s state
258 * @return name
259 */
260 char * xfr_state_name(int s){
261 return enum_val_to_name(s, xfr_states);
262 }
264 /** Set the state of a transfer.
265 *
266 * @param s transfer
267 * @param state state
268 * @return state
269 */
270 int XfrState_set_state(XfrState *s, int state){
271 s->state = state;
272 return s->state;
273 }
275 /** Get the state of a transfer.
276 *
277 * @param s transfer
278 * @return state
279 */
280 int XfrState_get_state(XfrState *s){
281 return s->state;
282 }
284 /** Set an error in the current state.
285 * Does nothing if an error is already set.
286 *
287 * @param s transfer
288 * @param err error
289 * @return error
290 */
291 int XfrState_set_err(XfrState *s, int err){
292 if(!s->state_err[s->state]){
293 s->state_err[s->state] = err;
294 }
295 if(!s->err){
296 s->err = err;
297 s->err_state = s->state;
298 }
299 return err;
300 }
302 /** Get the error in the current state.
303 *
304 * @param s transfer
305 * @return error
306 */
307 int XfrState_get_err(XfrState *s){
308 return s->state_err[s->state];
309 }
311 /** Get the first error of a transfer.
312 *
313 * @param s transfer
314 * @return error
315 */
316 int XfrState_first_err(XfrState *s){
317 return s->err;
318 }
320 /** Get the state a transfer was in when it had its first error.
321 *
322 * @param s transfer
323 * @return error state
324 */
325 int XfrState_first_err_state(XfrState *s){
326 return s->err_state;
327 }
329 /** Set xfrd default arguments.
330 *
331 * @param args arguments to set
332 */
333 void set_defaults(Args *args){
334 args->compress = FALSE;
335 args->bufsize = 128 * 1024;
336 args->port = htons(XFRD_PORT);
337 }
339 int stringof(Sxpr exp, char **s){
340 int err = 0;
341 //dprintf(">\n"); objprint(iostdout, exp, PRINT_TYPE); IOStream_print(iostdout, "\n");
342 if(ATOMP(exp)){
343 *s = atom_name(exp);
344 } else if(STRINGP(exp)){
345 *s = string_string(exp);
346 } else {
347 err = -EINVAL;
348 *s = NULL;
349 }
350 //dprintf("< err=%d s=%s\n", err, *s);
351 return err;
352 }
354 int intof(Sxpr exp, int *v){
355 int err = 0;
356 char *s;
357 unsigned long l;
358 //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
359 if(INTP(exp)){
360 *v = OBJ_INT(exp);
361 } else {
362 err = stringof(exp, &s);
363 if(err) goto exit;
364 err = convert_atoul(s, &l);
365 *v = (int)l;
366 }
367 exit:
368 //dprintf("< err=%d v=%d\n", err, *v);
369 return err;
370 }
372 int addrof(Sxpr exp, uint32_t *v){
373 char *h;
374 unsigned long a;
375 int err = 0;
376 //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
377 err = stringof(exp, &h);
378 if(err) goto exit;
379 if(get_host_address(h, &a)){
380 err = -EINVAL;
381 goto exit;
382 }
383 *v = a;
384 exit:
385 //dprintf("< err=%d v=%x\n", err, *v);
386 return err;
387 }
389 int portof(Sxpr exp, uint16_t *v){
390 char *s;
391 int err = 0;
392 //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
393 if(INTP(exp)){
394 *v = get_ul(exp);
395 *v = htons(*v);
396 } else {
397 unsigned long p;
398 err = stringof(exp, &s);
399 if(err) goto exit;
400 err = convert_service_to_port(s, &p);
401 if(err){
402 err = -EINVAL;
403 goto exit;
404 }
405 *v = p;
406 }
407 exit:
408 //dprintf("< err=%d v=%u\n", err, *v);
409 return err;
410 }
/** Wrap a raw 32-bit address value in a struct in_addr. */
static inline struct in_addr inaddr(uint32_t addr){
    struct in_addr in = { .s_addr = addr };
    return in;
}
/** Compute progress statistics for a transfer.
 *
 * @param t0 time the transfer started
 * @param offset amount transferred so far
 * @param memory total amount to transfer
 * @param percent set to offset as a percentage of memory
 *        (0 if memory is 0, to avoid dividing by zero)
 * @param rate set to offset divided by (elapsed seconds * 1024);
 *        0 if no whole second has elapsed
 * @return seconds elapsed since t0
 */
time_t stats(time_t t0, uint64_t offset, uint64_t memory, float *percent, float *rate){
    time_t elapsed = time(NULL) - t0;

    *percent = (memory ? (offset * 100.0f) / memory : 0.0f);
    *rate = (elapsed ? offset/(elapsed * 1024.0f) : 0.0f);
    return elapsed;
}
424 /** Notify success or error.
425 *
426 * @param conn connection
427 * @param errcode error code
428 * @return 0 on success, error code otherwise
429 */
430 int xfr_error(Conn *conn, int errcode){
431 int err = 0;
433 if(!conn->out) return -ENOTCONN;
434 if(errcode <0) errcode = -errcode;
435 err = IOStream_print(conn->out, "(%s %d)",
436 atom_name(oxfr_err), errcode);
437 return (err < 0 ? err : 0);
438 }
440 /** Read a response message - error or ok.
441 *
442 * @param conn connection
443 * @return 0 on success, error code otherwise
444 */
445 int xfr_response(Conn *conn){
446 int err;
447 Sxpr sxpr;
449 dprintf(">\n");
450 if(!conn->out) return -ENOTCONN;
451 err = Conn_sxpr(conn, &sxpr);
452 if(err) goto exit;
453 if(sxpr_elementp(sxpr, oxfr_err)){
454 int errcode;
455 err = intof(sxpr_childN(sxpr, 0, ONONE), &errcode);
456 if(err) goto exit;
457 err = errcode;
458 }
459 exit:
460 dprintf("< err=%d\n", err);
461 return err;
462 }
464 /** Get the initial hello message and check the protocol version.
465 * It is an error to receive anything other than a hello message
466 * with the correct protocol version.
467 *
468 * @param conn connection
469 * @return 0 on success, error code otherwise
470 */
471 int xfr_hello(Conn *conn){
472 int err;
473 uint32_t major = XFR_PROTO_MAJOR, minor = XFR_PROTO_MINOR;
474 uint32_t hello_major, hello_minor;
475 Sxpr sxpr;
476 if(!conn->in) return -ENOTCONN;
477 dprintf(">\n");
478 err = Conn_sxpr(conn, &sxpr);
479 if(err) goto exit;
480 if(!sxpr_elementp(sxpr, oxfr_hello)){
481 wprintf("> sxpr_elementp test failed\n");
482 err = -EINVAL;
483 goto exit;
484 }
485 err = intof(sxpr_childN(sxpr, 0, ONONE), &hello_major);
486 if(err) goto exit;
487 err = intof(sxpr_childN(sxpr, 1, ONONE), &hello_minor);
488 if(err) goto exit;
489 if(hello_major != major || hello_minor != minor){
490 eprintf("> Wanted protocol version %d.%d, got %d.%d",
491 major, minor, hello_major, hello_minor);
492 err = -EINVAL;
493 goto exit;
494 }
495 exit:
496 xfr_error(conn, err);
497 if(err){
498 eprintf("> Hello failed: %d\n", err);
499 }
500 dprintf("< err=%d\n", err);
501 return err;
502 }
504 /** Send the initial hello message.
505 *
506 * @param conn connection
507 * @param msg message
508 * @return 0 on success, error code otherwise
509 */
510 int xfr_send_hello(Conn *conn){
511 int err = 0;
512 dprintf(">\n");
514 err = IOStream_print(conn->out, "(%s %d %d)",
515 atom_name(oxfr_hello),
516 XFR_PROTO_MAJOR,
517 XFR_PROTO_MINOR);
518 if(err < 0) goto exit;
519 IOStream_flush(conn->out);
520 err = xfr_response(conn);
521 exit:
522 dprintf("< err=%d\n", err);
523 return err;
524 }
526 int xfr_send_xfr(Conn *conn, uint32_t vmid){
527 int err;
529 err = IOStream_print(conn->out, "(%s %d)",
530 atom_name(oxfr_xfr), vmid);
531 return (err < 0 ? err : 0);
532 }
534 int xfr_send_xfr_ok(Conn *conn, uint32_t vmid){
535 int err = 0;
537 err = IOStream_print(conn->out, "(%s %d)",
538 atom_name(oxfr_xfr_ok), vmid);
539 return (err < 0 ? err : 0);
540 }
542 int xfr_send_migrate_ok(Conn *conn, uint32_t vmid){
543 int err = 0;
545 err = IOStream_print(conn->out, "(%s %d)",
546 atom_name(oxfr_migrate_ok), vmid);
547 return (err < 0 ? err : 0);
548 }
550 int xfr_send_restore_ok(Conn *conn, uint32_t vmid){
551 int err = 0;
553 err = IOStream_print(conn->out, "(%s %d)",
554 atom_name(oxfr_restore_ok), vmid);
555 return (err < 0 ? err : 0);
556 }
558 int xfr_send_save_ok(Conn *conn){
559 int err = 0;
561 err = IOStream_print(conn->out, "(%s)",
562 atom_name(oxfr_save_ok));
563 return (err < 0 ? err : 0);
564 }
566 int xfr_send_suspend(Conn *conn, uint32_t vmid){
567 int err = 0;
569 err = IOStream_print(conn->out, "(%s %d)",
570 atom_name(oxfr_vm_suspend), vmid);
571 return (err < 0 ? err : 0);
572 }
574 /** Suspend a vm on behalf of save/migrate.
575 */
576 int xfr_vm_suspend(Conn *xend, uint32_t vmid){
577 int err = 0;
578 dprintf("> vmid=%u\n", vmid);
579 err = xfr_send_suspend(xend, vmid);
580 if(err) goto exit;
581 IOStream_flush(xend->out);
582 err = xfr_response(xend);
583 exit:
584 dprintf("< err=%d\n", err);
585 return err;
586 }
588 int xfr_send_destroy(Conn *conn, uint32_t vmid){
589 int err = 0;
591 err = IOStream_print(conn->out, "(%s %d)",
592 atom_name(oxfr_vm_destroy), vmid);
593 return (err < 0 ? err : 0);
594 }
596 /** Destroy a vm on behalf of save/migrate.
597 */
598 int xfr_vm_destroy(Conn *xend, uint32_t vmid){
599 int err = 0;
600 dprintf("> vmid=%u\n", vmid);
601 err = xfr_send_destroy(xend, vmid);
602 if(err) goto exit;
603 IOStream_flush(xend->out);
604 err = xfr_response(xend);
605 exit:
606 dprintf("< err=%d\n", err);
607 return err;
608 }
610 /** Get vm state. Send transfer message.
611 *
612 * @param peer connection
613 * @param msg message
614 * @return 0 on success, error code otherwise
615 */
616 int xfr_send_state(XfrState *state, Conn *xend, Conn *peer){
617 int err = 0;
618 Sxpr sxpr;
620 dprintf(">\n");
621 XfrState_set_state(state, XFR_STATE);
622 // Send xfr message and the domain state.
623 err = xfr_send_xfr(peer, state->vmid);
624 if(err) goto exit;
625 dprintf(">*** Sending domain %u\n", state->vmid);
626 err = xen_domain_snd(xend, peer->out,
627 state->vmid,
628 state->vmconfig, state->vmconfig_n,
629 state->live, state->resource);
630 dprintf(">*** Sent domain %u\n", state->vmid);
631 if(err) goto exit;
632 // Sending the domain suspends it, and there's no way back.
633 // So destroy it now. If anything goes wrong now it's too late.
634 dprintf(">*** Destroying domain %u\n", state->vmid);
635 err = xfr_vm_destroy(xend, state->vmid);
636 if(err) goto exit;
637 err = xfr_error(peer, err);
638 if(err) goto exit;
639 IOStream_flush(peer->out);
640 // Read the response from the peer.
641 err = Conn_sxpr(peer, &sxpr);
642 if(err) goto exit;
643 if(sxpr_elementp(sxpr, oxfr_err)){
644 // Error.
645 int errcode;
646 err = intof(sxpr_childN(sxpr, 0, ONONE), &errcode);
647 if(!err) err = errcode;
648 } else if(sxpr_elementp(sxpr, oxfr_xfr_ok)){
649 // Ok - get the new domain id.
650 err = intof(sxpr_childN(sxpr, 0, ONONE), &state->vmid_new);
651 xfr_error(peer, err);
652 } else {
653 // Anything else is invalid. But it may be too late.
654 err = -EINVAL;
655 xfr_error(peer, err);
656 }
657 exit:
658 XfrState_set_err(state, err);
659 dprintf("< err=%d\n", err);
660 return err;
661 }
663 /** Finish the transfer.
664 */
665 int xfr_send_done(XfrState *state, Conn *xend){
666 int err = 0;
667 int first_err = 0;
669 first_err = XfrState_first_err(state);
670 if(first_err){
671 XfrState_set_state(state, XFR_FAIL);
672 } else {
673 XfrState_set_state(state, XFR_DONE);
674 }
675 if(first_err){
676 err = xfr_error(xend, first_err);
677 } else {
678 // Report new domain id to xend.
679 err = xfr_send_migrate_ok(xend, state->vmid_new);
680 }
682 XfrState_set_err(state, err);
683 if(XfrState_first_err(state)){
684 int s, serr;
686 wprintf("> Transfer errors:\n");
687 for(s = 0; s < XFR_MAX; s++){
688 serr = state->state_err[s];
689 if(!serr) continue;
690 wprintf("> state=%-12s err=%d\n", xfr_state_name(s), serr);
691 }
692 } else {
693 wprintf("> Transfer OK\n");
694 }
695 dprintf("< err=%d\n", err);
696 return err;
697 }
699 /** Migrate a vm to another node.
700 *
701 * @param xend connection
702 * @return 0 on success, error code otherwise
703 */
704 int xfr_send(Args *args, XfrState *state, Conn *xend, uint32_t addr, uint32_t port){
705 int err = 0;
706 Conn _peer = {}, *peer = &_peer;
707 int flags = 0;
708 struct in_addr xfr_addr;
709 uint16_t xfr_port;
710 time_t t0 = time(NULL), t1;
712 dprintf(">\n");
713 flags |= CONN_NOBUFFER;
714 if(args->compress){
715 flags |= CONN_WRITE_COMPRESS;
716 }
717 xfr_addr.s_addr = addr;
718 xfr_port = port;
719 if(!xfr_port) xfr_port = htons(XFRD_PORT);
720 dprintf("> Xfr vmid=%u\n", state->vmid);
721 dprintf("> Xfr xfr_addr=%s:%d\n", inet_ntoa(xfr_addr), ntohs(xfr_port));
722 err = Conn_connect(peer, flags, xfr_addr, xfr_port);
723 if(err) goto exit;
724 XfrState_set_state(state, XFR_HELLO);
725 // Send hello message.
726 err = xfr_send_hello(peer);
727 if(err) goto exit;
728 printf("\n");
729 // Send vm state.
730 err = xfr_send_state(state, xend, peer);
731 if(err) goto exit;
732 if(args->compress){
733 IOStream *zio = peer->out;
734 int plain_bytes = lzi_stream_plain_bytes(zio);
735 int comp_bytes = lzi_stream_comp_bytes(zio);
736 float ratio = lzi_stream_ratio(zio);
737 iprintf("> Compression: plain %d bytes, compressed %d bytes, ratio %3.2f\n",
738 plain_bytes, comp_bytes, ratio);
739 }
740 exit:
741 dprintf("> err=%d\n", err);
742 if(err && !XfrState_get_err(state)){
743 XfrState_set_err(state, err);
744 }
745 Conn_close(peer);
746 if(!err){
747 t1 = time(NULL) - t0;
748 iprintf("> Transfer complete in %lu seconds\n", t1);
749 }
750 dprintf("> done err=%d, notifying xend...\n", err);
751 xfr_send_done(state, xend);
752 dprintf("< err=%d\n", err);
753 return err;
754 }
756 /** Save a vm to file.
757 */
758 int xfr_save(Args *args, XfrState *state, Conn *xend, char *file){
759 int err = 0;
760 int compress = 0;
761 IOStream *io = NULL;
763 dprintf("> file=%s\n", file);
764 if(compress){
765 io = gzip_stream_fopen(file, "wb1");
766 } else {
767 io = file_stream_fopen(file, "wb");
768 }
769 if(!io){
770 eprintf("> Failed to open %s\n", file);
771 err = -EINVAL;
772 goto exit;
773 }
774 err = xen_domain_snd(xend, io,
775 state->vmid,
776 state->vmconfig, state->vmconfig_n,
777 0, 0);
778 if(err){
779 err = xfr_error(xend, err);
780 } else {
781 err = xfr_send_save_ok(xend);
782 }
783 exit:
784 if(io){
785 IOStream_close(io);
786 IOStream_free(io);
787 }
788 if(err){
789 unlink(file);
790 }
791 dprintf("< err=%d\n", err);
792 return err;
793 }
795 /** Restore a vm from file.
796 *
797 * @return 0 on success, error code otherwise
798 */
799 int xfr_restore(Args *args, XfrState *state, Conn *xend, char *file){
800 int err = 0;
801 IOStream *io = NULL;
802 int configured=0;
804 dprintf("> file=%s\n", file);
805 io = gzip_stream_fopen(file, "rb");
806 if(!io){
807 eprintf("> Failed to open %s\n", file);
808 err = -EINVAL;
809 goto exit;
810 }
811 err = xen_domain_rcv(io,
812 &state->vmid_new,
813 &state->vmconfig, &state->vmconfig_n,
814 &configured);
815 if(err) goto exit;
816 if(!configured){
817 err = xen_domain_configure(state->vmid_new, state->vmconfig, state->vmconfig_n);
818 if(err) goto exit;
819 }
820 err = xen_domain_unpause(state->vmid_new);
821 exit:
822 if(io){
823 IOStream_close(io);
824 IOStream_free(io);
825 }
826 if(err){
827 xfr_error(xend, err);
828 } else {
829 xfr_send_restore_ok(xend, state->vmid_new);
830 }
831 dprintf("< err=%d\n", err);
832 return err;
833 }
835 /** Accept the transfer of a vm from another node.
836 *
837 * @param peer connection
838 * @param msg message
839 * @return 0 on success, error code otherwise
840 */
841 int xfr_recv(Args *args, XfrState *state, Conn *peer){
842 int err = 0;
843 time_t t0 = time(NULL), t1;
844 Sxpr sxpr;
845 int configured=0;
847 dprintf("> peer=%s\n", inet_ntoa(peer->addr.sin_addr));
848 // If receiving from localhost set configured so that that xen_domain_rcv()
849 // does not attempt to configure the new domain. This is because the old
850 // domain still exists and will make it fail.
851 if(peer->addr.sin_addr.s_addr == htonl(INADDR_LOOPBACK)){
852 dprintf("> Peer is localhost\n");
853 configured = 1;
854 }
855 err = xen_domain_rcv(peer->in,
856 &state->vmid_new,
857 &state->vmconfig, &state->vmconfig_n,
858 &configured);
859 if(err) goto exit;
860 // Read from the peer. This is just so we wait before configuring.
861 // When migrating to the same host the peer must destroy the domain
862 // before we configure the new one.
863 err = Conn_sxpr(peer, &sxpr);
864 if(err) goto exit;
865 if(!configured){
866 dprintf("> Configuring...\n");
867 err = xen_domain_configure(state->vmid_new, state->vmconfig, state->vmconfig_n);
868 if(err) goto exit;
869 err = xen_domain_unpause(state->vmid_new);
870 if(err) goto exit;
871 }
872 // Report new domain id to peer.
873 err = xfr_send_xfr_ok(peer, state->vmid_new);
874 if(err) goto exit;
875 // Get the final ok.
876 err = xfr_response(peer);
877 exit:
878 if(!err){
879 t1 = time(NULL) - t0;
880 iprintf("> Transfer complete in %lu seconds\n", t1);
881 }
882 if(err){
883 xfr_error(peer, err);
884 }
885 dprintf("< err=%d\n", err);
886 return err;
887 }
889 /** Listen for a hello followed by a service request.
890 * The request can be from the local xend or from xfrd on another node.
891 *
892 * @param peersock socket
893 * @param peer_in peer address
894 * @return 0 on success, error code otherwise
895 */
896 int xfrd_service(Args *args, int peersock, struct sockaddr_in peer_in){
897 int err = 0;
898 Sxpr sxpr;
899 Conn _conn = {}, *conn = &_conn;
900 int flags = CONN_NOBUFFER;
902 dprintf(">\n");
903 err = Conn_init(conn, flags, peersock, peer_in);
904 if(err) goto exit;
905 //dprintf(">xfr_hello... \n");
906 err = xfr_hello(conn);
907 if(err) goto exit;
908 //dprintf("> sxpr...\n");
909 err = Conn_sxpr(conn, &sxpr);
910 if(err) goto exit;
911 //dprintf("> sxpr=\n");
912 //objprint(iostdout, sxpr, PRINT_TYPE); IOStream_print(iostdout, "\n");
913 if(sxpr_elementp(sxpr, oxfr_migrate)){
914 // Migrate message from xend.
915 uint32_t addr;
916 uint16_t port;
917 XfrState _state = {}, *state = &_state;
918 int n = 0;
920 dprintf("> xfr.migrate\n");
921 err = intof(sxpr_childN(sxpr, n++, ONONE), &state->vmid);
922 if(err) goto exit;
923 err = stringof(sxpr_childN(sxpr, n++, ONONE), &state->vmconfig);
924 if(err) goto exit;
925 state->vmconfig_n = strlen(state->vmconfig);
926 err = addrof(sxpr_childN(sxpr, n++, ONONE), &addr);
927 if(err) goto exit;
928 err = portof(sxpr_childN(sxpr, n++, ONONE), &port);
929 if(err) goto exit;
930 err = intof(sxpr_childN(sxpr, n++, ONONE), &state->live);
931 if(err) goto exit;
932 err = intof(sxpr_childN(sxpr, n++, ONONE), &state->resource);
933 if(err) goto exit;
934 err = xfr_send(args, state, conn, addr, port);
936 } else if(sxpr_elementp(sxpr, oxfr_save)){
937 // Save message from xend.
938 char *file;
939 XfrState _state = {}, *state = &_state;
940 int n = 0;
942 dprintf("> xfr.save\n");
943 err = intof(sxpr_childN(sxpr, n++, ONONE), &state->vmid);
944 if(err) goto exit;
945 err = stringof(sxpr_childN(sxpr, n++, ONONE), &state->vmconfig);
946 if(err) goto exit;
947 state->vmconfig_n = strlen(state->vmconfig);
948 err = stringof(sxpr_childN(sxpr, n++, ONONE), &file);
949 if(err) goto exit;
950 err = xfr_save(args, state, conn, file);
952 } else if(sxpr_elementp(sxpr, oxfr_restore)){
953 // Restore message from xend.
954 char *file;
955 XfrState _state = {}, *state = &_state;
956 int n = 0;
958 dprintf("> xfr.restore\n");
959 err = stringof(sxpr_childN(sxpr, n++, ONONE), &file);
960 if(err) goto exit;
961 err = xfr_restore(args, state, conn, file);
963 } else if(sxpr_elementp(sxpr, oxfr_xfr)){
964 // Xfr message from peer xfrd.
965 XfrState _state = {}, *state = &_state;
966 int n = 0;
968 dprintf("> xfr.xfr\n");
969 err = intof(sxpr_childN(sxpr, n++, ONONE), &state->vmid);
970 if(err) goto exit;
971 err = xfr_recv(args, state, conn);
973 } else{
974 // Anything else is invalid.
975 err = -EINVAL;
976 eprintf("> Invalid message: ");
977 objprint(iostderr, sxpr, 0);
978 IOStream_print(iostderr, "\n");
979 xfr_error(conn, err);
980 }
981 exit:
982 Conn_close(conn);
983 dprintf("< err=%d\n", err);
984 return err;
985 }
987 /** Accept an incoming connection.
988 *
989 * @param sock tcp socket
990 * @return 0 on success, error code otherwise
991 */
992 int xfrd_accept(Args *args, int sock){
993 struct sockaddr_in peer_in;
994 struct sockaddr *peer = (struct sockaddr *)&peer_in;
995 socklen_t peer_n = sizeof(peer_in);
996 int peersock;
997 pid_t pid;
998 int err = 0;
1000 dprintf("> sock=%d\n", sock);
1001 dprintf("> accept...\n");
1002 peersock = accept(sock, peer, &peer_n);
1003 dprintf("> accept=%d\n", peersock);
1004 if(peersock < 0){
1005 perror("accept");
1006 err = -errno;
1007 goto exit;
1009 iprintf("> Accepted connection from %s:%d on %d\n",
1010 inet_ntoa(peer_in.sin_addr), htons(peer_in.sin_port), sock);
1011 pid = fork();
1012 if(pid > 0){
1013 // Parent, fork succeeded.
1014 iprintf("> Forked child pid=%d\n", pid);
1015 close(peersock);
1016 } else if (pid < 0){
1017 // Parent, fork failed.
1018 perror("fork");
1019 close(peersock);
1020 } else {
1021 // Child.
1022 iprintf("> Xfr service for %s:%d\n",
1023 inet_ntoa(peer_in.sin_addr), htons(peer_in.sin_port));
1024 err = xfrd_service(args, peersock, peer_in);
1025 iprintf("> Xfr service err=%d\n", err);
1026 shutdown(peersock, 2);
1027 exit(err ? 1 : 0);
1029 exit:
1030 dprintf("< err=%d\n", err);
1031 return err;
1034 /** Socket select loop.
1035 * Accepts connections on the tcp socket.
1037 * @param listen_sock tcp listen socket
1038 * @return 0 on success, error code otherwise
1039 */
1040 int xfrd_select(Args *args, int listen_sock){
1041 int err = 0;
1042 SelectSet set = {};
1043 dprintf("> socks: %d\n", listen_sock);
1044 while(1){
1045 SelectSet_zero(&set);
1046 SelectSet_add_read(&set, listen_sock);
1047 err = SelectSet_select(&set, NULL);
1048 if(err < 0){
1049 if(errno == EINTR) continue;
1050 perror("select");
1051 goto exit;
1053 if(FD_ISSET(listen_sock, &set.rd)){
1054 xfrd_accept(args, listen_sock);
1057 exit:
1058 dprintf("< err=%d\n", err);
1059 return err;
1062 /** Create a socket.
1064 * @param args program arguments
1065 * @param socktype socket type
1066 * @param reuse whether to set SO_REUSEADDR
1067 * @param val return value for the socket
1068 * @return 0 on success, error code otherwise
1069 */
1070 int create_socket(Args *args, int socktype, int reuse, int *val){
1071 int err = 0;
1072 int sock = 0;
1073 struct sockaddr_in addr_in;
1074 struct sockaddr *addr = (struct sockaddr *)&addr_in;
1075 socklen_t addr_n = sizeof(addr_in);
1077 dprintf(">\n");
1078 // Create socket and bind it.
1079 sock = socket(AF_INET, socktype, 0);
1080 if(sock < 0){
1081 err = -errno;
1082 goto exit;
1084 addr_in.sin_family = AF_INET;
1085 addr_in.sin_addr.s_addr = INADDR_ANY;
1086 addr_in.sin_port = args->port;
1087 dprintf("> port=%d\n", ntohs(addr_in.sin_port));
1088 if(reuse){
1089 // Set socket option to reuse address.
1090 int val = 1;
1091 err = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
1092 if(err < 0){
1093 err = -errno;
1094 perror("setsockopt");
1095 goto exit;
1098 err = bind(sock, addr, addr_n);
1099 if(err < 0){
1100 err = -errno;
1101 perror("bind");
1102 goto exit;
1104 exit:
1105 *val = (err ? -1 : sock);
1106 dprintf("< err=%d\n", err);
1107 return err;
1110 /** Create the tcp listen socket.
1112 * @param args program arguments
1113 * @param val return value for the socket
1114 * @return 0 on success, error code otherwise
1115 */
1116 int xfrd_listen_socket(Args *args, int *val){
1117 int err = 0;
1118 int sock;
1119 dprintf(">\n");
1120 err = create_socket(args, SOCK_STREAM, 1, &sock);
1121 if(err) goto exit;
1122 dprintf("> listen...\n");
1123 err = listen(sock, 5);
1124 if(err < 0){
1125 err = -errno;
1126 perror("listen");
1127 goto exit;
1129 exit:
1130 *val = (err ? -1 : sock);
1131 if(err) close(sock);
1132 dprintf("< err=%d\n", err);
1133 return err;
1136 /** Type for signal handling functions. */
1137 typedef void SignalAction(int code, siginfo_t *info, void *data);
1139 /** Handle SIGCHLD by getting child exit status.
1140 * This prevents child processes being defunct.
1142 * @param code signal code
1143 * @param info signal info
1144 * @param data
1145 */
1146 void sigaction_SIGCHLD(int code, siginfo_t *info, void *data){
1147 int status;
1148 pid_t pid;
1149 //dprintf("> child_exit=%d waiting...\n", child_exit);
1150 pid = wait(&status);
1151 dprintf("> child pid=%d status=%d\n", pid, status);
1154 /** Handle SIGPIPE.
1156 * @param code signal code
1157 * @param info signal info
1158 * @param data
1159 */
1160 void sigaction_SIGPIPE(int code, siginfo_t *info, void *data){
1161 dprintf("> SIGPIPE\n");
1162 //fflush(stdout);
1163 //fflush(stderr);
1164 //exit(1);
1167 /** Handle SIGALRM.
1169 * @param code signal code
1170 * @param info signal info
1171 * @param data
1172 */
1173 void sigaction_SIGALRM(int code, siginfo_t *info, void *data){
1174 dprintf("> SIGALRM\n");
1177 /** Install a handler for a signal.
1179 * @param signum signal
1180 * @param action handler
1181 * @return 0 on success, error code otherwise
1182 */
1183 int catch_signal(int signum, SignalAction *action){
1184 int err = 0;
1185 struct sigaction sig = {};
1186 sig.sa_sigaction = action;
1187 sig.sa_flags = SA_SIGINFO;
1188 err = sigaction(signum, &sig, NULL);
1189 if(err){
1190 perror("sigaction");
1192 return err;
1195 /** Transfer daemon main program.
1197 * @param args program arguments
1198 * @return 0 on success, error code otherwise
1199 */
1200 int xfrd_main(Args *args){
1201 int err = 0;
1202 int listen_sock;
1204 dprintf(">\n");
1205 catch_signal(SIGCHLD,sigaction_SIGCHLD);
1206 catch_signal(SIGPIPE,sigaction_SIGPIPE);
1207 catch_signal(SIGALRM,sigaction_SIGALRM);
1208 err = xfrd_listen_socket(args, &listen_sock);
1209 if(err) goto exit;
1210 err = xfrd_select(args, listen_sock);
1211 exit:
1212 close(listen_sock);
1213 dprintf("< err=%d\n", err);
1214 return err;
1217 #ifndef SXPR_PARSER_MAIN
1218 /** Parse command-line arguments and call the xfrd main program.
1220 * @param arg argument count
1221 * @param argv arguments
1222 * @return 0 on success, 1 otherwise
1223 */
1224 int main(int argc, char *argv[]){
1225 int err = 0;
1226 int key = 0;
1227 int long_index = 0;
1228 static const char * LOGFILE = "/var/log/xfrd.log";
1230 #ifndef DEBUG
1231 freopen(LOGFILE, "w+", stdout);
1232 fclose(stderr);
1233 stderr = stdout;
1234 #endif
1235 dprintf(">\n");
1236 set_defaults(args);
1237 while(1){
1238 key = getopt_long(argc, argv, short_opts, long_opts, &long_index);
1239 if(key == -1) break;
1240 switch(key){
1241 case OPT_PORT:
1242 err = !convert_service_to_port(optarg, &args->port);
1243 if(err) goto exit;
1244 break;
1245 case OPT_COMPRESS:
1246 args->compress = TRUE;
1247 break;
1248 case OPT_HELP:
1249 usage(0);
1250 break;
1251 case OPT_VERBOSE:
1252 args->verbose = TRUE;
1253 break;
1254 case OPT_VERSION:
1255 printf("> Version %d.%d\n", XFR_PROTO_MAJOR, XFR_PROTO_MINOR);
1256 exit(0);
1257 break;
1258 default:
1259 usage(EINVAL);
1260 break;
1263 xfr_init();
1264 err = xfrd_main(args);
1265 exit:
1266 if(err && key > 0){
1267 fprintf(stderr, "Error in arg %c\n", key);
1269 return (err ? 1 : 0);
1271 #endif