ia64/xen-unstable

view tools/xfrd/xfrd.c @ 1921:24ecc060e9d7

bitkeeper revision 1.1108.21.1 (41062740xHG36OEbpVAmVX5N9WCaNw)

make vmlinuz really stripped
author cl349@freefall.cl.cam.ac.uk
date Tue Jul 27 09:58:24 2004 +0000 (2004-07-27)
parents 095e969226c4
children c6a98003938a 0a4b76b6b5a0
line source
1 /** @file
2 * XFRD - Domain Transfer Daemon for Xen.
3 *
4 * The xfrd is forked by xend to transfer a vm to a remote system.
5 *
6 * The vm is suspended, then its state and memory are transferred to the remote system.
7 * The remote system attempts to create a vm and copy the transferred state and memory into it,
8 * finally resuming the vm. If all is OK the vm ends up running on the remote
9 * system and is removed from the originating system. If the transfer does not complete
10 * successfully the originating system attempts to resume the vm.
11 * The children exit when the transfer completes.
12 *
13 * @author Mike Wray <mike.wray@hpl.hp.com>
14 */
16 #include <stdlib.h>
17 #include <unistd.h>
18 #include <stdio.h>
19 #include <getopt.h>
20 #include <errno.h>
21 #include <sys/types.h>
22 #include <time.h>
23 #include <sys/socket.h>
24 #include <netinet/in.h>
25 #include <arpa/inet.h>
26 #include <string.h>
28 #include <signal.h>
29 #include <sys/wait.h>
30 #include <sys/select.h>
32 #include "allocate.h"
33 #include "file_stream.h"
34 #include "string_stream.h"
35 #include "lzi_stream.h"
36 #include "sys_net.h"
37 #include "sys_string.h"
39 //#include "xdr.h"
40 #include "enum.h"
41 #include "xfrd.h"
43 #include "xen_domain.h"
45 #include "connection.h"
46 #include "select.h"
48 #define MODULE_NAME "XFRD"
49 #define DEBUG 1
50 #include "debug.h"
52 /*
53 sender:
54 xend connects to xfrd and writes migrate message
55 xend writes domain config to xfrd
57 xfrd forks
59 xfrd connects to peer
60 xfrd sends hello, reads response
61 xfrd sends domain
62 xfrd reads response
63 reports progress/status to xend
65 xend reads xfrd for progress/status, disconnects
66 If ok, destroys domain.
67 If not ok, unpauses domain.
69 receiver:
70 xfrd accepts connection on inbound port
71 xfrd forks and accepts connection
72 xfrd receives hello, writes response
73 xfrd receives domain
74 xfrd connects to xend, configures new domain
75 xfrd writes status back to peer, child exits
78 (xfr.hello <major> <minor>)
79 (xfr.err <code> <reason>)
81 xend->xfrd (xfr.migrate <domain> <vmconfig> <host> <port>)
82 (xfr.save <domain> <vmconfig> <file>)
83 xfrd->xend (xfr.suspend <domain>)
84 xfrd->xend (xfr.progress <percent> <rate: kb/s>)
85 xfrd->xend (xfr.err <code> <reason>) | (xfr.ok <domain>)
86 xfrd->xfrd (xfr.xfr <domain>)
87 xfrd->xfrd (xfr.err <code>) | (xfr.ok <domain>)
89 xfrd->xend (xfr.configure <domain> <vmconfig>)
90 */
92 Sxpr oxfr_configure; // (xfr.configure <vmid> <vmconfig>)
93 Sxpr oxfr_err; // (xfr.err <code>)
94 Sxpr oxfr_hello; // (xfr.hello <major> <minor>)
95 Sxpr oxfr_migrate; // (xfr.migrate <vmid> <vmconfig> <host> <port>)
96 Sxpr oxfr_migrate_ok;// (xfr.migrate.ok <value>)
97 Sxpr oxfr_progress; // (xfr.progress <percent> <rate: kb/s>)
98 Sxpr oxfr_save; // (xfr.save <vmid> <vmconfig> <file>)
99 Sxpr oxfr_save_ok; // (xfr.save.ok)
100 Sxpr oxfr_vm_suspend;// (xfr.vm.suspend <vmid>)
101 Sxpr oxfr_xfr; // (xfr.xfr <vmid>)
102 Sxpr oxfr_xfr_ok; // (xfr.xfr.ok <vmid>)
104 void xfr_init(void){
105 oxfr_configure = intern("xfr.configure");
106 oxfr_err = intern("xfr.err");
107 oxfr_hello = intern("xfr.hello");
108 oxfr_migrate = intern("xfr.migrate");
109 oxfr_migrate_ok = intern("xfr.migrate.ok");
110 oxfr_progress = intern("xfr.progress");
111 oxfr_save = intern("xfr.save");
112 oxfr_save_ok = intern("xfr.save.ok");
113 oxfr_vm_suspend = intern("xfr.vm.suspend");
114 oxfr_xfr = intern("xfr.xfr");
115 oxfr_xfr_ok = intern("xfr.xfr.ok");
116 }
118 #ifndef TRUE
119 #define TRUE 1
120 #endif
122 #ifndef FALSE
123 #define FALSE 0
124 #endif
126 #define PROGRAM "xfrd"
128 #define OPT_PORT 'P'
129 #define KEY_PORT "port"
130 #define DOC_PORT "<port>\n\txfr port (as a number or service name)"
132 #define OPT_COMPRESS 'Z'
133 #define KEY_COMPRESS "compress"
134 #define DOC_COMPRESS "\n\tuse compression for migration"
136 #define OPT_HELP 'h'
137 #define KEY_HELP "help"
138 #define DOC_HELP "\n\tprint help"
140 #define OPT_VERSION 'v'
141 #define KEY_VERSION "version"
142 #define DOC_VERSION "\n\tprint version"
144 #define OPT_VERBOSE 'V'
145 #define KEY_VERBOSE "verbose"
146 #define DOC_VERBOSE "\n\tverbose flag"
148 /** Print a usage message.
149 * Prints to stdout if err is zero, and exits with 0.
150 * Prints to stderr if err is non-zero, and exits with 1.
151 */
152 void usage(int err){
153 FILE *out = (err ? stderr : stdout);
155 fprintf(out, "Usage: %s [options]\n", PROGRAM);
156 fprintf(out, "-%c, --%s %s\n", OPT_PORT, KEY_PORT, DOC_PORT);
157 fprintf(out, "-%c, --%s %s\n", OPT_COMPRESS, KEY_COMPRESS, DOC_COMPRESS);
158 fprintf(out, "-%c, --%s %s\n", OPT_VERBOSE, KEY_VERBOSE, DOC_VERBOSE);
159 fprintf(out, "-%c, --%s %s\n", OPT_VERSION, KEY_VERSION, DOC_VERSION);
160 fprintf(out, "-%c, --%s %s\n", OPT_HELP, KEY_HELP, DOC_HELP);
161 exit(err ? 1 : 0);
162 }
164 /** Short options. Options followed by ':' take an argument. */
165 static char *short_opts = (char[]){
166 OPT_PORT, ':',
167 OPT_COMPRESS,
168 OPT_HELP,
169 OPT_VERSION,
170 OPT_VERBOSE,
171 0 };
173 /** Long options. */
174 static struct option const long_opts[] = {
175 { KEY_PORT, required_argument, NULL, OPT_PORT },
176 { KEY_COMPRESS, no_argument, NULL, OPT_COMPRESS },
177 { KEY_HELP, no_argument, NULL, OPT_HELP },
178 { KEY_VERSION, no_argument, NULL, OPT_VERSION },
179 { KEY_VERBOSE, no_argument, NULL, OPT_VERBOSE },
180 { NULL, 0, NULL, 0 }
181 };
/** Program arguments for xfrd. */
typedef struct Args {
    /** Buffer size in bytes (set_defaults() uses 128k). */
    int bufsize;
    /** Port to listen on; stored as it is assigned to sin_port. */
    unsigned long port;
    /** Non-zero if the verbose option was given. */
    int verbose;
    /** Non-zero if the compress option was given. */
    int compress;
} Args;
/** Transfer states.
 * XFR_MAX is the number of states, not a state itself;
 * it sizes the per-state error array in XfrState.
 */
