28#define NATS_LIBUV_INCLUDE
33#define NATS_LIBUV_ATTACH (1)
34#define NATS_LIBUV_READ (2)
35#define NATS_LIBUV_WRITE (3)
36#define NATS_LIBUV_DETACH (4)
38struct __natsLibuvEvent;
40typedef struct __natsLibuvEvent
44 struct __natsLibuvEvent *next;
53 uv_async_t *scheduler;
69static uv_once_t uvOnce = UV_ONCE_INIT;
70static uv_key_t uvLoopThreadKey;
75 if (uv_key_create(&uvLoopThreadKey) != 0)
92 uv_once(&uvOnce, _initOnce);
106 uv_key_set(&uvLoopThreadKey, (
void*) loop);
110uvScheduleToEventLoop(natsLibuvEvents *nle,
int eventType,
bool add)
112 natsLibuvEvent *newEvent = NULL;
115 newEvent = (natsLibuvEvent*) malloc(
sizeof(natsLibuvEvent));
116 if (newEvent == NULL)
119 newEvent->type = eventType;
121 newEvent->next = NULL;
123 uv_mutex_lock(nle->lock);
125 if (nle->head == NULL)
126 nle->head = newEvent;
128 if (nle->tail != NULL)
129 nle->tail->next = newEvent;
131 nle->tail = newEvent;
139 res = uv_async_send(nle->scheduler);
140 uv_mutex_unlock(nle->lock);
146natsLibuvPoll(uv_poll_t* handle,
int status,
int events)
148 natsLibuvEvents *nle = (natsLibuvEvents*)handle->data;
159 if (events & UV_READABLE)
162 if (events & UV_WRITABLE)
167uvHandleClosedCb(uv_handle_t *handle)
173uvPollUpdate(natsLibuvEvents *nle,
int eventType,
bool add)
175 if (eventType == NATS_LIBUV_READ)
178 nle->events |= UV_READABLE;
180 nle->events &= ~UV_READABLE;
185 nle->events |= UV_WRITABLE;
187 nle->events &= ~UV_WRITABLE;
192 int res = uv_poll_start(nle->handle, nle->events, natsLibuvPoll);
197 uv_close((uv_handle_t*) nle->handle, uvHandleClosedCb);
208uvAsyncAttach(natsLibuvEvents *nle)
215 nle->handle = (uv_poll_t*) malloc(
sizeof(uv_poll_t));
216 if (nle->handle == NULL)
221#if UV_VERSION_MAJOR <= 1
222 if (uv_poll_init_socket(nle->loop, nle->handle, nle->socket) != 0)
224 if (uv_poll_init(nle->loop, nle->handle, nle->socket) != 0)
230 && (nle->handle->data = (
void*) nle)
231 && (uv_poll_start(nle->handle, UV_READABLE, natsLibuvPoll) != 0))
240uvFinalCloseCb(uv_handle_t* handle)
242 natsLibuvEvents *nle = (natsLibuvEvents*) handle->data;
243 natsLibuvEvent *event;
245 while ((event = nle->head) != NULL)
247 nle->head =
event->next;
250 free(nle->scheduler);
251 uv_mutex_destroy(nle->lock);
257uvAsyncDetach(natsLibuvEvents *nle)
259 uv_close((uv_handle_t*) nle->scheduler, uvFinalCloseCb);
263uvAsyncCb(uv_async_t *handle)
265 natsLibuvEvents *nle = (natsLibuvEvents*) handle->data;
267 natsLibuvEvent *
event = NULL;
272 uv_mutex_lock(nle->lock);
279 uv_mutex_unlock(nle->lock);
283 nle->head =
event->next;
284 if (event == nle->tail)
287 more = (nle->head != NULL ? true :
false);
289 uv_mutex_unlock(nle->lock);
293 case NATS_LIBUV_ATTACH:
295 s = uvAsyncAttach(nle);
298 case NATS_LIBUV_READ:
299 case NATS_LIBUV_WRITE:
301 s = uvPollUpdate(nle, event->type, event->add);
304 case NATS_LIBUV_DETACH:
345 uv_loop_t *uvLoop = (uv_loop_t*) loop;
347 natsLibuvEvents *nle = (natsLibuvEvents*) (*userData);
350 sched = ((uv_key_get(&uvLoopThreadKey) != loop) ?
true :
false);
359 nle = (natsLibuvEvents*) calloc(1,
sizeof(natsLibuvEvents));
363 nle->lock = (uv_mutex_t*) malloc(
sizeof(uv_mutex_t));
364 if (nle->lock == NULL)
367 if ((s ==
NATS_OK) && (uv_mutex_init(nle->lock) != 0))
371 && ((nle->scheduler = (uv_async_t*) malloc(
sizeof(uv_async_t))) == NULL))
377 && (uv_async_init(uvLoop, nle->scheduler, uvAsyncCb) != 0))
386 nle->scheduler->data = (
void*) nle;
392 nle->socket = socket;
393 nle->events = UV_READABLE;
396 s = uvScheduleToEventLoop(nle, NATS_LIBUV_ATTACH,
true);
398 s = uvAsyncAttach(nle);
402 *userData = (
void*) nle;
420 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
424 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ?
true :
false);
434 if (sched || (nle->head != NULL))
435 s = uvScheduleToEventLoop(nle, NATS_LIBUV_READ, add);
437 s = uvPollUpdate(nle, NATS_LIBUV_READ, add);
453 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
457 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ?
true :
false);
460 if (sched || (nle->head != NULL))
461 s = uvScheduleToEventLoop(nle, NATS_LIBUV_WRITE, add);
463 s = uvPollUpdate(nle, NATS_LIBUV_WRITE, add);
479 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
483 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ?
true :
false);
486 if (sched || (nle->head != NULL))
487 s = uvScheduleToEventLoop(nle, NATS_LIBUV_DETACH,
true);
NATS_EXTERN void natsConnection_Close(natsConnection *nc)
Closes the connection.
NATS_EXTERN void natsConnection_ProcessCloseEvent(natsSock *socket)
Process a socket close event when using external event loop.
NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc)
Process a read event when using external event loop.
NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc)
Process a write event when using external event loop.
void natsLibuv_SetThreadLocalLoop(uv_loop_t *loop)
Register the event loop with the thread running uv_run().
Definition libuv.h:104
void natsLibuv_Init(void)
Initialize the adapter.
Definition libuv.h:90
natsStatus natsLibuv_Read(void *userData, bool add)
Start or stop polling on READ events.
Definition libuv.h:418
natsStatus natsLibuv_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
Attach a connection to the given event loop.
Definition libuv.h:343
natsStatus natsLibuv_Detach(void *userData)
The connection is closed, it can be safely detached.
Definition libuv.h:477
natsStatus natsLibuv_Write(void *userData, bool add)
Start or stop polling on WRITE events.
Definition libuv.h:451
struct __natsConnection natsConnection
A connection to a NATS Server.
Definition nats.h:160
int natsSock
Definition nats.h:58
natsStatus
Status returned by most of the APIs.
Definition status.h:50
@ NATS_ERR
Generic error.
Definition status.h:53
@ NATS_NO_MEMORY
Definition status.h:102
@ NATS_ILLEGAL_STATE
Definition status.h:88
@ NATS_OK
Success.
Definition status.h:51