ia64/xen-unstable

view tools/xfrd/xfrd.c @ 2422:2274a0386cc9

bitkeeper revision 1.1159.69.5 (4138e882jA1YaR_OfTfNHe_uT4PDIg)

trivial
author iap10@labyrinth.cl.cam.ac.uk
date Fri Sep 03 21:56:18 2004 +0000 (2004-09-03)
parents ccdb04941d97
children aed97013f9fe
line source
1 /** @file
2 * XFRD - Domain Transfer Daemon for Xen.
3 *
4 * The xfrd is forked by xend to transfer a vm to a remote system.
5 *
6 * The vm is suspended, then its state and memory are transferred to the remote system.
7 * The remote system attempts to create a vm and copy the transferred state and memory into it,
8 * finally resuming the vm. If all is OK the vm ends up running on the remote
9 * system and is removed from the originating system. If the transfer does not complete
10 * successfully the originating system attempts to resume the vm.
11 * The children exit when the transfer completes.
12 *
13 * @author Mike Wray <mike.wray@hpl.hp.com>
14 */
16 #include <stdlib.h>
17 #include <unistd.h>
18 #include <stdio.h>
19 #include <getopt.h>
20 #include <errno.h>
21 #include <sys/types.h>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <time.h>
25 #include <sys/socket.h>
26 #include <netinet/in.h>
27 #include <arpa/inet.h>
28 #include <string.h>
30 #include <signal.h>
31 #include <sys/wait.h>
32 #include <sys/select.h>
34 #include "allocate.h"
35 #include "file_stream.h"
36 #include "string_stream.h"
37 #include "lzi_stream.h"
38 #include "gzip_stream.h"
39 #include "sys_net.h"
40 #include "sys_string.h"
42 //#include "xdr.h"
43 #include "enum.h"
44 #include "xfrd.h"
46 #include "xen_domain.h"
48 #include "connection.h"
49 #include "select.h"
51 #define MODULE_NAME "XFRD"
52 #define DEBUG 0
53 #undef DEBUG
54 #include "debug.h"
/*
 * Protocol overview.
 *
 * sender:
 *   xend connects to xfrd and writes migrate message
 *   xend writes domain config to xfrd
 *
 *   xfrd forks
 *
 *   xfrd connects to peer
 *   xfrd sends hello, reads response
 *   xfrd sends domain
 *   xfrd reads response
 *   reports progress/status to xend
 *
 *   xend reads xfrd for progress/status, disconnects
 *   If ok, destroys domain.
 *   If not ok, unpauses domain.
 *
 * receiver:
 *   xfrd accepts connection on inbound port
 *   xfrd forks and accepts connection
 *   xfrd receives hello, writes response
 *   xfrd receives domain
 *   xfrd connects to xend, configures new domain
 *   xfrd writes status back to peer, child exits
 *
 * messages:
 *   (xfr.hello <major> <minor>)
 *   (xfr.err <code>)                 code 0 means success
 *
 *   xend->xfrd (xfr.migrate <domain> <vmconfig> <host> <port> <live>)
 *              (xfr.save <domain> <vmconfig> <file>)
 *   xfrd->xend (xfr.vm.suspend <domain>)
 *   xfrd->xend (xfr.vm.destroy <domain>)
 *   xfrd->xend (xfr.progress <percent> <rate: kb/s>)
 *   xfrd->xend (xfr.err <code>) | (xfr.migrate.ok <domain>) | (xfr.save.ok)
 *   xfrd->xfrd (xfr.xfr <domain>)
 *   xfrd->xfrd (xfr.err <code>) | (xfr.xfr.ok <domain>)
 *
 *   xfrd->xend (xfr.configure <domain> <vmconfig>)
 */
96 Sxpr oxfr_configure; // (xfr.configure <vmid> <vmconfig>)
97 Sxpr oxfr_err; // (xfr.err <code>)
98 Sxpr oxfr_hello; // (xfr.hello <major> <minor>)
99 Sxpr oxfr_migrate; // (xfr.migrate <vmid> <vmconfig> <host> <port> <live>)
100 Sxpr oxfr_migrate_ok;// (xfr.migrate.ok <value>)
101 Sxpr oxfr_progress; // (xfr.progress <percent> <rate: kb/s>)
102 Sxpr oxfr_save; // (xfr.save <vmid> <vmconfig> <file>)
103 Sxpr oxfr_save_ok; // (xfr.save.ok)
104 Sxpr oxfr_vm_destroy;// (xfr.vm.destroy <vmid>)
105 Sxpr oxfr_vm_suspend;// (xfr.vm.suspend <vmid>)
106 Sxpr oxfr_xfr; // (xfr.xfr <vmid>)
107 Sxpr oxfr_xfr_ok; // (xfr.xfr.ok <vmid>)
109 void xfr_init(void){
110 oxfr_configure = intern("xfr.configure");
111 oxfr_err = intern("xfr.err");
112 oxfr_hello = intern("xfr.hello");
113 oxfr_migrate = intern("xfr.migrate");
114 oxfr_migrate_ok = intern("xfr.migrate.ok");
115 oxfr_progress = intern("xfr.progress");
116 oxfr_save = intern("xfr.save");
117 oxfr_save_ok = intern("xfr.save.ok");
118 oxfr_vm_destroy = intern("xfr.vm.destroy");
119 oxfr_vm_suspend = intern("xfr.vm.suspend");
120 oxfr_xfr = intern("xfr.xfr");
121 oxfr_xfr_ok = intern("xfr.xfr.ok");
122 }
/* Boolean constants, unless a header already provided them. */
#ifndef TRUE
#define TRUE 1
#endif

