28#define NATS_LIBUV_INCLUDE 
   33#define NATS_LIBUV_ATTACH   (1) 
   34#define NATS_LIBUV_READ     (2) 
   35#define NATS_LIBUV_WRITE    (3) 
   36#define NATS_LIBUV_DETACH   (4) 
   38struct __natsLibuvEvent;
 
   40typedef struct __natsLibuvEvent
 
   44    struct __natsLibuvEvent *next;
 
   53    uv_async_t      *scheduler;
 
   69static uv_once_t    uvOnce = UV_ONCE_INIT;
 
   70static uv_key_t     uvLoopThreadKey;
 
   75    if (uv_key_create(&uvLoopThreadKey) != 0)
 
   92    uv_once(&uvOnce, _initOnce);
 
 
  106    uv_key_set(&uvLoopThreadKey, (
void*) loop);
 
 
  110uvScheduleToEventLoop(natsLibuvEvents *nle, 
int eventType, 
bool add)
 
  112    natsLibuvEvent  *newEvent = NULL;
 
  115    newEvent = (natsLibuvEvent*) malloc(
sizeof(natsLibuvEvent));
 
  116    if (newEvent == NULL)
 
  119    newEvent->type  = eventType;
 
  121    newEvent->next  = NULL;
 
  123    uv_mutex_lock(nle->lock);
 
  125    if (nle->head == NULL)
 
  126        nle->head = newEvent;
 
  128    if (nle->tail != NULL)
 
  129        nle->tail->next = newEvent;
 
  131    nle->tail = newEvent;
 
  139    res = uv_async_send(nle->scheduler);
 
  140    uv_mutex_unlock(nle->lock);
 
  146natsLibuvPoll(uv_poll_t* handle, 
int status, 
int events)
 
  148    natsLibuvEvents *nle = (natsLibuvEvents*)handle->data;
 
  159    if (events & UV_READABLE)
 
  162    if (events & UV_WRITABLE)
 
  167uvHandleClosedCb(uv_handle_t *handle)
 
  173uvPollUpdate(natsLibuvEvents *nle, 
int eventType, 
bool add)
 
  175    if (eventType == NATS_LIBUV_READ)
 
  178            nle->events |= UV_READABLE;
 
  180            nle->events &= ~UV_READABLE;
 
  185            nle->events |= UV_WRITABLE;
 
  187            nle->events &= ~UV_WRITABLE;
 
  192        int res = uv_poll_start(nle->handle, nle->events, natsLibuvPoll);
 
  197    uv_close((uv_handle_t*) nle->handle, uvHandleClosedCb);
 
  208uvAsyncAttach(natsLibuvEvents *nle)
 
  215    nle->handle = (uv_poll_t*) malloc(
sizeof(uv_poll_t));
 
  216    if (nle->handle == NULL)
 
  221#if UV_VERSION_MAJOR <= 1 
  222        if (uv_poll_init_socket(nle->loop, nle->handle, nle->socket) != 0)
 
  224        if (uv_poll_init(nle->loop, nle->handle, nle->socket) != 0)
 
  230        && (nle->handle->data = (
void*) nle)
 
  231        && (uv_poll_start(nle->handle, UV_READABLE, natsLibuvPoll) != 0))
 
  240natsLibuvEvents_free(natsLibuvEvents *nle, 
bool processDetachedEvent)
 
  242    natsLibuvEvent *event;
 
  244    while ((event = nle->head) != NULL)
 
  246        nle->head = 
event->next;
 
  249    free(nle->scheduler);
 
  250    if (nle->lock != NULL)
 
  252        uv_mutex_destroy(nle->lock);
 
  255    if (processDetachedEvent)
 
  261uvFinalCloseCb(uv_handle_t* handle)
 
  263    natsLibuvEvents *nle = (natsLibuvEvents*) handle->data;
 
  264    natsLibuvEvents_free(nle, 
true);
 
  268uvAsyncDetach(natsLibuvEvents *nle)
 
  270    uv_close((uv_handle_t*) nle->scheduler, uvFinalCloseCb);
 
  274uvAsyncCb(uv_async_t *handle)
 
  276    natsLibuvEvents *nle    = (natsLibuvEvents*) handle->data;
 
  278    natsLibuvEvent  *
event  = NULL;
 
  283        uv_mutex_lock(nle->lock);
 
  290            uv_mutex_unlock(nle->lock);
 
  294        nle->head = 