enum {
    XFR_INIT  = 0,
    XFR_HELLO = 1,
    XFR_STATE = 2,
    XFR_RUN   = 3,
    XFR_FAIL  = 4,
    XFR_DONE  = 5,
    XFR_MAX   = 6
};
201 /** Initialize an array element for a constant to its string name. */
202 #define VALDEF(val) { val, #val }
204 /** Names for the transfer states. */
205 static EnumDef xfr_states[] = {
206 VALDEF(XFR_INIT),
207 VALDEF(XFR_HELLO),
208 VALDEF(XFR_STATE),
209 VALDEF(XFR_RUN),
210 VALDEF(XFR_FAIL),
211 VALDEF(XFR_DONE),
212 { 0, NULL }
213 };
216 /** State machine for transfer. */
217 typedef struct XfrState {
218 /** Current state. */
219 int state;
220 /** Error codes for the states. */
221 int state_err[XFR_MAX];
222 /** First error. */
223 int err;
224 /** State when first error happened. */
225 int err_state;
227 uint32_t vmid;
228 char* vmconfig;
229 int vmconfig_n;
230 unsigned long xfr_port;
231 char *xfr_host;
232 uint32_t vmid_new;
233 } XfrState;
235 /** Get the name of a transfer state.
236 *
237 * @param s state
238 * @return name
239 */
240 char * xfr_state_name(int s){
241 return enum_val_to_name(s, xfr_states);
242 }
244 /** Set the state of a transfer.
245 *
246 * @param s transfer
247 * @param state state
248 * @return state
249 */
250 int XfrState_set_state(XfrState *s, int state){
251 s->state = state;
252 return s->state;
253 }
255 /** Get the state of a transfer.
256 *
257 * @param s transfer
258 * @return state
259 */
260 int XfrState_get_state(XfrState *s){
261 return s->state;
262 }
264 /** Set an error in the current state.
265 * Does nothing if an error is already set.
266 *
267 * @param s transfer
268 * @param err error
269 * @return error
270 */
271 int XfrState_set_err(XfrState *s, int err){
272 if(!s->state_err[s->state]){
273 s->state_err[s->state] = err;
274 }
275 if(!s->err){
276 s->err = err;
277 s->err_state = s->state;
278 }
279 return err;
280 }
282 /** Get the error in the current state.
283 *
284 * @param s transfer
285 * @return error
286 */
287 int XfrState_get_err(XfrState *s){
288 return s->state_err[s->state];
289 }
291 /** Get the first error of a transfer.
292 *
293 * @param s transfer
294 * @return error
295 */
296 int XfrState_first_err(XfrState *s){
297 return s->err;
298 }
300 /** Get the state a transfer was in when it had its first error.
301 *
302 * @param s transfer
303 * @return error state
304 */
305 int XfrState_first_err_state(XfrState *s){
306 return s->err_state;
307 }
309 /** Xfrd arguments. */
310 static Args _args = {};
312 /** Xfrd arguments. */
313 static Args *args = &_args;
315 /** Set xfrd default arguments.
316 *
317 * @param args arguments to set
318 */
319 void set_defaults(Args *args){
320 args->compress = FALSE;
321 args->bufsize = 128 * 1024;
322 args->port = htons(XFRD_PORT);
323 }
325 int stringof(Sxpr exp, char **s){
326 int err = 0;
327 dprintf(">\n");
328 objprint(iostdout, exp, PRINT_TYPE); IOStream_print(iostdout, "\n");
329 if(ATOMP(exp)){
330 *s = atom_name(exp);
331 } else if(STRINGP(exp)){
332 *s = string_string(exp);
333 } else {
334 err = -EINVAL;
335 *s = NULL;
336 }
337 dprintf("< err=%d s=%s\n", err, *s);
338 return err;
339 }
341 int intof(Sxpr exp, int *v){
342 int err = 0;
343 char *s;
344 unsigned long l;
345 dprintf(">\n");
346 objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
347 if(INTP(exp)){
348 *v = OBJ_INT(exp);
349 } else {
350 err = stringof(exp, &s);
351 if(err) goto exit;
352 err = convert_atoul(s, &l);
353 *v = (int)l;
354 }
355 exit:
356 dprintf("< err=%d v=%d\n", err, *v);
357 return err;
358 }
360 int addrof(Sxpr exp, uint32_t *v){
361 char *h;
362 unsigned long a;
363 int err = 0;
364 dprintf(">\n");
365 objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
366 err = stringof(exp, &h);
367 if(err) goto exit;
368 if(get_host_address(h, &a)){
369 err = -EINVAL;
370 goto exit;
371 }
372 *v = a;
373 exit:
374 dprintf("< err=%d v=%x\n", err, *v);
375 return err;
376 }
378 int portof(Sxpr exp, uint16_t *v){
379 char *s;
380 int err = 0;
381 dprintf(">\n");
382 objprint(iostdout, exp, 0); IOStream_print(iostdout, "\n");
383 if(INTP(exp)){
384 *v = get_ul(exp);
385 *v = htons(*v);
386 } else {
387 unsigned long p;
388 err = stringof(exp, &s);
389 if(err) goto exit;
390 err = convert_service_to_port(s, &p);
391 if(err){
392 err = -EINVAL;
393 goto exit;
394 }
395 *v = p;
396 }
397 exit:
398 dprintf("< err=%d v=%u\n", err, *v);
399 return err;
400 }
/** Wrap a raw address value in a struct in_addr.
 *
 * @param addr address, stored in s_addr unchanged
 * @return the in_addr
 */