#ifndef FALSE
#define FALSE 0
#endif

/* Program name shown in the usage message. */
#define PROGRAM "xfrd"

/* For each command-line option: short flag, long name, help text. */
#define OPT_PORT     'P'
#define KEY_PORT     "port"
#define DOC_PORT     "<port>\n\txfr port (as a number or service name)"

#define OPT_COMPRESS 'Z'
#define KEY_COMPRESS "compress"
#define DOC_COMPRESS "\n\tuse compression for migration"

#define OPT_HELP     'h'
#define KEY_HELP     "help"
#define DOC_HELP     "\n\tprint help"

#define OPT_VERSION  'v'
#define KEY_VERSION  "version"
#define DOC_VERSION  "\n\tprint version"

#define OPT_VERBOSE  'V'
#define KEY_VERBOSE  "verbose"
#define DOC_VERBOSE  "\n\tverbose flag"
154 /** Print a usage message.
155 * Prints to stdout if err is zero, and exits with 0.
156 * Prints to stderr if err is non-zero, and exits with 1.
157 */
158 void usage(int err){
159 FILE *out = (err ? stderr : stdout);
161 fprintf(out, "Usage: %s [options]\n", PROGRAM);
162 fprintf(out, "-%c, --%s %s\n", OPT_PORT, KEY_PORT, DOC_PORT);
163 fprintf(out, "-%c, --%s %s\n", OPT_COMPRESS, KEY_COMPRESS, DOC_COMPRESS);
164 fprintf(out, "-%c, --%s %s\n", OPT_VERBOSE, KEY_VERBOSE, DOC_VERBOSE);
165 fprintf(out, "-%c, --%s %s\n", OPT_VERSION, KEY_VERSION, DOC_VERSION);
166 fprintf(out, "-%c, --%s %s\n", OPT_HELP, KEY_HELP, DOC_HELP);
167 exit(err ? 1 : 0);
168 }
170 /** Short options. Options followed by ':' take an argument. */
171 static char *short_opts = (char[]){
172 OPT_PORT, ':',
173 OPT_COMPRESS,
174 OPT_HELP,
175 OPT_VERSION,
176 OPT_VERBOSE,
177 0 };
179 /** Long options. */
180 static struct option const long_opts[] = {
181 { KEY_PORT, required_argument, NULL, OPT_PORT },
182 { KEY_COMPRESS, no_argument, NULL, OPT_COMPRESS },
183 { KEY_HELP, no_argument, NULL, OPT_HELP },
184 { KEY_VERSION, no_argument, NULL, OPT_VERSION },
185 { KEY_VERBOSE, no_argument, NULL, OPT_VERBOSE },
186 { NULL, 0, NULL, 0 }
187 };
/** Program arguments. */
typedef struct Args {
    /** Buffer size (presumably bytes - TODO confirm against users). */
    int bufsize;
    /** Port to listen on, in network byte order. */
    unsigned long port;
    /** Verbose flag. */
    int verbose;
    /** Whether to compress the migration stream. */
    int compress;
} Args;
/** Transfer states.
 * XFR_MAX is the number of states and sizes the per-state error array.
 */
