28#define NATS_LIBUV_INCLUDE
33#define NATS_LIBUV_ATTACH (1)
34#define NATS_LIBUV_READ (2)
35#define NATS_LIBUV_WRITE (3)
36#define NATS_LIBUV_DETACH (4)
38struct __natsLibuvEvent;
40typedef struct __natsLibuvEvent
44 struct __natsLibuvEvent *next;
53 uv_async_t *scheduler;
69static uv_once_t uvOnce = UV_ONCE_INIT;
70static uv_key_t uvLoopThreadKey;
75 if (uv_key_create(&uvLoopThreadKey) != 0)
92 uv_once(&uvOnce, _initOnce);
106 uv_key_set(&uvLoopThreadKey, (
void*) loop);
110uvScheduleToEventLoop(natsLibuvEvents *nle,
int eventType,
bool add)
112 natsLibuvEvent *newEvent = NULL;
115 newEvent = (natsLibuvEvent*) malloc(
sizeof(natsLibuvEvent));
116 if (newEvent == NULL)
119 newEvent->type = eventType;
121 newEvent->next = NULL;
123 uv_mutex_lock(nle->lock);
125 if (nle->head == NULL)
126 nle->head = newEvent;
128 if (nle->tail != NULL)
129 nle->tail->next = newEvent;
131 nle->tail = newEvent;
139 res = uv_async_send(nle->scheduler);
140 uv_mutex_unlock(nle->lock);
146natsLibuvPoll(uv_poll_t* handle,
int status,
int events)
148 natsLibuvEvents *nle = (natsLibuvEvents*)handle->data;
159 if (events & UV_READABLE)
162 if (events & UV_WRITABLE)
167uvHandleClosedCb(uv_handle_t *handle)
173uvPollUpdate(natsLibuvEvents *nle,
int eventType,
bool add)
175 if (eventType == NATS_LIBUV_READ)
178 nle->events |= UV_READABLE;
180 nle->events &= ~UV_READABLE;
185 nle->events |= UV_WRITABLE;
187 nle->events &= ~UV_WRITABLE;
192 int res = uv_poll_start(nle->handle, nle->events, natsLibuvPoll);
197 uv_close((uv_handle_t*) nle->handle, uvHandleClosedCb);
208uvAsyncAttach(natsLibuvEvents *nle)
215 nle->handle = (uv_poll_t*) malloc(
sizeof(uv_poll_t));
216 if (nle->handle == NULL)
221#if UV_VERSION_MAJOR <= 1
222 if (uv_poll_init_socket(nle->loop, nle->handle, nle->socket) != 0)
224 if (uv_poll_init(nle->loop, nle->handle, nle->socket) != 0)
230 && (nle->handle->data = (
void*) nle)
231 && (uv_poll_start(nle->handle, UV_READABLE, natsLibuvPoll) != 0))
240natsLibuvEvents_free(natsLibuvEvents *nle,
bool processDetachedEvent)
242 natsLibuvEvent *event;
244 while ((event = nle->head) != NULL)
246 nle->head =
event->next;
249 free(nle->scheduler);
250 if (nle->lock != NULL)
252 uv_mutex_destroy(nle->lock);
255 if (processDetachedEvent)
261uvFinalCloseCb(uv_handle_t* handle)
263 natsLibuvEvents *nle = (natsLibuvEvents*) handle->data;
264 natsLibuvEvents_free(nle,
true);
268uvAsyncDetach(natsLibuvEvents *nle)
270 uv_close((uv_handle_t*) nle->scheduler, uvFinalCloseCb);
274uvAsyncCb(uv_async_t *handle)
276 natsLibuvEvents *nle = (natsLibuvEvents*) handle->data;
278 natsLibuvEvent *
event = NULL;
283 uv_mutex_lock(nle->lock);
290 uv_mutex_unlock(nle->lock);
294 nle->head =
event->next;
295 if (event == nle->tail)
298 more = (nle->head != NULL ? true :
false);
300 uv_mutex_unlock(nle->lock);
304 case NATS_LIBUV_ATTACH:
306 s = uvAsyncAttach(nle);
309 case NATS_LIBUV_READ:
310 case NATS_LIBUV_WRITE:
312 s = uvPollUpdate(nle, event->type, event->add);
315 case NATS_LIBUV_DETACH:
356 uv_loop_t *uvLoop = (uv_loop_t*) loop;
358 natsLibuvEvents *nle = (natsLibuvEvents*) (*userData);
360 bool created =
false;
362 sched = ((uv_key_get(&uvLoopThreadKey) != loop) ?
true :
false);
371 nle = (natsLibuvEvents*) calloc(1,
sizeof(natsLibuvEvents));
378 nle->lock = (uv_mutex_t*) malloc(
sizeof(uv_mutex_t));
379 if (nle->lock == NULL)
382 if ((s ==
NATS_OK) && (uv_mutex_init(nle->lock) != 0))
386 && ((nle->scheduler = (uv_async_t*) malloc(
sizeof(uv_async_t))) == NULL))
392 && (uv_async_init(uvLoop, nle->scheduler, uvAsyncCb) != 0))
401 nle->scheduler->data = (
void*) nle;
407 nle->socket = socket;
408 nle->events = UV_READABLE;
411 s = uvScheduleToEventLoop(nle, NATS_LIBUV_ATTACH,
true);
413 s = uvAsyncAttach(nle);
417 *userData = (
void*) nle;
419 natsLibuvEvents_free(nle,
false);
435 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
441 uv_poll_stop(nle->handle);
443 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ?
true :
false);
453 if (sched || (nle->head != NULL))
454 s = uvScheduleToEventLoop(nle, NATS_LIBUV_READ, add);
456 s = uvPollUpdate(nle, NATS_LIBUV_READ, add);
472 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
476 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ?
true :
false);
479 if (sched || (nle->head != NULL))
480 s = uvScheduleToEventLoop(nle, NATS_LIBUV_WRITE, add);
482 s = uvPollUpdate(nle, NATS_LIBUV_WRITE, add);
498 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
502 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ?
true :
false);
505 if (sched || (nle->head != NULL))
506 s = uvScheduleToEventLoop(nle, NATS_LIBUV_DETACH,
true);
NATS_EXTERN void natsConnection_Close(natsConnection *nc)
Closes the connection.
NATS_EXTERN void natsConnection_ProcessDetachedEvent(natsConnection *nc)
Process a detach event when using an external event loop.
NATS_EXTERN void natsConnection_ProcessCloseEvent(natsSock *socket)
Process a socket close event when using an external event loop.
NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc)
Process a read event when using an external event loop.
NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc)
Process a write event when using an external event loop.
void natsLibuv_SetThreadLocalLoop(uv_loop_t *loop)
Register the event loop with the thread running uv_run().
Definition libuv.h:104
void natsLibuv_Init(void)
Initialize the adapter.
Definition libuv.h:90
natsStatus natsLibuv_Read(void *userData, bool add)
Start or stop polling on READ events.
Definition libuv.h:433
natsStatus natsLibuv_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
Attach a connection to the given event loop.
Definition libuv.h:354
natsStatus natsLibuv_Detach(void *userData)
The connection is closed; it can be safely detached.
Definition libuv.h:496
natsStatus natsLibuv_Write(void *userData, bool add)
Start or stop polling on WRITE events.
Definition libuv.h:470
struct __natsConnection natsConnection
A connection to a NATS Server.
Definition nats.h:152
int natsSock
Definition nats.h:50
natsStatus
Status returned by most of the APIs.
Definition status.h:50
@ NATS_ERR
Generic error.
Definition status.h:53
@ NATS_NO_MEMORY
Definition status.h:102
@ NATS_ILLEGAL_STATE
Definition status.h:88
@ NATS_OK
Success.
Definition status.h:51