--- evolution-data-server-1.8.0/libedataserver/e-msgport.c.emsgport 2006-08-07 00:48:26.000000000 -0400 +++ evolution-data-server-1.8.0/libedataserver/e-msgport.c 2006-09-21 13:14:38.000000000 -0400 @@ -486,29 +486,19 @@ } struct _EMsgPort { - EDList queue; - int condwait; /* how many waiting in condwait */ - union { - int pipe[2]; /* On Win32 actually a pair of SOCKETs */ - struct { - int read; - int write; - } fd; - } pipe; + GAsyncQueue *queue; + EMsg *cache; + gint pipe[2]; /* on Win32, actually a pair of SOCKETs */ #ifdef HAVE_NSS - union { - PRFileDesc *pipe[2]; - struct { - PRFileDesc *read; - PRFileDesc *write; - } fd; - } prpipe; -#endif - /* @#@$#$ glib stuff */ - GCond *cond; - GMutex *lock; + PRFileDesc *prpipe[2]; +#endif }; +/* message flags */ +enum { + MSG_FLAG_SYNC_WITH_PIPE = 1 << 0, + MSG_FLAG_SYNC_WITH_PR_PIPE = 1 << 1 +}; #ifdef HAVE_NSS static int @@ -529,189 +519,237 @@ } #endif -EMsgPort *e_msgport_new(void) +static void +msgport_sync_with_pipe (gint fd) { - EMsgPort *mp; + gchar buffer[1]; + + while (fd >= 0) { + if (E_READ (fd, buffer, 1) > 0) + break; + else if (!E_IS_STATUS_INTR ()) { + g_warning ("%s: Failed to read from pipe: %s", + G_STRFUNC, g_strerror (errno)); + break; + } + } +} - mp = g_malloc(sizeof(*mp)); - e_dlist_init(&mp->queue); - mp->lock = g_mutex_new(); - mp->cond = g_cond_new(); - e_pipe (mp->pipe.pipe); #ifdef HAVE_NSS - e_prpipe (mp->prpipe.pipe); +static void +msgport_sync_with_prpipe (PRFileDesc *prfd) +{ + gchar buffer[1]; + + while (prfd != NULL) { + if (PR_Read (prfd, buffer, 1) > 0) + break; + else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) { + gchar *text = g_alloca (PR_GetErrorTextLength ()); + PR_GetErrorText (text); + g_warning ("%s: Failed to read from NSPR pipe: %s", + G_STRFUNC, text); + break; + } + } +} #endif - mp->condwait = 0; - return mp; +EMsgPort * +e_msgport_new (void) +{ + EMsgPort *msgport; + + msgport = g_slice_new (EMsgPort); + msgport->queue = g_async_queue_new (); + msgport->cache = NULL; + msgport->pipe[0] = -1; + msgport->pipe[1] = -1; +#ifdef HAVE_NSS + msgport->prpipe[0] = NULL; + msgport->prpipe[1] = NULL; +#endif + + return msgport; } -void e_msgport_destroy(EMsgPort *mp) +void +e_msgport_destroy (EMsgPort *msgport) { - g_mutex_free(mp->lock); - g_cond_free(mp->cond); - if (mp->pipe.fd.read != -1) { - E_CLOSE(mp->pipe.fd.read); - E_CLOSE(mp->pipe.fd.write); + g_return_if_fail (msgport != NULL); + + if (msgport->pipe[0] >= 0) { + E_CLOSE (msgport->pipe[0]); + E_CLOSE (msgport->pipe[1]); } #ifdef HAVE_NSS - if (mp->prpipe.fd.read) { - PR_Close(mp->prpipe.fd.read); - PR_Close(mp->prpipe.fd.write); + if (msgport->prpipe[0] != NULL) { + PR_Close (msgport->prpipe[0]); + PR_Close (msgport->prpipe[1]); } #endif - g_free(mp); + + g_async_queue_unref (msgport->queue); + g_slice_free (EMsgPort, msgport); } -/* get a fd that can be used to wait on the port asynchronously */ -int e_msgport_fd(EMsgPort *mp) +int +e_msgport_fd (EMsgPort *msgport) { - return mp->pipe.fd.read; + gint fd; + + g_return_val_if_fail (msgport != NULL, -1); + + g_async_queue_lock (msgport->queue); + fd = msgport->pipe[0]; + if (fd < 0 && e_pipe (msgport->pipe) == 0) + fd = msgport->pipe[0]; + g_async_queue_unlock (msgport->queue); + + return fd; } #ifdef HAVE_NSS -PRFileDesc *e_msgport_prfd(EMsgPort *mp) +PRFileDesc * +e_msgport_prfd (EMsgPort *msgport) { - return mp->prpipe.fd.read; + PRFileDesc *prfd; + + g_return_val_if_fail (msgport != NULL, NULL); + + g_async_queue_lock (msgport->queue); + prfd = msgport->prpipe[0]; + if (prfd == NULL && e_prpipe (msgport->prpipe) == 0) + prfd = msgport->prpipe[0]; + g_async_queue_unlock (msgport->queue); + + return prfd; } #endif -void e_msgport_put(EMsgPort *mp, EMsg *msg) +void +e_msgport_put (EMsgPort *msgport, EMsg *msg) { + gint fd; #ifdef HAVE_NSS PRFileDesc *prfd; #endif - ssize_t w; - int fd; - - m(printf("put:\n")); - g_mutex_lock(mp->lock); - e_dlist_addtail(&mp->queue, &msg->ln); - if (mp->condwait > 0) { - m(printf("put: condwait > 0, waking up\n")); - g_cond_signal(mp->cond); + + g_return_if_fail (msgport != NULL); + g_return_if_fail (msg != NULL); + + g_async_queue_lock (msgport->queue); + + msg->flags = 0; + + fd = msgport->pipe[1]; + while (fd >= 0) { + if (E_WRITE (fd, "E", 1) > 0) { + msg->flags |= MSG_FLAG_SYNC_WITH_PIPE; + break; + } else if (!E_IS_STATUS_INTR ()) { + g_warning ("%s: Failed to write to pipe: %s", + G_STRFUNC, g_strerror (errno)); + break; + } } - - fd = mp->pipe.fd.write; -#ifdef HAVE_NSS - prfd = mp->prpipe.fd.write; -#endif - g_mutex_unlock(mp->lock); #ifdef HAVE_NSS - if (prfd != NULL) { - m(printf("put: have pr pipe, writing notification to it\n")); - do { - w = PR_Write (prfd, "E", 1); - } while (w == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR); + prfd = msgport->prpipe[1]; + while (prfd != NULL) { + if (PR_Write (prfd, "E", 1) > 0) { + msg->flags |= MSG_FLAG_SYNC_WITH_PR_PIPE; + break; + } else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) { + gchar *text = g_alloca (PR_GetErrorTextLength ()); + PR_GetErrorText (text); + g_warning ("%s: Failed to write to NSPR pipe: %s", + G_STRFUNC, text); + break; + } } #endif - if (fd != -1) { - m(printf("put: have pipe, writing notification to it\n")); - do { - w = E_WRITE (fd, "E", 1); - } while (w == -1 && E_IS_STATUS_INTR ()); - } - m(printf("put: done\n")); + g_async_queue_push_unlocked (msgport->queue, msg); + g_async_queue_unlock (msgport->queue); } -static void -msgport_cleanlock(void *data) +EMsg * +e_msgport_wait (EMsgPort *msgport) { - EMsgPort *mp = data; + EMsg *msg; - g_mutex_unlock(mp->lock); -} + g_return_val_if_fail (msgport != NULL, NULL); -EMsg *e_msgport_wait(EMsgPort *mp) -{ - EMsg *msg; + g_async_queue_lock (msgport->queue); - m(printf("wait:\n")); - g_mutex_lock(mp->lock); - while (e_dlist_empty(&mp->queue)) { - if (mp->pipe.fd.read != -1) { - fd_set rfds; - int retry; - - m(printf("wait: waiting on pipe\n")); - g_mutex_unlock(mp->lock); - do { - FD_ZERO(&rfds); - FD_SET(mp->pipe.fd.read, &rfds); - retry = E_IS_SOCKET_ERROR(select(mp->pipe.fd.read+1, &rfds, NULL, NULL, NULL)) && E_IS_STATUS_INTR(); - pthread_testcancel(); - } while (retry); - g_mutex_lock(mp->lock); - m(printf("wait: got pipe\n")); -#ifdef HAVE_NSS - } else if (mp->prpipe.fd.read != NULL) { - PRPollDesc rfds[1]; - int retry; - - m(printf("wait: waitng on pr pipe\n")); - g_mutex_unlock(mp->lock); - do { - rfds[0].fd = mp->prpipe.fd.read; - rfds[0].in_flags = PR_POLL_READ|PR_POLL_ERR; - retry = PR_Poll(rfds, 1, PR_INTERVAL_NO_TIMEOUT) == -1 && PR_GetError() == PR_PENDING_INTERRUPT_ERROR; - pthread_testcancel(); - } while (retry); - g_mutex_lock(mp->lock); - m(printf("wait: got pr pipe\n")); -#endif /* HAVE_NSS */ - } else { - m(printf("wait: waiting on condition\n")); - mp->condwait++; - /* if we are cancelled in the cond-wait, then we need to unlock our lock when we cleanup */ - pthread_cleanup_push(msgport_cleanlock, mp); - g_cond_wait(mp->cond, mp->lock); - pthread_cleanup_pop(0); - m(printf("wait: got condition\n")); - mp->condwait--; - } + /* check the cache first */ + if (msgport->cache != NULL) { + msg = msgport->cache; + /* don't clear the cache */ + g_async_queue_unlock (msgport->queue); + return msg; } - msg = (EMsg *)mp->queue.head; - m(printf("wait: message = %p\n", msg)); - g_mutex_unlock(mp->lock); - m(printf("wait: done\n")); + + msg = g_async_queue_pop_unlocked (msgport->queue); + + g_assert (msg != NULL); + + /* The message is not actually "removed" from the EMsgPort until + * e_msgport_get() is called. So we cache the popped message. */ + msgport->cache = msg; + + if (msg->flags & MSG_FLAG_SYNC_WITH_PIPE) + msgport_sync_with_pipe (msgport->pipe[0]); +#ifdef HAVE_NSS + if (msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE) + msgport_sync_with_prpipe (msgport->prpipe[0]); +#endif + + g_async_queue_unlock (msgport->queue); + return msg; } -EMsg *e_msgport_get(EMsgPort *mp) +EMsg * +e_msgport_get (EMsgPort *msgport) { EMsg *msg; - char dummy[1]; - ssize_t n; - - g_mutex_lock(mp->lock); - msg = (EMsg *)e_dlist_remhead(&mp->queue); - if (msg) { - if (mp->pipe.fd.read != -1) { - do { - n = E_READ (mp->pipe.fd.read, dummy, 1); - } while (n == -1 && E_IS_STATUS_INTR ()); - } + + g_return_val_if_fail (msgport != NULL, NULL); + + g_async_queue_lock (msgport->queue); + + /* check the cache first */ + if (msgport->cache != NULL) { + msg = msgport->cache; + msgport->cache = NULL; + g_async_queue_unlock (msgport->queue); + return msg; + } + + msg = g_async_queue_try_pop_unlocked (msgport->queue); + + if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PIPE) + msgport_sync_with_pipe (msgport->pipe[0]); #ifdef HAVE_NSS - if (mp->prpipe.fd.read != NULL) { - do { - n = PR_Read (mp->prpipe.fd.read, dummy, 1); - } while (n == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR); - } + if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE) + msgport_sync_with_prpipe (msgport->prpipe[0]); #endif - } - m(printf("get: message = %p\n", msg)); - g_mutex_unlock(mp->lock); + + g_async_queue_unlock (msgport->queue); return msg; } -void e_msgport_reply(EMsg *msg) +void +e_msgport_reply (EMsg *msg) { - if (msg->reply_port) { - e_msgport_put(msg->reply_port, msg); - } + g_return_if_fail (msg != NULL); + + if (msg->reply_port) + e_msgport_put (msg->reply_port, msg); + /* else lost? */ } @@ -1099,7 +1137,7 @@ switch(e->type) { case E_THREAD_QUEUE: /* if the queue is full, lose this new addition */ - if (e_dlist_length(&e->server_port->queue) < e->queue_limit) { + if (g_async_queue_length(e->server_port->queue) < e->queue_limit) { e_msgport_put(e->server_port, msg); } else { printf("queue limit reached, dropping new message\n"); @@ -1108,7 +1146,7 @@ break; case E_THREAD_DROP: /* if the queue is full, lose the oldest (unprocessed) message */ - if (e_dlist_length(&e->server_port->queue) < e->queue_limit) { + if (g_async_queue_length(e->server_port->queue) < e->queue_limit) { e_msgport_put(e->server_port, msg); } else { printf("queue limit reached, dropping old message\n"); --- evolution-data-server-1.8.0/libedataserver/e-msgport.h.emsgport 2004-12-02 22:33:06.000000000 -0500 +++ evolution-data-server-1.8.0/libedataserver/e-msgport.h 2006-09-21 13:10:43.000000000 -0400 @@ -56,6 +56,7 @@ typedef struct _EMsg { EDListNode ln; EMsgPort *reply_port; + gint flags; } EMsg; EMsgPort *e_msgport_new(void);