enum {
    XFR_INIT  = 0,
    XFR_HELLO = 1,
    XFR_STATE = 2,
    XFR_RUN   = 3,
    XFR_FAIL  = 4,
    XFR_DONE  = 5,
    XFR_MAX   = 6
};
/** Expand a constant into a { value, "name" } initializer pair. */
#define VALDEF(val) { val, #val }
210 /** Names for the transfer states. */
211 static EnumDef xfr_states[] = {
212 VALDEF(XFR_INIT),
213 VALDEF(XFR_HELLO),
214 VALDEF(XFR_STATE),
215 VALDEF(XFR_RUN),
216 VALDEF(XFR_FAIL),
217 VALDEF(XFR_DONE),
218 { 0, NULL }
219 };
222 /** State machine for transfer. */
223 typedef struct XfrState {
224 /** Current state. */
225 int state;
226 /** Error codes for the states. */
227 int state_err[XFR_MAX];
228 /** First error. */
229 int err;
230 /** State when first error happened. */
231 int err_state;
233 uint32_t vmid;
234 char* vmconfig;
235 int vmconfig_n;
236 unsigned long xfr_port;
237 char *xfr_host;
238 uint32_t vmid_new;
239 int live;
240 } XfrState;
242 /** Get the name of a transfer state.
243 *
244 * @param s state
245 * @return name
246 */
247 char * xfr_state_name(int s){
248 return enum_val_to_name(s, xfr_states);
249 }
251 /** Set the state of a transfer.
252 *
253 * @param s transfer
254 * @param state state
255 * @return state
256 */
257 int XfrState_set_state(XfrState *s, int state){
258 s->state = state;
259 return s->state;
260 }
262 /** Get the state of a transfer.
263 *
264 * @param s transfer
265 * @return state
266 */
267 int XfrState_get_state(XfrState *s){
268 return s->state;
269 }
271 /** Set an error in the current state.
272 * Does nothing if an error is already set.
273 *
274 * @param s transfer
275 * @param err error
276 * @return error
277 */
278 int XfrState_set_err(XfrState *s, int err){
279 if(!s->state_err[s->state]){
280 s->state_err[s->state] = err;
281 }
282 if(!s->err){
283 s->err = err;
284 s->err_state = s->state;
285 }
286 return err;
287 }
289 /** Get the error in the current state.
290 *
291 * @param s transfer
292 * @return error
293 */
294 int XfrState_get_err(XfrState *s){
295 return s->state_err[s->state];
296 }
298 /** Get the first error of a transfer.
299 *
300 * @param s transfer
301 * @return error
302 */
303 int XfrState_first_err(XfrState *s){
304 return s->err;
305 }
307 /** Get the state a transfer was in when it had its first error.
308 *
309 * @param s transfer
310 * @return error state
311 */
312 int XfrState_first_err_state(XfrState *s){
313 return s->err_state;
314 }
316 /** Xfrd arguments. */
317 static Args _args = {};
319 /** Xfrd arguments. */
320 static Args *args = &_args;
322 /** Set xfrd default arguments.
323 *
324 * @param args arguments to set
325 */
326 void set_defaults(Args *args){
327 args->compress = FALSE;
328 args->bufsize = 128 * 1024;
329 args->port = htons(XFRD_PORT);
330 }
332 int stringof(Sxpr exp, char **s){
333 int err = 0;
334 //dprintf(">\n"); objprint(iostdout, exp, PRINT_TYPE); IOStream_print(iostdout, "\n");
335 if(ATOMP(exp)){
336 *s = atom_name(exp);
337 } else if(STRINGP(exp)){
338 *s = string_string(exp);
339 } else {
340 err = -EINVAL;
341 *s = NULL;
342 }
343 //dprintf("< err=%d s=%s\n", err, *s);
344 return err;
345 }
347 int intof(Sxpr exp, int *v){
348 int err = 0;
349 char *s;
350 unsigned long l;
351 //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
352 if(INTP(exp)){
353 *v = OBJ_INT(exp);
354 } else {
355 err = stringof(exp, &s);
356 if(err) goto exit;
357 err = convert_atoul(s, &l);
358 *v = (int)l;
359 }
360 exit:
361 //dprintf("< err=%d v=%d\n", err, *v);
362 return err;
363 }
365 int addrof(Sxpr exp, uint32_t *v){
366 char *h;
367 unsigned long a;
368 int err = 0;
369 //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
370 err = stringof(exp, &h);
371 if(err) goto exit;
372 if(get_host_address(h, &a)){
373 err = -EINVAL;
374 goto exit;
375 }
376 *v = a;
377 exit:
378 //dprintf("< err=%d v=%x\n", err, *v);
379 return err;
380 }
382 int portof(Sxpr exp, uint16_t *v){
383 char *s;
384 int err = 0;
385 //dprintf(">\n"); objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
386 if(INTP(exp)){
387 *v = get_ul(exp);
388 *v = htons(*v);
389 } else {
390 unsigned long p;
391 err = stringof(exp, &s);
392 if(err) goto exit;
393 err = convert_service_to_port(s, &p);
394 if(err){
395 err = -EINVAL;
396 goto exit;
397 }
398 *v = p;
399 }
400 exit:
401 //dprintf("< err=%d v=%u\n", err, *v);
402 return err;
403 }
/** Wrap a raw 32-bit address value in a struct in_addr. */
static inline struct in_addr inaddr(uint32_t addr){
    struct in_addr in = { 0 };

    in.s_addr = addr;
    return in;
}
/** Compute progress statistics for a transfer.
 *
 * @param t0 time the transfer started
 * @param offset amount transferred so far
 * @param memory total amount to transfer
 * @param percent set to offset as a percentage of memory (0 if memory is 0)
 * @param rate set to offset / (elapsed seconds * 1024) (0 if no time has elapsed)
 * @return elapsed time in seconds since t0
 */
