direct-io.hg

view tools/xcs/xcs.c @ 3483:910d464ee588

bitkeeper revision 1.1159.224.8 (41f230af8MIrNZ0j3raPm-vgsd18GA)

xcs fix from Yuji Tsuchimoto [nox@hiroshima-u.ac.jp]
author iap10@labyrinth.cl.cam.ac.uk
date Sat Jan 22 10:53:35 2005 +0000 (2005-01-22)
parents a9ac02b7727d
children 610068179f96
line source
1 /* xcs.c
2 *
3 * xcs - Xen Control Switch
4 *
5 * Copyright (c) 2004, Andrew Warfield
6 */
8 /*
10 Things we need to select on in xcs:
12 1. Events arriving on /dev/evtchn
14 These will kick a function to read everything off the fd, and scan the
15 associated control message rings, resulting in notifications sent on
16 data channels to connected clients.
18 2. New TCP connections on XCS_PORT.
20 These will either be control (intially) or associated data connections.
22 Control connections will instantiate or rebind to an existing connnection
23 struct. The control channel is used to configure what events will be
24 received on an associated data channel. These two channels are split
25 out because the control channel is synchronous, all messages will return
26 a result from XCS. The data channel is effectively asynchronous, events
27 may arrive in the middle of a control message exchange. Additionally,
28 Having two TCP connections allows the client side to have a blocking
29 listen loop for data messages, while independently interacting on the
30 control channel at other places in the code.
32 Data connections attach to an existing control struct, using a session
33 id that is passed during the control connect. There is currently a
34 one-to-one relationship between data and control channels, but there
35 could just as easily be many data channels, if there were a set of
36 clients with identical interests, or if you wanted to trace an existing
37 client's data traffic.
39 3. Messages arriving on open TCP connections.
40 There are three types of open connections:
42 3a. Messages arriving on open control channel file descriptors.
44 [description of the control protocol here]
46 3b. Messages arriving on open data channel file descriptors.
48 [description of the data protocol here]
50 3c. Messages arriving on (new) unbound connections.
52 A connection must issue a XCS_CONNECT message to specify what
53 it is, after which the connection is moved into one of the above
54 two groups.
56 Additionally, we need a periodic timer to do housekeeping.
58 4. Every XCS_GC_INTERVAL seconds, we need to clean up outstanding state.
59 Specifically, we garbage collect any sessions (connection_t structs)
60 that have been unconnected for a period of time (XCS_SESSION_TIMEOUT),
61 and close any connections that have been openned, but not connected
62 as a control or data connection (XCS_UFD_TIMEOUT).
64 */
66 #include <stdlib.h>
67 #include <stdio.h>
68 #include <unistd.h>
69 #include <sys/time.h>
70 #include <sys/types.h>
71 #include <string.h>
72 #include <signal.h>
73 #include <sys/socket.h>
74 #include <netinet/in.h>
75 #include <arpa/inet.h>
76 #include <errno.h>
77 #include "xcs.h"
79 #undef fd_max
80 #define fd_max(x,y) ((x) > (y) ? (x) : (y))
82 /* ------[ Control channel interfaces ]------------------------------------*/
84 static control_channel_t *cc_list[NR_EVENT_CHANNELS];
85 static int dom_to_port[MAX_DOMS]; /* This should not be a fixed-size array.*/
87 static void init_interfaces(void)
88 {
89 int i;
91 for (i = 0; i < MAX_DOMS; i++)
92 dom_to_port[i] = -1;
93 memset(cc_list, 0, sizeof cc_list);
94 }
96 static control_channel_t *add_interface(u32 dom, int local_port,
97 int remote_port)
98 {
99 control_channel_t *cc=NULL, *oldcc;
100 int ret;
102 if (cc_list[dom_to_port[dom]] != NULL)
103 {
104 return(cc_list[dom_to_port[dom]]);
105 }
107 if (cc_list[local_port] == NULL)
108 {
109 cc = ctrl_chan_new(dom, local_port, remote_port);
110 }
112 if (cc == NULL)
113 return NULL;
115 DPRINTF("added a new interface: dom: %u (l:%d,r:%d): %p\n",
116 dom, local_port, remote_port, cc);
117 DPRINTF("added a new interface: dom: %u (l:%d,r:%d): %p\n",
118 dom, cc->local_port, cc->remote_port, cc);
120 if ((ret = evtchn_bind(cc->local_port)) != 0)
121 {
122 DPRINTF("Got control interface, but couldn't bind evtchan!(%d)\n", ret);
123 ctrl_chan_free(cc);
124 return NULL;
125 }
127 if ( cc_list[cc->local_port] != NULL )
128 {
129 oldcc = cc_list[cc->local_port];
131 if ((oldcc->remote_dom != cc->remote_dom) ||
132 (oldcc->remote_port != cc->remote_port))
133 {
134 DPRINTF("CC conflict! (port: %d, old dom: %u, new dom: %u)\n",
135 cc->local_port, oldcc->remote_dom, cc->remote_dom);
136 dom_to_port[oldcc->remote_dom] = -1;
137 ctrl_chan_free(cc_list[cc->local_port]);
138 }
139 }
141 cc_list[cc->local_port] = cc;
142 dom_to_port[cc->remote_dom] = cc->local_port;
143 cc->type = CC_TYPE_INTERDOMAIN;
144 cc->ref_count = 0;
145 return cc;
146 }
148 control_channel_t *add_virq(int virq)
149 {
150 control_channel_t *cc;
151 int virq_port;
153 if (ctrl_chan_bind_virq(virq, &virq_port) == -1)
154 return NULL;
156 if ((cc_list[virq_port] != NULL) &&
157 (cc_list[virq_port]->type != CC_TYPE_VIRQ))
158 return NULL;
160 if ((cc_list[virq_port] != NULL) &&
161 (cc_list[virq_port]->type == CC_TYPE_VIRQ))
162 return cc_list[virq_port];
164 cc = (control_channel_t *)malloc(sizeof(control_channel_t));
165 if ( cc == NULL ) return NULL;
167 cc->type = CC_TYPE_VIRQ;
168 cc->local_port = virq_port;
169 cc->virq = virq;
171 return cc;
172 }
174 void get_interface(control_channel_t *cc)
175 {
176 if (cc != NULL)
177 cc->ref_count++;
178 }
180 void put_interface(control_channel_t *cc)
181 {
182 if (cc != NULL)
183 {
184 cc->ref_count--;
185 if (cc->ref_count <= 0)
186 {
187 DPRINTF("Freeing cc on port %d.\n", cc->local_port);
188 (void)evtchn_unbind(cc->local_port);
189 ctrl_chan_free(cc);
190 }
191 }
192 }
194 /* ------[ Simple helpers ]------------------------------------------------*/
196 /* listen_socket() is straight from paul sheer's useful select_tut manpage. */
197 static int listen_socket (int listen_port)
198 {
199 struct sockaddr_in a;
200 int s;
201 int yes;
203 if ((s = socket (AF_INET, SOCK_STREAM, 0)) < 0)
204 {
205 perror ("socket");
206 return -1;
207 }
209 yes = 1;
210 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
211 (char *) &yes, sizeof (yes)) < 0)
212 {
213 perror ("setsockopt");
214 close (s);
215 return -1;
216 }
218 memset (&a, 0, sizeof (a));
219 a.sin_port = htons (listen_port);
220 a.sin_family = AF_INET;
221 if (bind(s, (struct sockaddr *) &a, sizeof (a)) < 0)
222 {
223 perror ("bind");
224 close (s);
225 return -1;
226 }
227 printf ("accepting connections on port %d\n", (int) listen_port);
228 listen (s, 10);
229 return s;
230 }
232 /* ------[ Message handlers ]----------------------------------------------*/
234 #define NO_CHANGE 0
235 #define CONNECTED 1
236 #define DISCONNECTED 2
237 int handle_connect_msg( xcs_msg_t *msg, int fd )
238 {
239 xcs_connect_msg_t *cmsg = &msg->u.connect;
240 connection_t *con;
241 int ret = NO_CHANGE;
243 switch (msg->type)
244 {
245 case XCS_CONNECT_CTRL:
246 {
247 if ( cmsg->session_id == 0 )
248 {
249 con = connection_new();
250 if ( con == NULL)
251 {
252 msg->result = XCS_RSLT_FAILED;
253 break;
254 }
255 msg->result = XCS_RSLT_OK;
256 cmsg->session_id = con->id;
257 con->ctrl_fd = fd;
258 ret = CONNECTED;
259 DPRINTF("New control connection\n");
260 break;
261 }
263 con = get_con_by_session(cmsg->session_id);
264 if ( con == NULL )
265 {
266 msg->result = XCS_RSLT_BADSESSION;
267 break;
268 }
269 if ( con->ctrl_fd != -1 )
270 {
271 msg->result = XCS_RSLT_CONINUSE;
272 break;
273 }
274 con->ctrl_fd = fd;
275 msg->result = XCS_RSLT_OK;
276 ret = CONNECTED;
277 DPRINTF("Rebound to control connection\n");
278 break;
279 }
280 case XCS_CONNECT_DATA:
281 {
282 con = get_con_by_session(cmsg->session_id);
283 if ( con == NULL )
284 {
285 msg->result = XCS_RSLT_BADSESSION;
286 break;
287 }
288 if ( con->data_fd != -1 )
289 {
290 msg->result = XCS_RSLT_CONINUSE;
291 break;
292 }
293 con->data_fd = fd;
294 msg->result = XCS_RSLT_OK;
295 ret = CONNECTED;
296 DPRINTF("Attached data connection\n");
297 break;
299 }
300 case XCS_CONNECT_BYE:
301 {
302 close ( fd );
303 ret = DISCONNECTED;
304 break;
305 }
306 }
308 return ret;
309 }
311 int handle_control_message( connection_t *con, xcs_msg_t *msg )
312 {
313 int ret;
314 int reply_needed = 1;
316 DPRINTF("Got message, type %u.\n", msg->type);
318 switch (msg->type)
319 {
320 case XCS_MSG_BIND:
321 {
322 xcs_bind_msg_t *bmsg = &msg->u.bind;
324 if ( ! BIND_MSG_VALID(bmsg) )
325 {
326 msg->result = XCS_RSLT_BADREQUEST;
327 break;
328 }
330 ret = xcs_bind(con, bmsg->port, bmsg->type);
331 if (ret == 0) {
332 msg->result = XCS_RSLT_OK;
333 } else {
334 msg->result = XCS_RSLT_FAILED;
335 }
336 break;
337 }
338 case XCS_MSG_UNBIND:
339 {
340 xcs_bind_msg_t *bmsg = &msg->u.bind;
342 if ( ! BIND_MSG_VALID(bmsg) )
343 {
344 msg->result = XCS_RSLT_BADREQUEST;
345 break;
346 }
348 ret = xcs_unbind(con, bmsg->port, bmsg->type);
349 if (ret == 0) {
350 msg->result = XCS_RSLT_OK;
351 } else {
352 msg->result = XCS_RSLT_FAILED;
353 }
354 break;
355 }
356 case XCS_VIRQ_BIND:
357 {
358 control_channel_t *cc;
359 xcs_virq_msg_t *vmsg = &msg->u.virq;
360 if ( ! VIRQ_MSG_VALID(vmsg) )
361 {
362 msg->result = XCS_RSLT_BADREQUEST;
363 break;
364 }
366 cc = add_virq(vmsg->virq);
367 if (cc == NULL)
368 {
369 msg->result = XCS_RSLT_FAILED;
370 break;
371 }
372 ret = xcs_bind(con, cc->local_port, TYPE_VIRQ);
373 if (ret == 0) {
374 vmsg->port = cc->local_port;
375 msg->result = XCS_RSLT_OK;
376 } else {
377 msg->result = XCS_RSLT_FAILED;
378 }
379 break;
380 }
382 case XCS_CIF_NEW_CC:
383 {
384 control_channel_t *cc;
385 xcs_interface_msg_t *imsg = &msg->u.interface;
387 if ( ! INTERFACE_MSG_VALID(imsg) )
388 {
389 msg->result = XCS_RSLT_BADREQUEST;
390 break;
391 }
393 cc = add_interface(imsg->dom, imsg->local_port, imsg->remote_port);
394 if (cc != NULL) {
395 get_interface(cc);
396 msg->result = XCS_RSLT_OK;
397 imsg->local_port = cc->local_port;
398 imsg->remote_port = cc->remote_port;
399 } else {
400 msg->result = XCS_RSLT_FAILED;
401 }
402 break;
403 }
405 case XCS_CIF_FREE_CC:
406 {
407 control_channel_t *cc;
408 xcs_interface_msg_t *imsg = &msg->u.interface;
410 if ( ! INTERFACE_MSG_VALID(imsg) )
411 {
412 msg->result = XCS_RSLT_BADREQUEST;
413 break;
414 }
416 cc = add_interface(imsg->dom, imsg->local_port, imsg->remote_port);
417 if (cc != NULL) {
418 put_interface(cc);
419 }
420 msg->result = XCS_RSLT_OK;
421 break;
422 }
423 }
424 return reply_needed;
425 }
427 void handle_data_message( connection_t *con, xcs_msg_t *msg )
428 {
429 control_channel_t *cc;
430 xcs_control_msg_t *cmsg = &msg->u.control;
431 int port;
433 switch (msg->type)
434 {
435 case XCS_REQUEST:
436 if ( cmsg->remote_dom > MAX_DOMS )
437 break;
439 port = dom_to_port[cmsg->remote_dom];
440 if (port == -1) break;
441 cc = cc_list[port];
442 if ((cc != NULL) && ( cc->type == CC_TYPE_INTERDOMAIN ))
443 {
444 DPRINTF("DN:REQ: dom:%d port: %d type: %d\n",
445 cc->remote_dom, cc->local_port,
446 cmsg->msg.type);
447 ctrl_chan_write_request(cc, cmsg);
448 ctrl_chan_notify(cc);
449 } else {
450 DPRINTF("tried to send a REQ to a null cc\n.");
451 }
452 break;
454 case XCS_RESPONSE:
455 if ( cmsg->remote_dom > MAX_DOMS )
456 break;
458 port = dom_to_port[cmsg->remote_dom];
459 if (port == -1) break;
460 cc = cc_list[port];
461 if ((cc != NULL) && ( cc->type == CC_TYPE_INTERDOMAIN ))
462 {
463 DPRINTF("DN:RSP: dom:%d port: %d type: %d\n",
464 cc->remote_dom, cc->local_port,
465 cmsg->msg.type);
466 ctrl_chan_write_response(cc, cmsg);
467 ctrl_chan_notify(cc);
468 }
469 break;
471 case XCS_VIRQ:
472 if ( !(PORT_VALID(cmsg->local_port)) )
473 break;
475 cc = cc_list[cmsg->local_port];
477 if ((cc != NULL) && ( cc->type == CC_TYPE_VIRQ ))
478 {
479 DPRINTF("DN:VIRQ: virq: %d port: %d\n",
480 cc->virq, cc->local_port);
481 ctrl_chan_notify(cc);
482 }
483 break;
484 }
485 }
487 /* ------[ Control interface handler ]-------------------------------------*/
489 /* passed as a function pointer to the lookup. */
490 void send_kmsg(connection_t *c, void *arg)
491 {
492 xcs_msg_t *msg = (xcs_msg_t *)arg;
494 DPRINTF(" -> CONNECTION %d\n", c->data_fd);
495 if (c->data_fd > 0)
496 {
497 send(c->data_fd, msg, sizeof(xcs_msg_t), 0);
498 }
499 }
501 int handle_ctrl_if(void)
502 {
503 control_channel_t *cc;
504 control_msg_t *msg;
505 xcs_msg_t kmsg;
506 int chan, ret;
508 DPRINTF("Event thread kicked!\n");
509 again:
510 while ((chan = evtchn_read()) > 0)
511 {
512 evtchn_unmask(chan);
513 cc = cc_list[chan];
514 if (cc_list[chan] == NULL) {
515 DPRINTF("event from unknown channel (%d)\n", chan);
516 continue;
517 }
519 if ( cc_list[chan]->type == CC_TYPE_VIRQ )
520 {
521 DPRINTF("UP:VIRQ: virq:%d port: %d\n",
522 cc->virq, cc->local_port);
523 kmsg.type = XCS_VIRQ;
524 kmsg.u.control.local_port = cc->local_port;
525 xcs_lookup(cc->local_port, TYPE_VIRQ, send_kmsg, &kmsg);
526 continue;
527 }
529 while (ctrl_chan_request_to_read(cc))
530 {
531 msg = &kmsg.u.control.msg;
532 kmsg.type = XCS_REQUEST;
533 kmsg.u.control.remote_dom = cc->remote_dom;
534 kmsg.u.control.local_port = cc->local_port;
535 ret = ctrl_chan_read_request(cc, &kmsg.u.control);
536 DPRINTF("UP:REQ: dom:%d port: %d type: %d len: %d\n",
537 cc->remote_dom, cc->local_port,
538 msg->type, msg->length);
539 if (ret == 0)
540 xcs_lookup(cc->local_port, msg->type, send_kmsg, &kmsg);
541 }
543 while (ctrl_chan_response_to_read(cc))
544 {
545 msg = &kmsg.u.control.msg;
546 kmsg.type = XCS_RESPONSE;
547 kmsg.u.control.remote_dom = cc->remote_dom;
548 kmsg.u.control.local_port = cc->local_port;
549 ret = ctrl_chan_read_response(cc, &kmsg.u.control);
550 DPRINTF("UP:RSP: dom:%d port: %d type: %d len: %d\n",
551 cc->remote_dom, cc->local_port,
552 msg->type, msg->length);
553 if (ret == 0)
554 xcs_lookup(cc->local_port, msg->type, send_kmsg, &kmsg);
555 }
556 }
558 if (chan == -EINTR)
559 goto again;
561 return chan;
562 }
565 /* ------[ Main xcs code / big select loop ]-------------------------------*/
568 typedef struct unbound_fd_st {
569 int fd;
570 struct timeval born;
571 struct unbound_fd_st *next;
572 } unbound_fd_t;
574 /* This makes ufd point to the next entry in the list, so need to *
575 * break/continue if called while iterating. */
576 void delete_ufd(unbound_fd_t **ufd)
577 {
578 unbound_fd_t *del_ufd;
580 del_ufd = *ufd;
581 *ufd = (*ufd)->next;
582 free( del_ufd );
583 }
585 void gc_ufd_list( unbound_fd_t **ufd )
586 {
587 struct timeval now, delta;
589 gettimeofday(&now, NULL);
591 while ( *ufd != NULL )
592 {
593 timersub(&now, &(*ufd)->born, &delta);
594 if (delta.tv_sec > XCS_UFD_TIMEOUT)
595 {
596 DPRINTF("GC-UFD: closing fd: %d\n", (*ufd)->fd);
597 close((*ufd)->fd);
598 delete_ufd(ufd);
599 continue;
600 }
601 ufd = &(*ufd)->next;
602 }
603 }
605 int main (int argc, char*argv[])
606 {
607 int listen_fd, evtchn_fd;
608 unbound_fd_t *unbound_fd_list = NULL, **ufd;
609 struct timeval timeout = { XCS_GC_INTERVAL, 0 };
610 connection_t **con;
612 /* Initialize xc and event connections. */
613 if (ctrl_chan_init() != 0)
614 {
615 printf("Couldn't open conneciton to libxc.\n");
616 exit(-1);
617 }
619 if ((evtchn_fd = evtchn_open()) < 0)
620 {
621 printf("Couldn't open event channel driver interface.\n");
622 exit(-1);
623 }
625 /* Initialize control interfaces, bindings. */
626 init_interfaces();
627 init_bindings();
629 listen_fd = listen_socket(XCS_TCP_PORT);
631 for (;;)
632 {
633 int n, ret;
634 fd_set rd, wr, er;
635 FD_ZERO ( &rd );
636 FD_ZERO ( &wr );
637 FD_ZERO ( &er );
639 /* TCP listen fd: */
640 FD_SET ( listen_fd, &rd );
641 n = fd_max ( n, listen_fd );
643 /* Evtchn fd: */
644 FD_SET ( evtchn_fd, &rd );
645 n = fd_max ( n, evtchn_fd );
647 /* unbound connection fds: */
648 ufd = &unbound_fd_list;
649 while ((*ufd) != NULL)
650 {
651 FD_SET ( (*ufd)->fd, &rd );
652 n = fd_max ( n, (*ufd)->fd );
653 ufd = &(*ufd)->next;
654 }
656 /* control and data fds: */
657 con = &connection_list;
658 while ((*con) != NULL)
659 {
660 if ((*con)->ctrl_fd > 0)
661 {
662 FD_SET ( (*con)->ctrl_fd, &rd );
663 n = fd_max ( n, (*con)->ctrl_fd );
664 }
665 if ((*con)->data_fd > 0)
666 {
667 FD_SET ( (*con)->data_fd, &rd );
668 n = fd_max ( n, (*con)->data_fd );
669 }
670 con = &(*con)->next;
671 }
673 ret = select ( n + 1, &rd, &wr, &er, &timeout );
675 if ( (timeout.tv_sec == 0) && (timeout.tv_usec == 0) )
676 {
677 gc_ufd_list(&unbound_fd_list);
678 gc_connection_list();
679 timeout.tv_sec = XCS_GC_INTERVAL;
680 }
682 if ( (ret == -1) && (errno == EINTR) )
683 continue;
684 if ( ret < 0 )
685 {
686 perror ("select()");
687 exit(-1);
688 }
690 /* CASE 1: Events arriving on /dev/evtchn. */
692 if ( FD_ISSET (evtchn_fd, &rd ))
693 handle_ctrl_if();
695 /* CASE 2: New connection on the listen port. */
696 if ( FD_ISSET ( listen_fd, &rd ))
697 {
698 struct sockaddr_in remote_addr;
699 int size;
700 memset (&remote_addr, 0, sizeof (remote_addr));
701 size = sizeof remote_addr;
702 ret = accept(listen_fd, (struct sockaddr *)&remote_addr, &size);
703 if ( ret < 0 )
704 {
705 perror("accept()");
706 } else {
707 unbound_fd_t *new_ufd;
709 new_ufd = (unbound_fd_t *)malloc(sizeof(*new_ufd));
711 if (new_ufd != NULL)
712 {
713 gettimeofday(&new_ufd->born, NULL);
714 new_ufd->fd = ret;
715 new_ufd->next = unbound_fd_list;
716 unbound_fd_list = new_ufd;
717 } else {
718 perror("malloc unbound connection");
719 close(ret);
720 }
721 }
722 }
724 /* CASE 3a: Handle messages on control connections. */
726 con = &connection_list;
727 while ( *con != NULL )
728 {
729 if ( ((*con)->ctrl_fd > 0) && (FD_ISSET((*con)->ctrl_fd, &rd)) )
730 {
731 xcs_msg_t msg;
732 memset (&msg, 0, sizeof(msg));
733 ret = read( (*con)->ctrl_fd, &msg, sizeof(msg) );
735 if ( ret < 0 )
736 {
737 perror("reading ctrl fd.");
738 } else if ( ret == 0 )
739 {
740 DPRINTF("Control connection dropped.\n");
741 close ( (*con)->ctrl_fd );
742 (*con)->ctrl_fd = -1;
743 gettimeofday(&(*con)->disconnect_time, NULL);
744 } else
745 {
746 if ( ret != sizeof(msg) )
747 {
748 DPRINTF("Unexpected frame size!\n");
749 continue;
750 }
752 ret = handle_control_message( *con, &msg );
754 if ( ret == 1 )
755 send( (*con)->ctrl_fd, &msg, sizeof(msg), 0 );
756 }
757 }
758 con = &(*con)->next;
759 }
761 /* CASE 3b: Handle messages on data connections. */
763 con = &connection_list;
764 while ( *con != NULL )
765 {
766 if ( ((*con)->data_fd > 0) && (FD_ISSET((*con)->data_fd, &rd)) )
767 {
768 xcs_msg_t msg;
769 memset (&msg, 0, sizeof(msg));
770 ret = read( (*con)->data_fd, &msg, sizeof(msg) );
772 if ( ret < 0 )
773 {
774 perror("reading data fd.");
775 } else if ( ret == 0 )
776 {
777 DPRINTF("Data connection dropped.\n");
778 close ( (*con)->data_fd );
779 (*con)->data_fd = -1;
780 gettimeofday(&(*con)->disconnect_time, NULL);
781 } else
782 {
783 if ( ret != sizeof(msg) )
784 {
785 DPRINTF("Unexpected frame size!\n");
786 continue;
787 }
789 handle_data_message( *con, &msg );
790 }
791 }
792 con = &(*con)->next;
793 }
795 /* CASE 3c: Handle messages arriving on unbound connections. */
796 ufd = &unbound_fd_list;
797 while ((*ufd) != NULL)
798 {
799 if ( FD_ISSET( (*ufd)->fd, &rd ) )
800 {
801 xcs_msg_t msg;
802 memset (&msg, 0, sizeof(msg));
803 ret = read( (*ufd)->fd, &msg, sizeof(msg) );
805 if ( ret == 0 )
806 {
807 close ( (*ufd)->fd );
808 delete_ufd(ufd);
809 continue; /* we just advanced ufd */
810 } else {
811 if ( ret != sizeof(msg) )
812 {
813 DPRINTF("Unexpected frame size!\n");
814 continue;
815 }
817 ret = handle_connect_msg( &msg, (*ufd)->fd );
819 if ( (ret == CONNECTED) || (ret == NO_CHANGE) )
820 send( (*ufd)->fd, &msg, sizeof(msg), 0 );
822 if ( (ret = CONNECTED) || (ret = DISCONNECTED) )
823 {
824 delete_ufd( ufd );
825 continue;
826 }
827 }
828 }
829 ufd = &(*ufd)->next;
830 }
831 }
832 }