ia64/xen-unstable

changeset 4329:9dd7bd4776a1

bitkeeper revision 1.1236.43.12 (4244309dO7HfNtv-R6F5S3jdQqQR8A)

Enhanced concurrency support in blockstore.

Signed-off-by: James Bulpin <James.Bulpin@cl.cam.ac.uk>
author jrb44@plym.cl.cam.ac.uk
date Fri Mar 25 15:39:09 2005 +0000 (2005-03-25)
parents f71b1114a5c3
children 1fe76eb5830b 9664916ba37e
files tools/blktap/Makefile tools/blktap/blktaplib.c tools/blktap/blockstore.c tools/blktap/parallax-threaded.h
line diff
     1.1 --- a/tools/blktap/Makefile	Fri Mar 25 14:00:59 2005 +0000
     1.2 +++ b/tools/blktap/Makefile	Fri Mar 25 15:39:09 2005 +0000
     1.3 @@ -58,7 +58,7 @@ OBJS     = $(patsubst %.c,%.o,$(SRCS))
     1.4  
     1.5  LIB      = libblktap.so libblktap.so.$(MAJOR) libblktap.so.$(MAJOR).$(MINOR)
     1.6  
     1.7 -all: mk-symlinks blkdump blkcow blkimg blkcowimg blkgnbd blkcowgnbd $(VDI_TOOLS) parallax 
     1.8 +all: mk-symlinks blkdump blkcow blkimg blkcowimg blkgnbd blkcowgnbd $(VDI_TOOLS) parallax parallax-threaded blockstored
     1.9  	$(MAKE) $(LIB)
    1.10  
    1.11  LINUX_ROOT := $(wildcard $(XEN_ROOT)/linux-2.6.*-xen-sparse)
    1.12 @@ -120,42 +120,42 @@ blkaio: $(LIB) blkaio.c blkaiolib.c
    1.13  	$(CC) $(CFLAGS) -o blkaio -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lblktap blkaio.c blkaiolib.c -laio -lpthread
    1.14  
    1.15  parallax: $(LIB) $(PLX_SRCS)
    1.16 -	$(CC) $(CFLAGS) -o parallax -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lblktap $(PLX_SRCS) libgnbd/libgnbd.a
    1.17 +	$(CC) $(CFLAGS) -o parallax -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lblktap -lpthread $(PLX_SRCS) libgnbd/libgnbd.a
    1.18  
    1.19  parallax-threaded: $(LIB) $(PLXT_SRCS)
    1.20  	$(CC) $(CFLAGS) -o parallax-threaded -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lpthread -lblktap $(PLXT_SRCS) libgnbd/libgnbd.a
    1.21  
    1.22  vdi_test: $(LIB) $(VDI_SRCS)
    1.23 -	$(CC) $(CFLAGS) -g3 -o vdi_test -DVDI_STANDALONE $(VDI_SRCS)
    1.24 +	$(CC) $(CFLAGS) -g3 -o vdi_test -DVDI_STANDALONE -lpthread $(VDI_SRCS)
    1.25  
    1.26  vdi_list: $(LIB) vdi_list.c $(VDI_SRCS)
    1.27 -	$(CC) $(CFLAGS) -g3 -o vdi_list vdi_list.c $(VDI_SRCS)
    1.28 +	$(CC) $(CFLAGS) -g3 -o vdi_list vdi_list.c -lpthread $(VDI_SRCS)
    1.29  
    1.30  vdi_create: $(LIB) vdi_create.c $(VDI_SRCS)
    1.31 -	$(CC) $(CFLAGS) -g3 -o vdi_create vdi_create.c $(VDI_SRCS)
    1.32 +	$(CC) $(CFLAGS) -g3 -o vdi_create vdi_create.c -lpthread $(VDI_SRCS)
    1.33  
    1.34  vdi_snap: $(LIB) vdi_snap.c $(VDI_SRCS)
    1.35 -	$(CC) $(CFLAGS) -g3 -o vdi_snap vdi_snap.c $(VDI_SRCS)
    1.36 +	$(CC) $(CFLAGS) -g3 -o vdi_snap vdi_snap.c -lpthread $(VDI_SRCS)
    1.37  
    1.38  vdi_snap_list: $(LIB) vdi_snap_list.c $(VDI_SRCS)
    1.39 -	$(CC) $(CFLAGS) -g3 -o vdi_snap_list vdi_snap_list.c $(VDI_SRCS)
    1.40 +	$(CC) $(CFLAGS) -g3 -o vdi_snap_list vdi_snap_list.c -lpthread $(VDI_SRCS)
    1.41  
    1.42  vdi_snap_delete: $(LIB) vdi_snap_delete.c $(VDI_SRCS)
    1.43 -	$(CC) $(CFLAGS) -g3 -o vdi_snap_delete vdi_snap_delete.c $(VDI_SRCS)
    1.44 +	$(CC) $(CFLAGS) -g3 -o vdi_snap_delete vdi_snap_delete.c -lpthread $(VDI_SRCS)
    1.45  
    1.46  vdi_tree: $(LIB) vdi_tree.c $(VDI_SRCS)
    1.47 -	$(CC) $(CFLAGS) -g3 -o vdi_tree vdi_tree.c $(VDI_SRCS)
    1.48 +	$(CC) $(CFLAGS) -g3 -o vdi_tree vdi_tree.c -lpthread $(VDI_SRCS)
    1.49  
    1.50  vdi_fill: $(LIB) vdi_fill.c $(VDI_SRCS)
    1.51 -	$(CC) $(CFLAGS) -g3 -o vdi_fill vdi_fill.c $(VDI_SRCS)
    1.52 +	$(CC) $(CFLAGS) -g3 -o vdi_fill vdi_fill.c -lpthread $(VDI_SRCS)
    1.53  
    1.54  vdi_validate: $(LIB) vdi_validate.c $(VDI_SRCS)
    1.55 -	$(CC) $(CFLAGS) -g3 -o vdi_validate vdi_validate.c $(VDI_SRCS)
    1.56 +	$(CC) $(CFLAGS) -g3 -o vdi_validate vdi_validate.c -lpthread $(VDI_SRCS)
    1.57  
    1.58  blockstored: blockstored.c
    1.59 -	$(CC) $(CFLAGS) -g3 -o blockstored blockstored.c
    1.60 +	$(CC) $(CFLAGS) -g3 -o blockstored -lpthread blockstored.c
    1.61  bstest: bstest.c blockstore.c
    1.62 -	$(CC) $(CFLAGS) -g3 -o bstest bstest.c blockstore.c
    1.63 +	$(CC) $(CFLAGS) -g3 -o bstest bstest.c -lpthread blockstore.c
    1.64  
    1.65  .PHONY: TAGS clean install mk-symlinks rpm
    1.66  TAGS:
     2.1 --- a/tools/blktap/blktaplib.c	Fri Mar 25 14:00:59 2005 +0000
     2.2 +++ b/tools/blktap/blktaplib.c	Fri Mar 25 15:39:09 2005 +0000
     2.3 @@ -248,12 +248,21 @@ static void apply_rsp_hooks(blkif_respon
     2.4      }
     2.5  }
     2.6  
     2.7 +static pthread_mutex_t push_mutex = PTHREAD_MUTEX_INITIALIZER;
     2.8 +
     2.9  void blktap_inject_response(blkif_response_t *rsp)
    2.10  {
    2.11 +    
    2.12      apply_rsp_hooks(rsp);
    2.13 +    
    2.14      write_rsp_to_fe_ring(rsp);
    2.15 +    
    2.16 +    pthread_mutex_lock(&push_mutex);
    2.17 +    
    2.18      RING_PUSH_RESPONSES(&fe_ring);
    2.19      ioctl(fd, BLKTAP_IOCTL_KICK_FE);
    2.20 +    
    2.21 +    pthread_mutex_unlock(&push_mutex);
    2.22  }
    2.23  
    2.24  /*-----[ Polling fd listeners ]------------------------------------------*/
    2.25 @@ -449,7 +458,9 @@ int blktap_listen(void)
    2.26              }
    2.27              /* Using this as a unidirectional ring. */
    2.28              ctrl_ring.req_cons = ctrl_ring.rsp_prod_pvt = i;
    2.29 +pthread_mutex_lock(&push_mutex);
    2.30              RING_PUSH_RESPONSES(&ctrl_ring);
    2.31 +pthread_mutex_unlock(&push_mutex);
    2.32              
    2.33              /* empty the fe_ring */
    2.34              notify_fe = 0;
    2.35 @@ -517,14 +528,18 @@ int blktap_listen(void)
    2.36  
    2.37              if (notify_be) {
    2.38                  DPRINTF("notifying be\n");
    2.39 +pthread_mutex_lock(&push_mutex);
    2.40                  RING_PUSH_REQUESTS(&be_ring);
    2.41                  ioctl(fd, BLKTAP_IOCTL_KICK_BE);
    2.42 +pthread_mutex_unlock(&push_mutex);
    2.43              }
    2.44  
    2.45              if (notify_fe) {
    2.46                  DPRINTF("notifying fe\n");
    2.47 +pthread_mutex_lock(&push_mutex);
    2.48                  RING_PUSH_RESPONSES(&fe_ring);
    2.49                  ioctl(fd, BLKTAP_IOCTL_KICK_FE);
    2.50 +pthread_mutex_unlock(&push_mutex);
    2.51              }
    2.52          }        
    2.53      }
     3.1 --- a/tools/blktap/blockstore.c	Fri Mar 25 14:00:59 2005 +0000
     3.2 +++ b/tools/blktap/blockstore.c	Fri Mar 25 15:39:09 2005 +0000
     3.3 @@ -13,13 +13,16 @@
     3.4  #include <string.h>
     3.5  #include <sys/types.h>
     3.6  #include <sys/stat.h>
     3.7 +#include <sys/time.h>
     3.8  #include <stdarg.h>
     3.9  #include "blockstore.h"
    3.10  #include <pthread.h>
    3.11  #include "parallax-threaded.h"
    3.12  
    3.13  #define BLOCKSTORE_REMOTE
    3.14 -#define BSDEBUG
    3.15 +//#define BSDEBUG
    3.16 +
    3.17 +#define RETRY_TIMEOUT 1000000 /* microseconds */
    3.18  
    3.19  /*****************************************************************************
    3.20   * Debugging
    3.21 @@ -63,6 +66,37 @@ struct sockaddr_in sin_local;
    3.22  int bssock = 0;
    3.23  
    3.24  /*****************************************************************************
    3.25 + * Notification                                                              *
    3.26 + *****************************************************************************/
    3.27 +
    3.28 +typedef struct pool_thread_t_struct {
    3.29 +    pthread_mutex_t ptmutex;
    3.30 +    pthread_cond_t ptcv;
    3.31 +    int newdata;
    3.32 +} pool_thread_t;
    3.33 +
    3.34 +pool_thread_t pool_thread[READ_POOL_SIZE+1];
    3.35 +
    3.36 +#define RECV_NOTIFY(tid) { \
    3.37 +    pthread_mutex_lock(&(pool_thread[tid].ptmutex)); \
    3.38 +    pool_thread[tid].newdata = 1; \
    3.39 +    DB("CV Waking %u", tid); \
    3.40 +    pthread_cond_signal(&(pool_thread[tid].ptcv)); \
    3.41 +    pthread_mutex_unlock(&(pool_thread[tid].ptmutex)); }
    3.42 +#define RECV_AWAIT(tid) { \
    3.43 +    pthread_mutex_lock(&(pool_thread[tid].ptmutex)); \
    3.44 +    if (pool_thread[tid].newdata) { \
    3.45 +        pool_thread[tid].newdata = 0; \
    3.46 +        DB("CV Woken %u", tid); \
    3.47 +    } \
    3.48 +    else { \
    3.49 +        DB("CV Waiting %u", tid); \
    3.50 +        pthread_cond_wait(&(pool_thread[tid].ptcv), \
    3.51 +                          &(pool_thread[tid].ptmutex)); \
    3.52 +    } \
    3.53 +    pthread_mutex_unlock(&(pool_thread[tid].ptmutex)); }
    3.54 +
    3.55 +/*****************************************************************************
    3.56   * Message queue management                                                  *
    3.57   *****************************************************************************/
    3.58  
    3.59 @@ -76,23 +110,6 @@ pthread_mutex_t ptmutex_recv;
    3.60  #define ENTER_RECV_CR pthread_mutex_lock(&ptmutex_recv)
    3.61  #define LEAVE_RECV_CR pthread_mutex_unlock(&ptmutex_recv)
    3.62  
    3.63 -int notify = 0;
    3.64 -pthread_mutex_t ptmutex_notify;
    3.65 -pthread_cond_t ptcv_notify;
    3.66 -#define RECV_NOTIFY { \
    3.67 -    pthread_mutex_lock(&ptmutex_notify); \
    3.68 -    notify = 1; \
    3.69 -    pthread_cond_signal(&ptcv_notify); \
    3.70 -    pthread_mutex_unlock(&ptmutex_notify); }
    3.71 -#define RECV_AWAIT { \
    3.72 -    pthread_mutex_lock(&ptmutex_notify); \
    3.73 -    if (notify) \
    3.74 -        notify = 0; \
    3.75 -    else \
    3.76 -        pthread_cond_wait(&ptcv_notify, &ptmutex_notify); \
    3.77 -    pthread_mutex_unlock(&ptmutex_notify); }
    3.78 -    
    3.79 -
    3.80  /* A message queue entry. We allocate one of these for every request we send.
    3.81   * Asynchronous reply reception also used one of these.
    3.82   */
    3.83 @@ -104,6 +121,8 @@ typedef struct bsq_t_struct {
    3.84      int length;
    3.85      struct msghdr msghdr;
    3.86      struct iovec iov[2];
    3.87 +    int tid;
    3.88 +    struct timeval tv_sent;
    3.89      bshdr_t message;
    3.90      void *block;
    3.91  } bsq_t;
    3.92 @@ -267,11 +286,13 @@ int send_message(bsq_t *qe) {
    3.93      qe->message.luid = new_luid();
    3.94  
    3.95      qe->status = 0;
    3.96 +    qe->tid = (int)pthread_getspecific(tid_key);
    3.97      if (enqueue(qe) < 0) {
    3.98          fprintf(stderr, "Error enqueuing request.\n");
    3.99          return -1;
   3.100      }
   3.101  
   3.102 +    gettimeofday(&(qe->tv_sent), NULL);
   3.103      DB("send_message to %d luid=%016llx\n", qe->server, qe->message.luid);
   3.104      rc = sendmsg(bssock, &(qe->msghdr), MSG_DONTWAIT);
   3.105      //rc = sendto(bssock, (void *)&(qe->message), qe->length, 0,
   3.106 @@ -407,6 +428,7 @@ void recv_recycle_buffer(bsq_t *q) {
   3.107  int wait_recv(bsq_t **reqs, int numreqs) {
   3.108      bsq_t *q, *m;
   3.109      unsigned int x, i;
   3.110 +    int tid = (int)pthread_getspecific(tid_key);
   3.111  
   3.112      DB("ENTER wait_recv %u\n", numreqs);
   3.113  
   3.114 @@ -420,7 +442,7 @@ int wait_recv(bsq_t **reqs, int numreqs)
   3.115          return numreqs;
   3.116      }
   3.117  
   3.118 -    RECV_AWAIT;
   3.119 +    RECV_AWAIT(tid);
   3.120  
   3.121      /*
   3.122      rxagain:
   3.123 @@ -442,6 +464,52 @@ int wait_recv(bsq_t **reqs, int numreqs)
   3.124  
   3.125  }
   3.126  
   3.127 +/* retry
   3.128 + */
   3.129 +static int retry_count = 0;
   3.130 +int retry(bsq_t *qe)
   3.131 +{
   3.132 +    int rc;
   3.133 +    gettimeofday(&(qe->tv_sent), NULL);
   3.134 +    DB("retry to %d luid=%016llx\n", qe->server, qe->message.luid);
   3.135 +    retry_count++;
   3.136 +    rc = sendmsg(bssock, &(qe->msghdr), MSG_DONTWAIT);
   3.137 +    if (rc < 0)
   3.138 +        return rc;
   3.139 +    return 0;
   3.140 +}
   3.141 +
   3.142 +/* queue runner
   3.143 + */
   3.144 +void *queue_runner(void *arg)
   3.145 +{
   3.146 +    for (;;) {
   3.147 +        struct timeval now;
   3.148 +        long long nowus, sus;
   3.149 +        bsq_t *q;
   3.150 +        int r;
   3.151 +
   3.152 +        sleep(1);
   3.153 +
   3.154 +        gettimeofday(&now, NULL);
   3.155 +        nowus = now.tv_usec + now.tv_sec * 1000000;
   3.156 +        ENTER_QUEUE_CR;
   3.157 +        r = retry_count;
   3.158 +        for (q = bs_head; q; q = q->next) {
   3.159 +            sus = q->tv_sent.tv_usec + q->tv_sent.tv_sec * 1000000;
   3.160 +            if ((nowus - sus) > RETRY_TIMEOUT) {
   3.161 +                if (retry(q) < 0) {
   3.162 +                    fprintf(stderr, "Error on sendmsg retry.\n");
   3.163 +                }
   3.164 +            }
   3.165 +        }
   3.166 +        if (r != retry_count) {
   3.167 +            fprintf(stderr, "RETRIES: %u %u\n", retry_count - r, retry_count);
   3.168 +        }
   3.169 +        LEAVE_QUEUE_CR;
   3.170 +    }
   3.171 +}
   3.172 +
   3.173  /* receive loop
   3.174   */
   3.175  void *receive_loop(void *arg)
   3.176 @@ -461,7 +529,7 @@ void *receive_loop(void *arg)
   3.177              }
   3.178              else {
   3.179                  DB("RX MATCH");
   3.180 -                RECV_NOTIFY;
   3.181 +                RECV_NOTIFY(m->tid);
   3.182              }
   3.183          }
   3.184      }
   3.185 @@ -1146,8 +1214,12 @@ int __init_blockstore(void)
   3.186      pthread_mutex_init(&ptmutex_queue, NULL);
   3.187      pthread_mutex_init(&ptmutex_luid, NULL);
   3.188      pthread_mutex_init(&ptmutex_recv, NULL);
   3.189 -    pthread_mutex_init(&ptmutex_notify, NULL);
   3.190 -    pthread_cond_init(&ptcv_notify, NULL);
   3.191 +    /*pthread_mutex_init(&ptmutex_notify, NULL);*/
   3.192 +    for (i = 0; i <= READ_POOL_SIZE; i++) {
   3.193 +        pool_thread[i].newdata = 0;
   3.194 +        pthread_mutex_init(&(pool_thread[i].ptmutex), NULL);
   3.195 +        pthread_cond_init(&(pool_thread[i].ptcv), NULL);
   3.196 +    }
   3.197  
   3.198      bsservers[0].hostname = "firebug.cl.cam.ac.uk";
   3.199      bsservers[1].hostname = "planb.cl.cam.ac.uk";
   3.200 @@ -1225,6 +1297,7 @@ int __init_blockstore(void)
   3.201      }
   3.202  
   3.203      pthread_create(&pthread_recv, NULL, receive_loop, NULL);
   3.204 +    pthread_create(&pthread_recv, NULL, queue_runner, NULL);
   3.205  
   3.206  #else /* /BLOCKSTORE_REMOTE */
   3.207      block_fp = open("blockstore.dat", O_RDWR | O_CREAT | O_LARGEFILE, 0644);
   3.208 @@ -1262,9 +1335,14 @@ int __init_blockstore(void)
   3.209  
   3.210  void __exit_blockstore(void)
   3.211  {
   3.212 +    int i;
   3.213      pthread_mutex_destroy(&ptmutex_recv);
   3.214      pthread_mutex_destroy(&ptmutex_luid);
   3.215      pthread_mutex_destroy(&ptmutex_queue);
   3.216 -    pthread_mutex_destroy(&ptmutex_notify);
   3.217 -    pthread_cond_destroy(&ptcv_notify);
   3.218 +    /*pthread_mutex_destroy(&ptmutex_notify);
   3.219 +      pthread_cond_destroy(&ptcv_notify);*/
   3.220 +    for (i = 0; i <= READ_POOL_SIZE; i++) {
   3.221 +        pthread_mutex_destroy(&(pool_thread[i].ptmutex));
   3.222 +        pthread_cond_destroy(&(pool_thread[i].ptcv));
   3.223 +    }
   3.224  }
     4.1 --- a/tools/blktap/parallax-threaded.h	Fri Mar 25 14:00:59 2005 +0000
     4.2 +++ b/tools/blktap/parallax-threaded.h	Fri Mar 25 15:39:09 2005 +0000
     4.3 @@ -14,7 +14,8 @@
     4.4  #define NOTHREADS
     4.5  #endif
     4.6  
     4.7 -#define READ_POOL_SIZE 128
     4.8 +//#define READ_POOL_SIZE 128
     4.9 +#define READ_POOL_SIZE 8
    4.10  
    4.11  /* per-thread identifier */
    4.12  pthread_key_t tid_key;