static inline struct in_addr inaddr(uint32_t addr){
    struct in_addr in;

    in.s_addr = addr;
    return in;
}
/** Compute progress statistics for a transfer.
 *
 * @param t0 time the transfer started
 * @param offset number of bytes transferred so far
 * @param memory total number of bytes to transfer
 * @param percent returns offset as a percentage of memory (0 if memory is 0)
 * @param rate returns kilobytes per second (0 if no whole second has elapsed)
 * @return seconds elapsed since t0
 */
time_t stats(time_t t0, uint64_t offset, uint64_t memory, float *percent, float *rate){
    time_t elapsed = time(NULL) - t0;

    // Guard the division: an empty transfer has made no measurable progress.
    *percent = (memory ? (offset * 100.0f) / memory : 0.0f);
    *rate = (elapsed ? offset / (elapsed * 1024.0f) : 0.0f);
    return elapsed;
}
414 /** Notify success or error.
415 *
416 * @param conn connection
417 * @param errcode error code
418 * @return 0 on success, error code otherwise
419 */
420 int xfr_error(Conn *conn, int errcode){
421 int err = 0;
423 if(!conn->out) return -ENOTCONN;
424 if(errcode <0) errcode = -errcode;
425 err = IOStream_print(conn->out, "(%s %d)",
426 atom_name(oxfr_err), errcode);
427 return (err < 0 ? err : 0);
428 }
430 /** Read a response message - error or ok.
431 *
432 * @param conn connection
433 * @return 0 on success, error code otherwise
434 */
435 int xfr_response(Conn *conn){
436 int err;
437 Sxpr sxpr;
439 dprintf(">\n");
440 if(!conn->out) return -ENOTCONN;
441 err = Conn_sxpr(conn, &sxpr);
442 if(err) goto exit;
443 if(sxpr_elementp(sxpr, oxfr_err)){
444 int errcode;
445 err = intof(sxpr_childN(sxpr, 0, ONONE), &errcode);
446 if(err) goto exit;
447 err = errcode;
448 }
449 exit:
450 dprintf("< err=%d\n", err);
451 return err;
452 }
454 /** Get the initial hello message and check the protocol version.
455 * It is an error to receive anything other than a hello message
456 * with the correct protocol version.
457 *
458 * @param conn connection
459 * @return 0 on success, error code otherwise
460 */
461 int xfr_hello(Conn *conn){
462 int err;
463 uint32_t major = XFR_PROTO_MAJOR, minor = XFR_PROTO_MINOR;
464 uint32_t hello_major, hello_minor;
465 Sxpr sxpr;
466 if(!conn->in) return -ENOTCONN;
467 dprintf(">\n");
468 err = Conn_sxpr(conn, &sxpr);
469 if(err) goto exit;
470 if(!sxpr_elementp(sxpr, oxfr_hello)){
471 dprintf("> sxpr_elementp test failed\n");
472 err = -EINVAL;
473 goto exit;
474 }
475 err = intof(sxpr_childN(sxpr, 0, ONONE), &hello_major);
476 if(err) goto exit;
477 err = intof(sxpr_childN(sxpr, 1, ONONE), &hello_minor);
478 if(err) goto exit;
479 if(hello_major != major || hello_minor != minor){
480 eprintf("> Wanted protocol version %d.%d, got %d.%d",
481 major, minor, hello_major, hello_minor);
482 err = -EINVAL;
483 goto exit;
484 }
485 exit:
486 xfr_error(conn, err);
487 if(err){
488 eprintf("> Hello failed: %d\n", err);
489 }
490 dprintf("< err=%d\n", err);
491 return err;
492 }
494 /** Send the initial hello message.
495 *
496 * @param conn connection
497 * @param msg message
498 * @return 0 on success, error code otherwise
499 */
500 int xfr_send_hello(Conn *conn){
501 int err = 0;
502 dprintf(">\n");
504 err = IOStream_print(conn->out, "(%s %d %d)",
505 atom_name(oxfr_hello),
506 XFR_PROTO_MAJOR,
507 XFR_PROTO_MINOR);
508 if(err < 0) goto exit;
509 IOStream_flush(conn->out);
510 dprintf("> xfr_response...\n");
511 err = xfr_response(conn);
512 exit:
513 dprintf("< err=%d\n", err);
514 return err;
515 }
517 int xfr_send_xfr(Conn *conn, uint32_t vmid){
518 int err;
520 err = IOStream_print(conn->out, "(%s %d)",
521 atom_name(oxfr_xfr), vmid);
522 return (err < 0 ? err : 0);
523 }
525 int xfr_send_xfr_ok(Conn *conn, uint32_t vmid){
526 int err = 0;
528 err = IOStream_print(conn->out, "(%s %d)",
529 atom_name(oxfr_xfr_ok), vmid);
530 return (err < 0 ? err : 0);
531 }
533 int xfr_send_migrate_ok(Conn *conn, uint32_t vmid){
534 int err = 0;
536 err = IOStream_print(conn->out, "(%s %d)",
537 atom_name(oxfr_migrate_ok), vmid);
538 return (err < 0 ? err : 0);
539 }
541 int xfr_send_save_ok(Conn *conn){
542 int err = 0;
544 err = IOStream_print(conn->out, "(%s)",
545 atom_name(oxfr_save_ok));
546 return (err < 0 ? err : 0);
547 }
549 int xfr_send_suspend(Conn *conn, uint32_t vmid){
550 int err = 0;
552 err = IOStream_print(conn->out, "(%s %d)",
553 atom_name(oxfr_vm_suspend), vmid);
554 return (err < 0 ? err : 0);
555 }
557 /** Suspend a vm on behalf of save/migrate.
558 */
559 int xfr_vm_suspend(Conn *xend, uint32_t vmid){
560 int err = 0;
561 dprintf("> vmid=%u\n", vmid);
562 err = xfr_send_suspend(xend, vmid);
563 if(err) goto exit;
564 IOStream_flush(xend->out);
565 err = xfr_response(xend);
566 exit:
567 dprintf("< err=%d\n", err);
568 return err;
569 }
571 /** Get vm state. Send transfer message.
572 *
573 * @param peer connection
574 * @param msg message
575 * @return 0 on success, error code otherwise
576 */
577 int xfr_send_state(XfrState *state, Conn *xend, Conn *peer){
578 int err = 0;
579 Sxpr sxpr;
581 dprintf(">\n");
582 XfrState_set_state(state, XFR_STATE);
583 // Send xfr message and the domain state.
584 err = xfr_send_xfr(peer, state->vmid);
585 if(err) goto exit;
586 err = xen_domain_snd(xend, peer->out,
587 state->vmid, state->vmconfig, state->vmconfig_n);
588 if(err) goto exit;
589 IOStream_flush(peer->out);
590 // Read the response from the peer.
591 err = Conn_sxpr(peer, &sxpr);
592 if(err) goto exit;
593 if(sxpr_elementp(sxpr, oxfr_err)){
594 // Error.
595 int errcode;
596 err = intof(sxpr_childN(sxpr, 0, ONONE), &errcode);
597 if(!err) err = errcode;
598 } else if(sxpr_elementp(sxpr, oxfr_xfr_ok)){
599 // Ok - get the new domain id.
600 err = intof(sxpr_childN(sxpr, 0, ONONE), &state->vmid_new);
601 xfr_error(peer, err);
602 } else {
603 // Anything else is invalid. But it may be too late.
604 err = -EINVAL;
605 xfr_error(peer, err);
606 }
607 exit:
608 XfrState_set_err(state, err);
609 dprintf("< err=%d\n", err);
610 return err;
611 }
613 /** Finish the transfer.
614 */
615 int xfr_send_done(XfrState *state, Conn *xend){
616 int err = 0;
617 int first_err = 0;
619 first_err = XfrState_first_err(state);
620 if(first_err){
621 XfrState_set_state(state, XFR_FAIL);
622 } else {
623 XfrState_set_state(state, XFR_DONE);
624 }
625 if(first_err){
626 err = xfr_error(xend, first_err);
627 } else {
628 // Report new domain id to xend.
629 err = xfr_send_migrate_ok(xend, state->vmid_new);
630 }
632 XfrState_set_err(state, err);
633 if(XfrState_first_err(state)){
634 int s, serr;
636 wprintf("> Transfer errors:\n");
637 for(s = 0; s < XFR_MAX; s++){
638 serr = state->state_err[s];
639 if(!serr) continue;
640 wprintf("> state=%-12s err=%d\n", xfr_state_name(s), serr);
641 }
642 } else {
643 wprintf("> Transfer OK\n");
644 }
645 dprintf("< err=%d\n", err);
646 return err;
647 }
649 /** Migrate a vm to another node.
650 *
651 * @param xend connection
652 * @return 0 on success, error code otherwise
653 */
654 int xfr_send(Args *args, XfrState *state, Conn *xend, uint32_t addr, uint32_t port){
655 int err = 0;
656 Conn _peer = {}, *peer = &_peer;
657 int flags = 0;
658 struct in_addr xfr_addr;
659 uint16_t xfr_port;
660 time_t t0 = time(NULL), t1;
662 dprintf(">\n");
663 flags |= CONN_NOBUFFER;
664 if(args->compress){
665 flags |= CONN_WRITE_COMPRESS;
666 }
667 xfr_addr.s_addr = addr;
668 xfr_port = port;
669 if(!xfr_port) xfr_port = htons(XFRD_PORT);
670 dprintf("> Xfr vmid=%u\n", state->vmid);
671 dprintf("> Xfr xfr_addr=%s:%d\n", inet_ntoa(xfr_addr), ntohs(xfr_port));
672 err = Conn_connect(peer, flags, xfr_addr, xfr_port);
673 if(err) goto exit;
674 printf("\n");
675 XfrState_set_state(state, XFR_HELLO);
676 // Send hello message.
677 err = xfr_send_hello(peer);
678 if(err) goto exit;
679 printf("\n");
680 // Send vm state.
681 err = xfr_send_state(state, xend, peer);
682 if(err) goto exit;
683 if(args->compress){
684 IOStream *zio = peer->out;
685 int plain_bytes = lzi_stream_plain_bytes(zio);
686 int comp_bytes = lzi_stream_comp_bytes(zio);
687 float ratio = lzi_stream_ratio(zio);
688 dprintf("> Compression: plain %d bytes, compressed %d bytes, ratio %3.2f\n",
689 plain_bytes, comp_bytes, ratio);
690 }
691 printf("\n");
692 exit:
693 dprintf("> err=%d\n", err);
694 if(err && !XfrState_get_err(state)){
695 XfrState_set_err(state, err);
696 }
697 Conn_close(peer);
698 if(!err){
699 t1 = time(NULL) - t0;
700 dprintf("> Transfer complete in %lu seconds\n", t1);
701 }
702 dprintf("> done err=%d, notifying xend...\n", err);
703 xfr_send_done(state, xend);
704 dprintf("< err=%d\n", err);
705 return err;
706 }
708 /** Save a vm to file.
709 */
710 int xfr_save(Args *args, XfrState *state, Conn *xend, char *file){
711 int err = 0;
712 IOStream *io = NULL;
714 dprintf("> file=%s\n", file);
715 io = file_stream_fopen(file, "wb");
716 if(!io){
717 dprintf("> Failed to open %s\n", file);
718 err = -EIO;
719 goto exit;
720 }
721 err = xen_domain_snd(xend, io, state->vmid, state->vmconfig, state->vmconfig_n);
722 if(err){
723 err = xfr_error(xend, err);
724 } else {
725 err = xfr_send_save_ok(xend);
726 }
727 exit:
728 if(io){
729 IOStream_close(io);
730 IOStream_free(io);
731 }
732 dprintf("< err=%d\n", err);
733 return err;
734 }
736 /** Accept the transfer of a vm from another node.
737 *
738 * @param peer connection
739 * @param msg message
740 * @return 0 on success, error code otherwise
741 */
742 int xfr_recv(Args *args, XfrState *state, Conn *peer){
743 int err = 0;
744 time_t t0 = time(NULL), t1;
746 dprintf(">\n");
747 err = xen_domain_rcv(peer->in, &state->vmid_new, &state->vmconfig, &state->vmconfig_n);
748 if(err) goto exit;
750 err = xen_domain_configure(state->vmid_new, state->vmconfig, state->vmconfig_n);
751 if(err) goto exit;
753 // Report new domain id to peer.
754 err = xfr_send_xfr_ok(peer, state->vmid_new);
755 if(err) goto exit;
756 // Get the final ok.
757 err = xfr_response(peer);
758 exit:
759 if(!err){
760 t1 = time(NULL) - t0;
761 dprintf("> Transfer complete in %lu seconds\n", t1);
762 }
763 if(err){
764 xfr_error(peer, err);
765 }
766 dprintf("< err=%d\n", err);
767 return err;
768 }
770 /** Listen for a hello followed by a service request.
771 * The request can be from the local xend or from xfrd on another node.
772 *
773 * @param peersock socket
774 * @param peer_in peer address
775 * @return 0 on success, error code otherwise
776 */
777 int xfrd_service(Args *args, int peersock, struct sockaddr_in peer_in){
778 int err = 0;
779 Sxpr sxpr;
780 Conn _conn = {}, *conn = &_conn;
781 int flags = CONN_NOBUFFER;
783 dprintf(">\n");
784 err = Conn_init(conn, flags, peersock, peer_in);
785 if(err) goto exit;
786 dprintf(">xfr_hello... \n");
787 err = xfr_hello(conn);
788 if(err) goto exit;
789 dprintf("> sxpr...\n");
790 err = Conn_sxpr(conn, &sxpr);
791 if(err) goto exit;
792 dprintf("> sxpr=\n");
793 objprint(iostdout, sxpr, PRINT_TYPE); IOStream_print(iostdout, "\n");
794 if(sxpr_elementp(sxpr, oxfr_migrate)){
795 // Migrate message from xend.
796 uint32_t addr;
797 uint16_t port;
798 XfrState _state = {}, *state = &_state;
799 int n = 0;
801 dprintf("> xfr.migrate\n");
802 err = intof(sxpr_childN(sxpr, n++, ONONE), &state->vmid);
803 if(err) goto exit;
804 err = stringof(sxpr_childN(sxpr, n++, ONONE), &state->vmconfig);
805 if(err) goto exit;
806 state->vmconfig_n = strlen(state->vmconfig);
807 err = addrof(sxpr_childN(sxpr, n++, ONONE), &addr);
808 if(err) goto exit;
809 err = portof(sxpr_childN(sxpr, n++, ONONE), &port);
810 if(err) goto exit;
811 err = xfr_send(args, state, conn, addr, port);
813 } else if(sxpr_elementp(sxpr, oxfr_save)){
814 // Save message from xend.
815 char *file;
816 XfrState _state = {}, *state = &_state;
817 int n = 0;
819 dprintf("> xfr.save\n");
820 err = intof(sxpr_childN(sxpr, n++, ONONE), &state->vmid);
821 if(err) goto exit;
822 err = stringof(sxpr_childN(sxpr, n++, ONONE), &state->vmconfig);
823 if(err) goto exit;
824 state->vmconfig_n = strlen(state->vmconfig);
825 err = stringof(sxpr_childN(sxpr, n++, ONONE), &file);
826 if(err) goto exit;
827 err = xfr_save(args, state, conn, file);
829 } else if(sxpr_elementp(sxpr, oxfr_xfr)){
830 // Xfr message from peer xfrd.
831 XfrState _state = {}, *state = &_state;
832 int n = 0;
834 dprintf("> xfr.xfr\n");
835 err = intof(sxpr_childN(sxpr, n++, ONONE), &state->vmid);
836 if(err) goto exit;
837 err = xfr_recv(args, state, conn);
839 } else{
840 // Anything else is invalid.
841 err = -EINVAL;
842 dprintf("> Invalid message: ");
843 objprint(iostderr, sxpr, 0);
844 IOStream_print(iostderr, "\n");
845 xfr_error(conn, err);
846 }
847 exit:
848 Conn_close(conn);
849 dprintf("< err=%d\n", err);
850 return err;
851 }
853 /** Accept an incoming connection.
854 *
855 * @param sock tcp socket
856 * @return 0 on success, error code otherwise
857 */
858 int xfrd_accept(Args *args, int sock){
859 struct sockaddr_in peer_in;
860 struct sockaddr *peer = (struct sockaddr *)&peer_in;
861 socklen_t peer_n = sizeof(peer_in);
862 int peersock;
863 pid_t pid;
864 int err = 0;
866 dprintf(">\n");
867 dprintf("> accept...\n");
868 peersock = accept(sock, peer, &peer_n);
869 dprintf("> accept=%d\n", peersock);
870 if(peersock < 0){
871 perror("accept");
872 err = -errno;
873 goto exit;
874 }
875 iprintf("> Accepted connection from %s:%d\n",
876 inet_ntoa(peer_in.sin_addr), htons(peer_in.sin_port));
877 pid = fork();
878 if(pid > 0){
879 // Parent, fork succeeded.
880 iprintf("> Forked child pid=%d\n", pid);
881 close(peersock);
882 } else if (pid < 0){
883 // Parent, fork failed.
884 perror("fork");
885 close(peersock);
886 } else {
887 // Child.
888 iprintf("> Xfr service for %s:%d\n",
889 inet_ntoa(peer_in.sin_addr), htons(peer_in.sin_port));
890 err = xfrd_service(args, peersock, peer_in);
891 iprintf("> Xfr service err=%d\n", err);
892 shutdown(peersock, 2);
893 exit(err ? 1 : 0);
894 }
895 exit:
896 dprintf("< err=%d\n", err);
897 return err;
898 }
900 /** Socket select loop.
901 * Accepts connections on the tcp socket.
902 *
903 * @param listen_sock tcp listen socket
904 * @return 0 on success, error code otherwise
905 */
906 int xfrd_select(Args *args, int listen_sock){
907 int err = 0;
908 SelectSet set = {};
909 dprintf("> socks: %d\n", listen_sock);
910 while(1){
911 SelectSet_zero(&set);
912 SelectSet_add_read(&set, listen_sock);
913 err = SelectSet_select(&set, NULL);
914 if(err < 0){
915 if(errno == EINTR) continue;
916 perror("select");
917 goto exit;
918 }
919 if(FD_ISSET(listen_sock, &set.rd)){
920 xfrd_accept(args, listen_sock);
921 }
922 }
923 exit:
924 dprintf("< err=%d\n", err);
925 return err;
926 }
928 /** Create a socket.
929 *
930 * @param args program arguments
931 * @param socktype socket type
932 * @param reuse whether to set SO_REUSEADDR
933 * @param val return value for the socket
934 * @return 0 on success, error code otherwise
935 */
936 int create_socket(Args *args, int socktype, int reuse, int *val){
937 int err = 0;
938 int sock = 0;
939 struct sockaddr_in addr_in;
940 struct sockaddr *addr = (struct sockaddr *)&addr_in;
941 socklen_t addr_n = sizeof(addr_in);
943 dprintf(">\n");
944 // Create socket and bind it.
945 sock = socket(AF_INET, socktype, 0);
946 if(sock < 0){
947 err = -errno;
948 goto exit;
949 }
950 addr_in.sin_family = AF_INET;
951 addr_in.sin_addr.s_addr = INADDR_ANY;
952 addr_in.sin_port = args->port;
953 dprintf("> port=%d\n", ntohs(addr_in.sin_port));
954 if(reuse){
955 // Set socket option to reuse address.
956 int val = 1;
957 err = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
958 if(err < 0){
959 err = -errno;
960 perror("setsockopt");
961 goto exit;
962 }
963 }
964 err = bind(sock, addr, addr_n);
965 if(err < 0){
966 err = -errno;
967 perror("bind");
968 goto exit;
969 }
970 exit:
971 *val = (err ? -1 : sock);
972 dprintf("< err=%d\n", err);
973 return err;
974 }
976 /** Create the tcp listen socket.
977 *
978 * @param args program arguments
979 * @param val return value for the socket
980 * @return 0 on success, error code otherwise
981 */
982 int xfrd_listen_socket(Args *args, int *val){
983 int err = 0;
984 int sock;
985 dprintf(">\n");
986 err = create_socket(args, SOCK_STREAM, 1, &sock);
987 if(err) goto exit;
988 dprintf("> listen...\n");
989 err = listen(sock, 5);
990 if(err < 0){
991 err = -errno;
992 perror("listen");
993 goto exit;
994 }
995 exit:
996 *val = (err ? -1 : sock);
997 if(err) close(sock);
998 dprintf("< err=%d\n", err);
999 return err;
1002 /** Type for signal handling functions. */
1003 typedef void SignalAction(int code, siginfo_t *info, void *data);
1005 /** Handle SIGCHLD by getting child exit status.
1006 * This prevents child processes being defunct.
1008 * @param code signal code
1009 * @param info signal info
1010 * @param data
1011 */
1012 void sigaction_SIGCHLD(int code, siginfo_t *info, void *data){
1013 int status;
1014 pid_t pid;
1015 //dprintf("> child_exit=%d waiting...\n", child_exit);
1016 pid = wait(&status);
1017 dprintf("> child pid=%d status=%d\n", pid, status);
1020 /** Handle SIGPIPE.
1022 * @param code signal code
1023 * @param info signal info
1024 * @param data
1025 */
1026 void sigaction_SIGPIPE(int code, siginfo_t *info, void *data){
1027 dprintf("> SIGPIPE\n");
1028 //fflush(stdout);
1029 //fflush(stderr);
1030 //exit(1);
1033 /** Handle SIGALRM.
1035 * @param code signal code
1036 * @param info signal info
1037 * @param data
1038 */
1039 void sigaction_SIGALRM(int code, siginfo_t *info, void *data){
1040 dprintf("> SIGALRM\n");
1043 /** Install a handler for a signal.
1045 * @param signum signal
1046 * @param action handler
1047 * @return 0 on success, error code otherwise
1048 */
1049 int catch_signal(int signum, SignalAction *action){
1050 int err = 0;
1051 struct sigaction sig = {};
1052 sig.sa_sigaction = action;
1053 sig.sa_flags = SA_SIGINFO;
1054 err = sigaction(signum, &sig, NULL);
1055 if(err){
1056 perror("sigaction");
1058 return err;
1061 /** Transfer daemon main program.
1063 * @param args program arguments
1064 * @return 0 on success, error code otherwise
1065 */
1066 int xfrd_main(Args *args){
1067 int err = 0;
1068 int listen_sock;
1070 dprintf(">\n");
1071 catch_signal(SIGCHLD,sigaction_SIGCHLD);
1072 catch_signal(SIGPIPE,sigaction_SIGPIPE);
1073 catch_signal(SIGALRM,sigaction_SIGALRM);
1074 err = xfrd_listen_socket(args, &listen_sock);
1075 if(err) goto exit;
1076 err = xfrd_select(args, listen_sock);
1077 exit:
1078 close(listen_sock);
1079 dprintf("< err=%d\n", err);
1080 return err;
1083 /** Parse command-line arguments and call the xfrd main program.
1085 * @param arg argument count
1086 * @param argv arguments
1087 * @return 0 on success, 1 otherwise
1088 */
1089 int main(int argc, char *argv[]){
1090 int err = 0;
1091 int key = 0;
1092 int long_index = 0;
1094 set_defaults(args);
1095 while(1){
1096 key = getopt_long(argc, argv, short_opts, long_opts, &long_index);
1097 if(key == -1) break;
1098 switch(key){
1099 case OPT_PORT:
1100 err = !convert_service_to_port(optarg, &args->port);
1101 if(err) goto exit;
1102 break;
1103 case OPT_COMPRESS:
1104 args->compress = TRUE;
1105 break;
1106 case OPT_HELP:
1107 usage(0);
1108 break;
1109 case OPT_VERBOSE:
1110 args->verbose = TRUE;
1111 break;
1112 case OPT_VERSION:
1113 printf("> Version %d.%d\n", XFR_PROTO_MAJOR, XFR_PROTO_MINOR);
1114 exit(0);
1115 break;
1116 default:
1117 usage(EINVAL);
1118 break;
1121 xfr_init();
1122 err = xfrd_main(args);
1123 exit:
1124 if(err && key > 0){
1125 fprintf(stderr, "Error in arg %c\n", key);
1127 return (err ? 1 : 0);