time_t stats(time_t t0, uint64_t offset, uint64_t memory, float *percent, float *rate){
    time_t elapsed = time(NULL) - t0;

    // Guard the division so an empty transfer reports 0% rather than NaN/inf.
    *percent = (memory ? (offset * 100.0f) / memory : 0.0f);
    *rate = (elapsed ? offset / (elapsed * 1024.0f) : 0.0f);
    return elapsed;
}
417 /** Notify success or error.
418 *
419 * @param conn connection
420 * @param errcode error code
421 * @return 0 on success, error code otherwise
422 */
423 int xfr_error(Conn *conn, int errcode){
424 int err = 0;
426 if(!conn->out) return -ENOTCONN;
427 if(errcode <0) errcode = -errcode;
428 err = IOStream_print(conn->out, "(%s %d)",
429 atom_name(oxfr_err), errcode);
430 return (err < 0 ? err : 0);
431 }
433 /** Read a response message - error or ok.
434 *
435 * @param conn connection
436 * @return 0 on success, error code otherwise
437 */
438 int xfr_response(Conn *conn){
439 int err;
440 Sxpr sxpr;
442 dprintf(">\n");
443 if(!conn->out) return -ENOTCONN;
444 err = Conn_sxpr(conn, &sxpr);
445 if(err) goto exit;
446 if(sxpr_elementp(sxpr, oxfr_err)){
447 int errcode;
448 err = intof(sxpr_childN(sxpr, 0, ONONE), &errcode);
449 if(err) goto exit;
450 err = errcode;
451 }
452 exit:
453 dprintf("< err=%d\n", err);
454 return err;
455 }
457 /** Get the initial hello message and check the protocol version.
458 * It is an error to receive anything other than a hello message
459 * with the correct protocol version.
460 *
461 * @param conn connection
462 * @return 0 on success, error code otherwise
463 */
464 int xfr_hello(Conn *conn){
465 int err;
466 uint32_t major = XFR_PROTO_MAJOR, minor = XFR_PROTO_MINOR;
467 uint32_t hello_major, hello_minor;
468 Sxpr sxpr;
469 if(!conn->in) return -ENOTCONN;
470 dprintf(">\n");
471 err = Conn_sxpr(conn, &sxpr);
472 if(err) goto exit;
473 if(!sxpr_elementp(sxpr, oxfr_hello)){
474 wprintf("> sxpr_elementp test failed\n");
475 err = -EINVAL;
476 goto exit;
477 }
478 err = intof(sxpr_childN(sxpr, 0, ONONE), &hello_major);
479 if(err) goto exit;
480 err = intof(sxpr_childN(sxpr, 1, ONONE), &hello_minor);
481 if(err) goto exit;
482 if(hello_major != major || hello_minor != minor){
483 eprintf("> Wanted protocol version %d.%d, got %d.%d",
484 major, minor, hello_major, hello_minor);
485 err = -EINVAL;
486 goto exit;
487 }
488 exit:
489 xfr_error(conn, err);
490 if(err){
491 eprintf("> Hello failed: %d\n", err);
492 }
493 dprintf("< err=%d\n", err);
494 return err;
495 }
497 /** Send the initial hello message.
498 *
499 * @param conn connection
500 * @param msg message
501 * @return 0 on success, error code otherwise
502 */
503 int xfr_send_hello(Conn *conn){
504 int err = 0;
505 dprintf(">\n");
507 err = IOStream_print(conn->out, "(%s %d %d)",
508 atom_name(oxfr_hello),
509 XFR_PROTO_MAJOR,
510 XFR_PROTO_MINOR);
511 if(err < 0) goto exit;
512 IOStream_flush(conn->out);
513 err = xfr_response(conn);
514 exit:
515 dprintf("< err=%d\n", err);
516 return err;
517 }
519 int xfr_send_xfr(Conn *conn, uint32_t vmid){
520 int err;
522 err = IOStream_print(conn->out, "(%s %d)",
523 atom_name(oxfr_xfr), vmid);
524 return (err < 0 ? err : 0);
525 }
527 int xfr_send_xfr_ok(Conn *conn, uint32_t vmid){
528 int err = 0;
530 err = IOStream_print(conn->out, "(%s %d)",
531 atom_name(oxfr_xfr_ok), vmid);
532 return (err < 0 ? err : 0);
533 }
535 int xfr_send_migrate_ok(Conn *conn, uint32_t vmid){
536 int err = 0;
538 err = IOStream_print(conn->out, "(%s %d)",
539 atom_name(oxfr_migrate_ok), vmid);
540 return (err < 0 ? err : 0);
541 }
543 int xfr_send_save_ok(Conn *conn){
544 int err = 0;
546 err = IOStream_print(conn->out, "(%s)",
547 atom_name(oxfr_save_ok));
548 return (err < 0 ? err : 0);
549 }
551 int xfr_send_suspend(Conn *conn, uint32_t vmid){
552 int err = 0;
554 err = IOStream_print(conn->out, "(%s %d)",
555 atom_name(oxfr_vm_suspend), vmid);
556 return (err < 0 ? err : 0);
557 }
559 /** Suspend a vm on behalf of save/migrate.
560 */
561 int xfr_vm_suspend(Conn *xend, uint32_t vmid){
562 int err = 0;
563 dprintf("> vmid=%u\n", vmid);
564 err = xfr_send_suspend(xend, vmid);
565 if(err) goto exit;
566 IOStream_flush(xend->out);
567 err = xfr_response(xend);
568 exit:
569 dprintf("< err=%d\n", err);
570 return err;
571 }
573 int xfr_send_destroy(Conn *conn, uint32_t vmid){
574 int err = 0;
576 err = IOStream_print(conn->out, "(%s %d)",
577 atom_name(oxfr_vm_destroy), vmid);
578 return (err < 0 ? err : 0);
579 }
581 /** Destroy a vm on behalf of save/migrate.
582 */
583 int xfr_vm_destroy(Conn *xend, uint32_t vmid){
584 int err = 0;
585 dprintf("> vmid=%u\n", vmid);
586 err = xfr_send_destroy(xend, vmid);
587 if(err) goto exit;
588 IOStream_flush(xend->out);
589 err = xfr_response(xend);
590 exit:
591 dprintf("< err=%d\n", err);
592 return err;
593 }
595 /** Get vm state. Send transfer message.
596 *
597 * @param peer connection
598 * @param msg message
599 * @return 0 on success, error code otherwise
600 */
601 int xfr_send_state(XfrState *state, Conn *xend, Conn *peer){
602 int err = 0;
603 Sxpr sxpr;
605 dprintf(">\n");
606 XfrState_set_state(state, XFR_STATE);
607 // Send xfr message and the domain state.
608 err = xfr_send_xfr(peer, state->vmid);
609 if(err) goto exit;
610 dprintf(">*** Sending domain %u\n", state->vmid);
611 err = xen_domain_snd(xend, peer->out,
612 state->vmid,
613 state->vmconfig, state->vmconfig_n,
614 state->live);
615 dprintf(">*** Sent domain %u\n", state->vmid);
616 if(err) goto exit;
617 // Sending the domain suspends it, and there's no way back.
618 // So destroy it now. If anything goes wrong now it's too late.
619 dprintf(">*** Destroying domain %u\n", state->vmid);
620 err = xfr_vm_destroy(xend, state->vmid);
621 if(err) goto exit;
622 err = xfr_error(peer, err);
623 if(err) goto exit;
624 IOStream_flush(peer->out);
625 // Read the response from the peer.
626 err = Conn_sxpr(peer, &sxpr);
627 if(err) goto exit;
628 if(sxpr_elementp(sxpr, oxfr_err)){
629 // Error.
630 int errcode;
631 err = intof(sxpr_childN(sxpr, 0, ONONE), &errcode);
632 if(!err) err = errcode;
633 } else if(sxpr_elementp(sxpr, oxfr_xfr_ok)){
634 // Ok - get the new domain id.
635 err = intof(sxpr_childN(sxpr, 0, ONONE), &state->vmid_new);
636 xfr_error(peer, err);
637 } else {
638 // Anything else is invalid. But it may be too late.
639 err = -EINVAL;
640 xfr_error(peer, err);
641 }
642 exit:
643 XfrState_set_err(state, err);
644 dprintf("< err=%d\n", err);
645 return err;
646 }
648 /** Finish the transfer.
649 */
650 int xfr_send_done(XfrState *state, Conn *xend){
651 int err = 0;
652 int first_err = 0;
654 first_err = XfrState_first_err(state);
655 if(first_err){
656 XfrState_set_state(state, XFR_FAIL);
657 } else {
658 XfrState_set_state(state, XFR_DONE);
659 }
660 if(first_err){
661 err = xfr_error(xend, first_err);
662 } else {
663 // Report new domain id to xend.
664 err = xfr_send_migrate_ok(xend, state->vmid_new);
665 }
667 XfrState_set_err(state, err);
668 if(XfrState_first_err(state)){
669 int s, serr;
671 wprintf("> Transfer errors:\n");
672 for(s = 0; s < XFR_MAX; s++){
673 serr = state->state_err[s];
674 if(!serr) continue;
675 wprintf("> state=%-12s err=%d\n", xfr_state_name(s), serr);
676 }
677 } else {
678 wprintf("> Transfer OK\n");
679 }
680 dprintf("< err=%d\n", err);
681 return err;
682 }
684 /** Migrate a vm to another node.
685 *
686 * @param xend connection
687 * @return 0 on success, error code otherwise
688 */
689 int xfr_send(Args *args, XfrState *state, Conn *xend, uint32_t addr, uint32_t port){
690 int err = 0;
691 Conn _peer = {}, *peer = &_peer;
692 int flags = 0;
693 struct in_addr xfr_addr;
694 uint16_t xfr_port;
695 time_t t0 = time(NULL), t1;
697 dprintf(">\n");
698 flags |= CONN_NOBUFFER;
699 if(args->compress){
700 flags |= CONN_WRITE_COMPRESS;
701 }
702 xfr_addr.s_addr = addr;
703 xfr_port = port;
704 if(!xfr_port) xfr_port = htons(XFRD_PORT);
705 dprintf("> Xfr vmid=%u\n", state->vmid);
706 dprintf("> Xfr xfr_addr=%s:%d\n", inet_ntoa(xfr_addr), ntohs(xfr_port));
707 err = Conn_connect(peer, flags, xfr_addr, xfr_port);
708 if(err) goto exit;
709 XfrState_set_state(state, XFR_HELLO);
710 // Send hello message.
711 err = xfr_send_hello(peer);
712 if(err) goto exit;
713 printf("\n");
714 // Send vm state.
715 err = xfr_send_state(state, xend, peer);
716 if(err) goto exit;
717 if(args->compress){
718 IOStream *zio = peer->out;
719 int plain_bytes = lzi_stream_plain_bytes(zio);
720 int comp_bytes = lzi_stream_comp_bytes(zio);
721 float ratio = lzi_stream_ratio(zio);
722 iprintf("> Compression: plain %d bytes, compressed %d bytes, ratio %3.2f\n",
723 plain_bytes, comp_bytes, ratio);
724 }
725 exit:
726 dprintf("> err=%d\n", err);
727 if(err && !XfrState_get_err(state)){
728 XfrState_set_err(state, err);
729 }
730 Conn_close(peer);
731 if(!err){
732 t1 = time(NULL) - t0;
733 iprintf("> Transfer complete in %lu seconds\n", t1);
734 }
735 dprintf("> done err=%d, notifying xend...\n", err);
736 xfr_send_done(state, xend);
737 dprintf("< err=%d\n", err);
738 return err;
739 }
741 /** Save a vm to file.
742 */
743 int xfr_save(Args *args, XfrState *state, Conn *xend, char *file){
744 int err = 0;
745 int compress = 0;
746 IOStream *io = NULL;
748 dprintf("> file=%s\n", file);
749 if(compress){
750 io = gzip_stream_fopen(file, "wb1");
751 } else {
752 io = file_stream_fopen(file, "wb");
753 }
754 if(!io){
755 eprintf("> Failed to open %s\n", file);
756 err = -EINVAL;
757 goto exit;
758 }
759 err = xen_domain_snd(xend, io,
760 state->vmid,
761 state->vmconfig, state->vmconfig_n,
762 0);
763 if(err){
764 err = xfr_error(xend, err);
765 } else {
766 err = xfr_send_save_ok(xend);
767 }
768 exit:
769 if(io){
770 IOStream_close(io);
771 IOStream_free(io);
772 }
773 if(err){
774 unlink(file);
775 }
776 dprintf("< err=%d\n", err);
777 return err;
778 }
780 /** Accept the transfer of a vm from another node.
781 *
782 * @param peer connection
783 * @param msg message
784 * @return 0 on success, error code otherwise
785 */
786 int xfr_recv(Args *args, XfrState *state, Conn *peer){
787 int err = 0;
788 time_t t0 = time(NULL), t1;
789 Sxpr sxpr;
791 dprintf(">\n");
792 err = xen_domain_rcv(peer->in, &state->vmid_new, &state->vmconfig, &state->vmconfig_n);
793 if(err) goto exit;
794 // Read from the peer. This is just so we wait before configuring.
795 // When migrating to the same host the peer must destroy the domain
796 // before we configure the new one.
797 err = Conn_sxpr(peer, &sxpr);
798 if(err) goto exit;
799 //sleep(2);
800 err = xen_domain_configure(state->vmid_new, state->vmconfig, state->vmconfig_n);
801 if(err) goto exit;
802 err = xen_domain_unpause(state->vmid_new);
803 if(err) goto exit;
804 // Report new domain id to peer.
805 err = xfr_send_xfr_ok(peer, state->vmid_new);
806 if(err) goto exit;
807 // Get the final ok.
808 err = xfr_response(peer);
809 exit:
810 if(!err){
811 t1 = time(NULL) - t0;
812 iprintf("> Transfer complete in %lu seconds\n", t1);
813 }
814 if(err){
815 xfr_error(peer, err);
816 }
817 dprintf("< err=%d\n", err);
818 return err;
819 }
821 /** Listen for a hello followed by a service request.
822 * The request can be from the local xend or from xfrd on another node.
823 *
824 * @param peersock socket
825 * @param peer_in peer address
826 * @return 0 on success, error code otherwise
827 */
828 int xfrd_service(Args *args, int peersock, struct sockaddr_in peer_in){
829 int err = 0;
830 Sxpr sxpr;
831 Conn _conn = {}, *conn = &_conn;
832 int flags = CONN_NOBUFFER;
834 dprintf(">\n");
835 err = Conn_init(conn, flags, peersock, peer_in);
836 if(err) goto exit;
837 //dprintf(">xfr_hello... \n");
838 err = xfr_hello(conn);
839 if(err) goto exit;
840 //dprintf("> sxpr...\n");
841 err = Conn_sxpr(conn, &sxpr);
842 if(err) goto exit;
843 //dprintf("> sxpr=\n");
844 //objprint(iostdout, sxpr, PRINT_TYPE); IOStream_print(iostdout, "\n");
845 if(sxpr_elementp(sxpr, oxfr_migrate)){
846 // Migrate message from xend.
847 uint32_t addr;
848 uint16_t port;
849 XfrState _state = {}, *state = &_state;
850 int n = 0;
852 dprintf("> xfr.migrate\n");
853 err = intof(sxpr_childN(sxpr, n++, ONONE), &state->vmid);
854 if(err) goto exit;
855 err = stringof(sxpr_childN(sxpr, n++, ONONE), &state->vmconfig);
856 if(err) goto exit;
857 state->vmconfig_n = strlen(state->vmconfig);
858 err = addrof(sxpr_childN(sxpr, n++, ONONE), &addr);
859 if(err) goto exit;
860 err = portof(sxpr_childN(sxpr, n++, ONONE), &port);
861 if(err) goto exit;
862 err = intof(sxpr_childN(sxpr, n++, ONONE), &state->live);
863 if(err) goto exit;
864 err = xfr_send(args, state, conn, addr, port);
866 } else if(sxpr_elementp(sxpr, oxfr_save)){
867 // Save message from xend.
868 char *file;
869 XfrState _state = {}, *state = &_state;
870 int n = 0;
872 dprintf("> xfr.save\n");
873 err = intof(sxpr_childN(sxpr, n++, ONONE), &state->vmid);
874 if(err) goto exit;
875 err = stringof(sxpr_childN(sxpr, n++, ONONE), &state->vmconfig);
876 if(err) goto exit;
877 state->vmconfig_n = strlen(state->vmconfig);
878 err = stringof(sxpr_childN(sxpr, n++, ONONE), &file);
879 if(err) goto exit;
880 err = xfr_save(args, state, conn, file);
882 } else if(sxpr_elementp(sxpr, oxfr_xfr)){
883 // Xfr message from peer xfrd.
884 XfrState _state = {}, *state = &_state;
885 int n = 0;
887 dprintf("> xfr.xfr\n");
888 err = intof(sxpr_childN(sxpr, n++, ONONE), &state->vmid);
889 if(err) goto exit;
890 err = xfr_recv(args, state, conn);
892 } else{
893 // Anything else is invalid.
894 err = -EINVAL;
895 eprintf("> Invalid message: ");
896 objprint(iostderr, sxpr, 0);
897 IOStream_print(iostderr, "\n");
898 xfr_error(conn, err);
899 }
900 exit:
901 Conn_close(conn);
902 dprintf("< err=%d\n", err);
903 return err;
904 }
906 /** Accept an incoming connection.
907 *
908 * @param sock tcp socket
909 * @return 0 on success, error code otherwise
910 */
911 int xfrd_accept(Args *args, int sock){
912 struct sockaddr_in peer_in;
913 struct sockaddr *peer = (struct sockaddr *)&peer_in;
914 socklen_t peer_n = sizeof(peer_in);
915 int peersock;
916 pid_t pid;
917 int err = 0;
919 dprintf(">\n");
920 dprintf("> accept...\n");
921 peersock = accept(sock, peer, &peer_n);
922 dprintf("> accept=%d\n", peersock);
923 if(peersock < 0){
924 perror("accept");
925 err = -errno;
926 goto exit;
927 }
928 iprintf("> Accepted connection from %s:%d\n",
929 inet_ntoa(peer_in.sin_addr), htons(peer_in.sin_port));
930 pid = fork();
931 if(pid > 0){
932 // Parent, fork succeeded.
933 iprintf("> Forked child pid=%d\n", pid);
934 close(peersock);
935 } else if (pid < 0){
936 // Parent, fork failed.
937 perror("fork");
938 close(peersock);
939 } else {
940 // Child.
941 iprintf("> Xfr service for %s:%d\n",
942 inet_ntoa(peer_in.sin_addr), htons(peer_in.sin_port));
943 err = xfrd_service(args, peersock, peer_in);
944 iprintf("> Xfr service err=%d\n", err);
945 shutdown(peersock, 2);
946 exit(err ? 1 : 0);
947 }
948 exit:
949 dprintf("< err=%d\n", err);
950 return err;
951 }
953 /** Socket select loop.
954 * Accepts connections on the tcp socket.
955 *
956 * @param listen_sock tcp listen socket
957 * @return 0 on success, error code otherwise
958 */
959 int xfrd_select(Args *args, int listen_sock){
960 int err = 0;
961 SelectSet set = {};
962 dprintf("> socks: %d\n", listen_sock);
963 while(1){
964 SelectSet_zero(&set);
965 SelectSet_add_read(&set, listen_sock);
966 err = SelectSet_select(&set, NULL);
967 if(err < 0){
968 if(errno == EINTR) continue;
969 perror("select");
970 goto exit;
971 }
972 if(FD_ISSET(listen_sock, &set.rd)){
973 xfrd_accept(args, listen_sock);
974 }
975 }
976 exit:
977 dprintf("< err=%d\n", err);
978 return err;
979 }
981 /** Create a socket.
982 *
983 * @param args program arguments
984 * @param socktype socket type
985 * @param reuse whether to set SO_REUSEADDR
986 * @param val return value for the socket
987 * @return 0 on success, error code otherwise
988 */
989 int create_socket(Args *args, int socktype, int reuse, int *val){
990 int err = 0;
991 int sock = 0;
992 struct sockaddr_in addr_in;
993 struct sockaddr *addr = (struct sockaddr *)&addr_in;
994 socklen_t addr_n = sizeof(addr_in);
996 dprintf(">\n");
997 // Create socket and bind it.
998 sock = socket(AF_INET, socktype, 0);
999 if(sock < 0){
1000 err = -errno;
1001 goto exit;
1003 addr_in.sin_family = AF_INET;
1004 addr_in.sin_addr.s_addr = INADDR_ANY;
1005 addr_in.sin_port = args->port;
1006 dprintf("> port=%d\n", ntohs(addr_in.sin_port));
1007 if(reuse){
1008 // Set socket option to reuse address.
1009 int val = 1;
1010 err = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
1011 if(err < 0){
1012 err = -errno;
1013 perror("setsockopt");
1014 goto exit;
1017 err = bind(sock, addr, addr_n);
1018 if(err < 0){
1019 err = -errno;
1020 perror("bind");
1021 goto exit;
1023 exit:
1024 *val = (err ? -1 : sock);
1025 dprintf("< err=%d\n", err);
1026 return err;
1029 /** Create the tcp listen socket.
1031 * @param args program arguments
1032 * @param val return value for the socket
1033 * @return 0 on success, error code otherwise
1034 */
1035 int xfrd_listen_socket(Args *args, int *val){
1036 int err = 0;
1037 int sock;
1038 dprintf(">\n");
1039 err = create_socket(args, SOCK_STREAM, 1, &sock);
1040 if(err) goto exit;
1041 dprintf("> listen...\n");
1042 err = listen(sock, 5);
1043 if(err < 0){
1044 err = -errno;
1045 perror("listen");
1046 goto exit;
1048 exit:
1049 *val = (err ? -1 : sock);
1050 if(err) close(sock);
1051 dprintf("< err=%d\n", err);
1052 return err;
1055 /** Type for signal handling functions. */
1056 typedef void SignalAction(int code, siginfo_t *info, void *data);
1058 /** Handle SIGCHLD by getting child exit status.
1059 * This prevents child processes being defunct.
1061 * @param code signal code
1062 * @param info signal info
1063 * @param data
1064 */
1065 void sigaction_SIGCHLD(int code, siginfo_t *info, void *data){
1066 int status;
1067 pid_t pid;
1068 //dprintf("> child_exit=%d waiting...\n", child_exit);
1069 pid = wait(&status);
1070 dprintf("> child pid=%d status=%d\n", pid, status);
1073 /** Handle SIGPIPE.
1075 * @param code signal code
1076 * @param info signal info
1077 * @param data
1078 */
1079 void sigaction_SIGPIPE(int code, siginfo_t *info, void *data){
1080 dprintf("> SIGPIPE\n");
1081 //fflush(stdout);
1082 //fflush(stderr);
1083 //exit(1);
1086 /** Handle SIGALRM.
1088 * @param code signal code
1089 * @param info signal info
1090 * @param data
1091 */
1092 void sigaction_SIGALRM(int code, siginfo_t *info, void *data){
1093 dprintf("> SIGALRM\n");
1096 /** Install a handler for a signal.
1098 * @param signum signal
1099 * @param action handler
1100 * @return 0 on success, error code otherwise
1101 */
1102 int catch_signal(int signum, SignalAction *action){
1103 int err = 0;
1104 struct sigaction sig = {};
1105 sig.sa_sigaction = action;
1106 sig.sa_flags = SA_SIGINFO;
1107 err = sigaction(signum, &sig, NULL);
1108 if(err){
1109 perror("sigaction");
1111 return err;
1114 /** Transfer daemon main program.
1116 * @param args program arguments
1117 * @return 0 on success, error code otherwise
1118 */
1119 int xfrd_main(Args *args){
1120 int err = 0;
1121 int listen_sock;
1123 dprintf(">\n");
1124 catch_signal(SIGCHLD,sigaction_SIGCHLD);
1125 catch_signal(SIGPIPE,sigaction_SIGPIPE);
1126 catch_signal(SIGALRM,sigaction_SIGALRM);
1127 err = xfrd_listen_socket(args, &listen_sock);
1128 if(err) goto exit;
1129 err = xfrd_select(args, listen_sock);
1130 exit:
1131 close(listen_sock);
1132 dprintf("< err=%d\n", err);
1133 return err;
1136 /** Parse command-line arguments and call the xfrd main program.
1138 * @param arg argument count
1139 * @param argv arguments
1140 * @return 0 on success, 1 otherwise
1141 */
1142 int main(int argc, char *argv[]){
1143 int err = 0;
1144 int key = 0;
1145 int long_index = 0;
1146 static const char * LOGFILE = "/var/log/xfrd.log";
1148 freopen(LOGFILE, "w+", stdout);
1149 fclose(stderr);
1150 stderr = stdout;
1151 dprintf(">\n");
1152 set_defaults(args);
1153 while(1){
1154 key = getopt_long(argc, argv, short_opts, long_opts, &long_index);
1155 if(key == -1) break;
1156 switch(key){
1157 case OPT_PORT:
1158 err = !convert_service_to_port(optarg, &args->port);
1159 if(err) goto exit;
1160 break;
1161 case OPT_COMPRESS:
1162 args->compress = TRUE;
1163 break;
1164 case OPT_HELP:
1165 usage(0);
1166 break;
1167 case OPT_VERBOSE:
1168 args->verbose = TRUE;
1169 break;
1170 case OPT_VERSION:
1171 printf("> Version %d.%d\n", XFR_PROTO_MAJOR, XFR_PROTO_MINOR);
1172 exit(0);
1173 break;
1174 default:
1175 usage(EINVAL);
1176 break;
1179 xfr_init();
1180 err = xfrd_main(args);
1181 exit:
1182 if(err && key > 0){
1183 fprintf(stderr, "Error in arg %c\n", key);
1185 return (err ? 1 : 0);