event->next;
 
  295        if (event == nle->tail)
 
  298        more = (nle->head != NULL ? true : 
false);
 
  300        uv_mutex_unlock(nle->lock);
 
  304            case NATS_LIBUV_ATTACH:
 
  306                s = uvAsyncAttach(nle);
 
  309            case NATS_LIBUV_READ:
 
  310            case NATS_LIBUV_WRITE:
 
  312                s = uvPollUpdate(nle, event->type, event->add);
 
  315            case NATS_LIBUV_DETACH:
 
  356    uv_loop_t       *uvLoop = (uv_loop_t*) loop;
 
  358    natsLibuvEvents *nle    = (natsLibuvEvents*) (*userData);
 
  360    bool            created = 
false;
 
  362    sched = ((uv_key_get(&uvLoopThreadKey) != loop) ? 
true : 
false);
 
  371        nle = (natsLibuvEvents*) calloc(1, 
sizeof(natsLibuvEvents));
 
  378        nle->lock = (uv_mutex_t*) malloc(
sizeof(uv_mutex_t));
 
  379        if (nle->lock == NULL)
 
  382        if ((s == 
NATS_OK) && (uv_mutex_init(nle->lock) != 0))
 
  386            && ((nle->scheduler = (uv_async_t*) malloc(
sizeof(uv_async_t))) == NULL))
 
  392            && (uv_async_init(uvLoop, nle->scheduler, uvAsyncCb) != 0))
 
  401            nle->scheduler->data = (
void*) nle;
 
  407        nle->socket = socket;
 
  408        nle->events = UV_READABLE;
 
  411            s = uvScheduleToEventLoop(nle, NATS_LIBUV_ATTACH, 
true);
 
  413            s = uvAsyncAttach(nle);
 
  417        *userData = (
void*) nle;
 
  419        natsLibuvEvents_free(nle, 
false);
 
 
  435    natsLibuvEvents *nle = (natsLibuvEvents*) userData;
 
  441        uv_poll_stop(nle->handle);
 
  443    sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ? 
true : 
false);
 
  453    if (sched || (nle->head != NULL))
 
  454        s = uvScheduleToEventLoop(nle, NATS_LIBUV_READ, add);
 
  456        s = uvPollUpdate(nle, NATS_LIBUV_READ, add);
 
 
  472    natsLibuvEvents *nle = (natsLibuvEvents*) userData;
 
  476    sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ? 
true : 
false);
 
  479    if (sched || (nle->head != NULL))
 
  480        s = uvScheduleToEventLoop(nle, NATS_LIBUV_WRITE, add);
 
  482        s = uvPollUpdate(nle, NATS_LIBUV_WRITE, add);
 
 
  498    natsLibuvEvents *nle = (natsLibuvEvents*) userData;
 
  502    sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ? 
true : 
false);
 
  505    if (sched || (nle->head != NULL))
 
  506        s = uvScheduleToEventLoop(nle, NATS_LIBUV_DETACH, 
true);
 
 
 
NATS_EXTERN void natsConnection_Close(natsConnection *nc)
Closes the connection.
 
NATS_EXTERN void natsConnection_ProcessDetachedEvent(natsConnection *nc)
Process a detach event when using an external event loop.
 
NATS_EXTERN void natsConnection_ProcessCloseEvent(natsSock *socket)
Process a socket close event when using an external event loop.
 
NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc)
Process a read event when using an external event loop.
 
NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc)
Process a write event when using an external event loop.
 
void natsLibuv_SetThreadLocalLoop(uv_loop_t *loop)
Register the event loop with the thread running uv_run().
Definition libuv.h:104
 
void natsLibuv_Init(void)
Initialize the adapter.
Definition libuv.h:90
 
natsStatus natsLibuv_Read(void *userData, bool add)
Start or stop polling on READ events.
Definition libuv.h:433
 
natsStatus natsLibuv_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
Attach a connection to the given event loop.
Definition libuv.h:354
 
natsStatus natsLibuv_Detach(void *userData)
The connection is closed; it can be safely detached.
Definition libuv.h:496
 
natsStatus natsLibuv_Write(void *userData, bool add)
Start or stop polling on WRITE events.
Definition libuv.h:470
 
struct __natsConnection natsConnection
A connection to a NATS Server.
Definition nats.h:152
 
int natsSock
Definition nats.h:50
 
natsStatus
Status returned by most of the APIs.
Definition status.h:50
 
@ NATS_ERR
Generic error.
Definition status.h:53
 
@ NATS_NO_MEMORY
Definition status.h:102
 
@ NATS_ILLEGAL_STATE
Definition status.h:88
 
@ NATS_OK
Success.
Definition status.h:51