ia64/xen-unstable
changeset 4329:9dd7bd4776a1
bitkeeper revision 1.1236.43.12 (4244309dO7HfNtv-R6F5S3jdQqQR8A)
Enhanced concurrency support in blockstore.
Signed-off-by: James Bulpin <James.Bulpin@cl.cam.ac.uk>
Enhanced concurrency support in blockstore.
Signed-off-by: James Bulpin <James.Bulpin@cl.cam.ac.uk>
author | jrb44@plym.cl.cam.ac.uk |
---|---|
date | Fri Mar 25 15:39:09 2005 +0000 (2005-03-25) |
parents | f71b1114a5c3 |
children | 1fe76eb5830b 9664916ba37e |
files | tools/blktap/Makefile tools/blktap/blktaplib.c tools/blktap/blockstore.c tools/blktap/parallax-threaded.h |
line diff
1.1 --- a/tools/blktap/Makefile Fri Mar 25 14:00:59 2005 +0000 1.2 +++ b/tools/blktap/Makefile Fri Mar 25 15:39:09 2005 +0000 1.3 @@ -58,7 +58,7 @@ OBJS = $(patsubst %.c,%.o,$(SRCS)) 1.4 1.5 LIB = libblktap.so libblktap.so.$(MAJOR) libblktap.so.$(MAJOR).$(MINOR) 1.6 1.7 -all: mk-symlinks blkdump blkcow blkimg blkcowimg blkgnbd blkcowgnbd $(VDI_TOOLS) parallax 1.8 +all: mk-symlinks blkdump blkcow blkimg blkcowimg blkgnbd blkcowgnbd $(VDI_TOOLS) parallax parallax-threaded blockstored 1.9 $(MAKE) $(LIB) 1.10 1.11 LINUX_ROOT := $(wildcard $(XEN_ROOT)/linux-2.6.*-xen-sparse) 1.12 @@ -120,42 +120,42 @@ blkaio: $(LIB) blkaio.c blkaiolib.c 1.13 $(CC) $(CFLAGS) -o blkaio -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lblktap blkaio.c blkaiolib.c -laio -lpthread 1.14 1.15 parallax: $(LIB) $(PLX_SRCS) 1.16 - $(CC) $(CFLAGS) -o parallax -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lblktap $(PLX_SRCS) libgnbd/libgnbd.a 1.17 + $(CC) $(CFLAGS) -o parallax -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lblktap -lpthread $(PLX_SRCS) libgnbd/libgnbd.a 1.18 1.19 parallax-threaded: $(LIB) $(PLXT_SRCS) 1.20 $(CC) $(CFLAGS) -o parallax-threaded -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lpthread -lblktap $(PLXT_SRCS) libgnbd/libgnbd.a 1.21 1.22 vdi_test: $(LIB) $(VDI_SRCS) 1.23 - $(CC) $(CFLAGS) -g3 -o vdi_test -DVDI_STANDALONE $(VDI_SRCS) 1.24 + $(CC) $(CFLAGS) -g3 -o vdi_test -DVDI_STANDALONE -lpthread $(VDI_SRCS) 1.25 1.26 vdi_list: $(LIB) vdi_list.c $(VDI_SRCS) 1.27 - $(CC) $(CFLAGS) -g3 -o vdi_list vdi_list.c $(VDI_SRCS) 1.28 + $(CC) $(CFLAGS) -g3 -o vdi_list vdi_list.c -lpthread $(VDI_SRCS) 1.29 1.30 vdi_create: $(LIB) vdi_create.c $(VDI_SRCS) 1.31 - $(CC) $(CFLAGS) -g3 -o vdi_create vdi_create.c $(VDI_SRCS) 1.32 + $(CC) $(CFLAGS) -g3 -o vdi_create vdi_create.c -lpthread $(VDI_SRCS) 1.33 1.34 vdi_snap: $(LIB) vdi_snap.c $(VDI_SRCS) 1.35 - $(CC) $(CFLAGS) -g3 -o vdi_snap vdi_snap.c $(VDI_SRCS) 1.36 + $(CC) $(CFLAGS) -g3 -o vdi_snap vdi_snap.c -lpthread $(VDI_SRCS) 1.37 1.38 vdi_snap_list: $(LIB) vdi_snap_list.c $(VDI_SRCS) 1.39 - $(CC) $(CFLAGS) -g3 -o vdi_snap_list vdi_snap_list.c $(VDI_SRCS) 1.40 + $(CC) $(CFLAGS) -g3 -o vdi_snap_list vdi_snap_list.c -lpthread $(VDI_SRCS) 1.41 1.42 vdi_snap_delete: $(LIB) vdi_snap_delete.c $(VDI_SRCS) 1.43 - $(CC) $(CFLAGS) -g3 -o vdi_snap_delete vdi_snap_delete.c $(VDI_SRCS) 1.44 + $(CC) $(CFLAGS) -g3 -o vdi_snap_delete vdi_snap_delete.c -lpthread $(VDI_SRCS) 1.45 1.46 vdi_tree: $(LIB) vdi_tree.c $(VDI_SRCS) 1.47 - $(CC) $(CFLAGS) -g3 -o vdi_tree vdi_tree.c $(VDI_SRCS) 1.48 + $(CC) $(CFLAGS) -g3 -o vdi_tree vdi_tree.c -lpthread $(VDI_SRCS) 1.49 1.50 vdi_fill: $(LIB) vdi_fill.c $(VDI_SRCS) 1.51 - $(CC) $(CFLAGS) -g3 -o vdi_fill vdi_fill.c $(VDI_SRCS) 1.52 + $(CC) $(CFLAGS) -g3 -o vdi_fill vdi_fill.c -lpthread $(VDI_SRCS) 1.53 1.54 vdi_validate: $(LIB) vdi_validate.c $(VDI_SRCS) 1.55 - $(CC) $(CFLAGS) -g3 -o vdi_validate vdi_validate.c $(VDI_SRCS) 1.56 + $(CC) $(CFLAGS) -g3 -o vdi_validate vdi_validate.c -lpthread $(VDI_SRCS) 1.57 1.58 blockstored: blockstored.c 1.59 - $(CC) $(CFLAGS) -g3 -o blockstored blockstored.c 1.60 + $(CC) $(CFLAGS) -g3 -o blockstored -lpthread blockstored.c 1.61 bstest: bstest.c blockstore.c 1.62 - $(CC) $(CFLAGS) -g3 -o bstest bstest.c blockstore.c 1.63 + $(CC) $(CFLAGS) -g3 -o bstest bstest.c -lpthread blockstore.c 1.64 1.65 .PHONY: TAGS clean install mk-symlinks rpm 1.66 TAGS:
2.1 --- a/tools/blktap/blktaplib.c Fri Mar 25 14:00:59 2005 +0000 2.2 +++ b/tools/blktap/blktaplib.c Fri Mar 25 15:39:09 2005 +0000 2.3 @@ -248,12 +248,21 @@ static void apply_rsp_hooks(blkif_respon 2.4 } 2.5 } 2.6 2.7 +static pthread_mutex_t push_mutex = PTHREAD_MUTEX_INITIALIZER; 2.8 + 2.9 void blktap_inject_response(blkif_response_t *rsp) 2.10 { 2.11 + 2.12 apply_rsp_hooks(rsp); 2.13 + 2.14 write_rsp_to_fe_ring(rsp); 2.15 + 2.16 + pthread_mutex_lock(&push_mutex); 2.17 + 2.18 RING_PUSH_RESPONSES(&fe_ring); 2.19 ioctl(fd, BLKTAP_IOCTL_KICK_FE); 2.20 + 2.21 + pthread_mutex_unlock(&push_mutex); 2.22 } 2.23 2.24 /*-----[ Polling fd listeners ]------------------------------------------*/ 2.25 @@ -449,7 +458,9 @@ int blktap_listen(void) 2.26 } 2.27 /* Using this as a unidirectional ring. */ 2.28 ctrl_ring.req_cons = ctrl_ring.rsp_prod_pvt = i; 2.29 +pthread_mutex_lock(&push_mutex); 2.30 RING_PUSH_RESPONSES(&ctrl_ring); 2.31 +pthread_mutex_unlock(&push_mutex); 2.32 2.33 /* empty the fe_ring */ 2.34 notify_fe = 0; 2.35 @@ -517,14 +528,18 @@ int blktap_listen(void) 2.36 2.37 if (notify_be) { 2.38 DPRINTF("notifying be\n"); 2.39 +pthread_mutex_lock(&push_mutex); 2.40 RING_PUSH_REQUESTS(&be_ring); 2.41 ioctl(fd, BLKTAP_IOCTL_KICK_BE); 2.42 +pthread_mutex_unlock(&push_mutex); 2.43 } 2.44 2.45 if (notify_fe) { 2.46 DPRINTF("notifying fe\n"); 2.47 +pthread_mutex_lock(&push_mutex); 2.48 RING_PUSH_RESPONSES(&fe_ring); 2.49 ioctl(fd, BLKTAP_IOCTL_KICK_FE); 2.50 +pthread_mutex_unlock(&push_mutex); 2.51 } 2.52 } 2.53 }
3.1 --- a/tools/blktap/blockstore.c Fri Mar 25 14:00:59 2005 +0000 3.2 +++ b/tools/blktap/blockstore.c Fri Mar 25 15:39:09 2005 +0000 3.3 @@ -13,13 +13,16 @@ 3.4 #include <string.h> 3.5 #include <sys/types.h> 3.6 #include <sys/stat.h> 3.7 +#include <sys/time.h> 3.8 #include <stdarg.h> 3.9 #include "blockstore.h" 3.10 #include <pthread.h> 3.11 #include "parallax-threaded.h" 3.12 3.13 #define BLOCKSTORE_REMOTE 3.14 -#define BSDEBUG 3.15 +//#define BSDEBUG 3.16 + 3.17 +#define RETRY_TIMEOUT 1000000 /* microseconds */ 3.18 3.19 /***************************************************************************** 3.20 * Debugging 3.21 @@ -63,6 +66,37 @@ struct sockaddr_in sin_local; 3.22 int bssock = 0; 3.23 3.24 /***************************************************************************** 3.25 + * Notification * 3.26 + *****************************************************************************/ 3.27 + 3.28 +typedef struct pool_thread_t_struct { 3.29 + pthread_mutex_t ptmutex; 3.30 + pthread_cond_t ptcv; 3.31 + int newdata; 3.32 +} pool_thread_t; 3.33 + 3.34 +pool_thread_t pool_thread[READ_POOL_SIZE+1]; 3.35 + 3.36 +#define RECV_NOTIFY(tid) { \ 3.37 + pthread_mutex_lock(&(pool_thread[tid].ptmutex)); \ 3.38 + pool_thread[tid].newdata = 1; \ 3.39 + DB("CV Waking %u", tid); \ 3.40 + pthread_cond_signal(&(pool_thread[tid].ptcv)); \ 3.41 + pthread_mutex_unlock(&(pool_thread[tid].ptmutex)); } 3.42 +#define RECV_AWAIT(tid) { \ 3.43 + pthread_mutex_lock(&(pool_thread[tid].ptmutex)); \ 3.44 + if (pool_thread[tid].newdata) { \ 3.45 + pool_thread[tid].newdata = 0; \ 3.46 + DB("CV Woken %u", tid); \ 3.47 + } \ 3.48 + else { \ 3.49 + DB("CV Waiting %u", tid); \ 3.50 + pthread_cond_wait(&(pool_thread[tid].ptcv), \ 3.51 + &(pool_thread[tid].ptmutex)); \ 3.52 + } \ 3.53 + pthread_mutex_unlock(&(pool_thread[tid].ptmutex)); } 3.54 + 3.55 +/***************************************************************************** 3.56 * Message queue management * 3.57 *****************************************************************************/ 3.58 3.59 @@ -76,23 +110,6 @@ pthread_mutex_t ptmutex_recv; 3.60 #define ENTER_RECV_CR pthread_mutex_lock(&ptmutex_recv) 3.61 #define LEAVE_RECV_CR pthread_mutex_unlock(&ptmutex_recv) 3.62 3.63 -int notify = 0; 3.64 -pthread_mutex_t ptmutex_notify; 3.65 -pthread_cond_t ptcv_notify; 3.66 -#define RECV_NOTIFY { \ 3.67 - pthread_mutex_lock(&ptmutex_notify); \ 3.68 - notify = 1; \ 3.69 - pthread_cond_signal(&ptcv_notify); \ 3.70 - pthread_mutex_unlock(&ptmutex_notify); } 3.71 -#define RECV_AWAIT { \ 3.72 - pthread_mutex_lock(&ptmutex_notify); \ 3.73 - if (notify) \ 3.74 - notify = 0; \ 3.75 - else \ 3.76 - pthread_cond_wait(&ptcv_notify, &ptmutex_notify); \ 3.77 - pthread_mutex_unlock(&ptmutex_notify); } 3.78 - 3.79 - 3.80 /* A message queue entry. We allocate one of these for every request we send. 3.81 * Asynchronous reply reception also used one of these. 3.82 */ 3.83 @@ -104,6 +121,8 @@ typedef struct bsq_t_struct { 3.84 int length; 3.85 struct msghdr msghdr; 3.86 struct iovec iov[2]; 3.87 + int tid; 3.88 + struct timeval tv_sent; 3.89 bshdr_t message; 3.90 void *block; 3.91 } bsq_t; 3.92 @@ -267,11 +286,13 @@ int send_message(bsq_t *qe) { 3.93 qe->message.luid = new_luid(); 3.94 3.95 qe->status = 0; 3.96 + qe->tid = (int)pthread_getspecific(tid_key); 3.97 if (enqueue(qe) < 0) { 3.98 fprintf(stderr, "Error enqueuing request.\n"); 3.99 return -1; 3.100 } 3.101 3.102 + gettimeofday(&(qe->tv_sent), NULL); 3.103 DB("send_message to %d luid=%016llx\n", qe->server, qe->message.luid); 3.104 rc = sendmsg(bssock, &(qe->msghdr), MSG_DONTWAIT); 3.105 //rc = sendto(bssock, (void *)&(qe->message), qe->length, 0, 3.106 @@ -407,6 +428,7 @@ void recv_recycle_buffer(bsq_t *q) { 3.107 int wait_recv(bsq_t **reqs, int numreqs) { 3.108 bsq_t *q, *m; 3.109 unsigned int x, i; 3.110 + int tid = (int)pthread_getspecific(tid_key); 3.111 3.112 DB("ENTER wait_recv %u\n", numreqs); 3.113 3.114 @@ -420,7 +442,7 @@ int wait_recv(bsq_t **reqs, int numreqs) 3.115 return numreqs; 3.116 } 3.117 3.118 - RECV_AWAIT; 3.119 + RECV_AWAIT(tid); 3.120 3.121 /* 3.122 rxagain: 3.123 @@ -442,6 +464,52 @@ int wait_recv(bsq_t **reqs, int numreqs) 3.124 3.125 } 3.126 3.127 +/* retry 3.128 + */ 3.129 +static int retry_count = 0; 3.130 +int retry(bsq_t *qe) 3.131 +{ 3.132 + int rc; 3.133 + gettimeofday(&(qe->tv_sent), NULL); 3.134 + DB("retry to %d luid=%016llx\n", qe->server, qe->message.luid); 3.135 + retry_count++; 3.136 + rc = sendmsg(bssock, &(qe->msghdr), MSG_DONTWAIT); 3.137 + if (rc < 0) 3.138 + return rc; 3.139 + return 0; 3.140 +} 3.141 + 3.142 +/* queue runner 3.143 + */ 3.144 +void *queue_runner(void *arg) 3.145 +{ 3.146 + for (;;) { 3.147 + struct timeval now; 3.148 + long long nowus, sus; 3.149 + bsq_t *q; 3.150 + int r; 3.151 + 3.152 + sleep(1); 3.153 + 3.154 + gettimeofday(&now, NULL); 3.155 + nowus = now.tv_usec + now.tv_sec * 1000000; 3.156 + ENTER_QUEUE_CR; 3.157 + r = retry_count; 3.158 + for (q = bs_head; q; q = q->next) { 3.159 + sus = q->tv_sent.tv_usec + q->tv_sent.tv_sec * 1000000; 3.160 + if ((nowus - sus) > RETRY_TIMEOUT) { 3.161 + if (retry(q) < 0) { 3.162 + fprintf(stderr, "Error on sendmsg retry.\n"); 3.163 + } 3.164 + } 3.165 + } 3.166 + if (r != retry_count) { 3.167 + fprintf(stderr, "RETRIES: %u %u\n", retry_count - r, retry_count); 3.168 + } 3.169 + LEAVE_QUEUE_CR; 3.170 + } 3.171 +} 3.172 + 3.173 /* receive loop 3.174 */ 3.175 void *receive_loop(void *arg) 3.176 @@ -461,7 +529,7 @@ void *receive_loop(void *arg) 3.177 } 3.178 else { 3.179 DB("RX MATCH"); 3.180 - RECV_NOTIFY; 3.181 + RECV_NOTIFY(m->tid); 3.182 } 3.183 } 3.184 } 3.185 @@ -1146,8 +1214,12 @@ int __init_blockstore(void) 3.186 pthread_mutex_init(&ptmutex_queue, NULL); 3.187 pthread_mutex_init(&ptmutex_luid, NULL); 3.188 pthread_mutex_init(&ptmutex_recv, NULL); 3.189 - pthread_mutex_init(&ptmutex_notify, NULL); 3.190 - pthread_cond_init(&ptcv_notify, NULL); 3.191 + /*pthread_mutex_init(&ptmutex_notify, NULL);*/ 3.192 + for (i = 0; i <= READ_POOL_SIZE; i++) { 3.193 + pool_thread[i].newdata = 0; 3.194 + pthread_mutex_init(&(pool_thread[i].ptmutex), NULL); 3.195 + pthread_cond_init(&(pool_thread[i].ptcv), NULL); 3.196 + } 3.197 3.198 bsservers[0].hostname = "firebug.cl.cam.ac.uk"; 3.199 bsservers[1].hostname = "planb.cl.cam.ac.uk"; 3.200 @@ -1225,6 +1297,7 @@ int __init_blockstore(void) 3.201 } 3.202 3.203 pthread_create(&pthread_recv, NULL, receive_loop, NULL); 3.204 + pthread_create(&pthread_recv, NULL, queue_runner, NULL); 3.205 3.206 #else /* /BLOCKSTORE_REMOTE */ 3.207 block_fp = open("blockstore.dat", O_RDWR | O_CREAT | O_LARGEFILE, 0644); 3.208 @@ -1262,9 +1335,14 @@ int __init_blockstore(void) 3.209 3.210 void __exit_blockstore(void) 3.211 { 3.212 + int i; 3.213 pthread_mutex_destroy(&ptmutex_recv); 3.214 pthread_mutex_destroy(&ptmutex_luid); 3.215 pthread_mutex_destroy(&ptmutex_queue); 3.216 - pthread_mutex_destroy(&ptmutex_notify); 3.217 - pthread_cond_destroy(&ptcv_notify); 3.218 + /*pthread_mutex_destroy(&ptmutex_notify); 3.219 + pthread_cond_destroy(&ptcv_notify);*/ 3.220 + for (i = 0; i <= READ_POOL_SIZE; i++) { 3.221 + pthread_mutex_destroy(&(pool_thread[i].ptmutex)); 3.222 + pthread_cond_destroy(&(pool_thread[i].ptcv)); 3.223 + } 3.224 }
4.1 --- a/tools/blktap/parallax-threaded.h Fri Mar 25 14:00:59 2005 +0000 4.2 +++ b/tools/blktap/parallax-threaded.h Fri Mar 25 15:39:09 2005 +0000 4.3 @@ -14,7 +14,8 @@ 4.4 #define NOTHREADS 4.5 #endif 4.6 4.7 -#define READ_POOL_SIZE 128 4.8 +//#define READ_POOL_SIZE 128 4.9 +#define READ_POOL_SIZE 8 4.10 4.11 /* per-thread identifier */ 4.12 pthread_key_t tid_key;