28 #define NATS_LIBUV_INCLUDE
33 #define NATS_LIBUV_ATTACH (1)
34 #define NATS_LIBUV_READ (2)
35 #define NATS_LIBUV_WRITE (3)
36 #define NATS_LIBUV_DETACH (4)
38 struct __natsLibuvEvent;
40 typedef struct __natsLibuvEvent
44 struct __natsLibuvEvent *next;
53 uv_async_t *scheduler;
69 static uv_once_t uvOnce = UV_ONCE_INIT;
70 static uv_key_t uvLoopThreadKey;
75 if (uv_key_create(&uvLoopThreadKey) != 0)
92 uv_once(&uvOnce, _initOnce);
106 uv_key_set(&uvLoopThreadKey, (
void*) loop);
110 uvScheduleToEventLoop(natsLibuvEvents *nle,
int eventType,
bool add)
112 natsLibuvEvent *newEvent = NULL;
115 newEvent = (natsLibuvEvent*) malloc(
sizeof(natsLibuvEvent));
116 if (newEvent == NULL)
119 newEvent->type = eventType;
121 newEvent->next = NULL;
123 uv_mutex_lock(nle->lock);
125 if (nle->head == NULL)
126 nle->head = newEvent;
128 if (nle->tail != NULL)
129 nle->tail->next = newEvent;
131 nle->tail = newEvent;
133 uv_mutex_unlock(nle->lock);
135 res = uv_async_send(nle->scheduler);
141 natsLibuvPoll(uv_poll_t* handle,
int status,
int events)
143 natsLibuvEvents *nle = (natsLibuvEvents*)handle->data;
154 if (events & UV_READABLE)
157 if (events & UV_WRITABLE)
162 uvPollUpdate(natsLibuvEvents *nle,
int eventType,
bool add)
166 if (eventType == NATS_LIBUV_READ)
169 nle->events |= UV_READABLE;
171 nle->events &= ~UV_READABLE;
176 nle->events |= UV_WRITABLE;
178 nle->events &= ~UV_WRITABLE;
182 res = uv_poll_start(nle->handle, nle->events, natsLibuvPoll);
184 res = uv_poll_stop(nle->handle);
193 uvHandleClosedCb(uv_handle_t *handle)
199 uvAsyncAttach(natsLibuvEvents *nle)
204 if (nle->handle != NULL)
206 uv_close((uv_handle_t*) nle->handle, uvHandleClosedCb);
210 nle->handle = (uv_poll_t*) malloc(
sizeof(uv_poll_t));
211 if (nle->handle == NULL)
216 #if UV_VERSION_MAJOR <= 1
217 if (uv_poll_init_socket(nle->loop, nle->handle, nle->socket) != 0)
219 if (uv_poll_init(nle->loop, nle->handle, nle->socket) != 0)
225 && (nle->handle->data = (
void*) nle)
226 && (uv_poll_start(nle->handle, UV_READABLE, natsLibuvPoll) != 0))
235 finalCloseCb(uv_handle_t* handle)
237 natsLibuvEvents *nle = (natsLibuvEvents*)handle->data;
238 natsLibuvEvent *event;
240 while ((event = nle->head) != NULL)
242 nle->head =
event->next;
246 free(nle->scheduler);
247 uv_mutex_destroy(nle->lock);
253 closeSchedulerCb(uv_handle_t* scheduler)
255 natsLibuvEvents *nle = (natsLibuvEvents*) scheduler->data;
257 uv_close((uv_handle_t*) nle->handle, finalCloseCb);
261 uvAsyncDetach(natsLibuvEvents *nle)
263 uv_close((uv_handle_t*) nle->scheduler, closeSchedulerCb);
267 uvAsyncCb(uv_async_t *handle)
269 natsLibuvEvents *nle = (natsLibuvEvents*) handle->data;
271 natsLibuvEvent *event = NULL;
276 uv_mutex_lock(nle->lock);
283 uv_mutex_unlock(nle->lock);
287 nle->head =
event->next;
288 if (event == nle->tail)
291 more = (nle->head != NULL ? true :
false);
293 uv_mutex_unlock(nle->lock);
297 case NATS_LIBUV_ATTACH:
299 s = uvAsyncAttach(nle);
302 case NATS_LIBUV_READ:
303 case NATS_LIBUV_WRITE:
305 s = uvPollUpdate(nle, event->type, event->add);
308 case NATS_LIBUV_DETACH:
345 uv_loop_t *uvLoop = (uv_loop_t*) loop;
347 natsLibuvEvents *nle = (natsLibuvEvents*) (*userData);
350 sched = ((uv_key_get(&uvLoopThreadKey) != loop) ?
true :
false);
359 nle = (natsLibuvEvents*) calloc(1,
sizeof(natsLibuvEvents));
363 nle->lock = (uv_mutex_t*) malloc(
sizeof(uv_mutex_t));
364 if (nle->lock == NULL)
367 if ((s ==
NATS_OK) && (uv_mutex_init(nle->lock) != 0))
371 && ((nle->scheduler = (uv_async_t*) malloc(
sizeof(uv_async_t))) == NULL))
377 && (uv_async_init(uvLoop, nle->scheduler, uvAsyncCb) != 0))
386 nle->scheduler->data = (
void*) nle;
392 nle->socket = socket;
393 nle->events = UV_READABLE;
396 s = uvScheduleToEventLoop(nle, NATS_LIBUV_ATTACH,
true);
398 s = uvAsyncAttach(nle);
402 *userData = (
void*) nle;
420 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
424 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ?
true :
false);
434 if (sched || (nle->head != NULL))
435 s = uvScheduleToEventLoop(nle, NATS_LIBUV_READ, add);
437 s = uvPollUpdate(nle, NATS_LIBUV_READ, add);
453 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
457 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ?
true :
false);
460 if (sched || (nle->head != NULL))
461 s = uvScheduleToEventLoop(nle, NATS_LIBUV_WRITE, add);
463 s = uvPollUpdate(nle, NATS_LIBUV_WRITE, add);
479 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
483 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ?
true :
false);
486 if (sched || (nle->head != NULL))
487 s = uvScheduleToEventLoop(nle, NATS_LIBUV_DETACH,
true);
NATS_EXTERN void natsConnection_Close(natsConnection *nc)
Closes the connection.
NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc)
Process a read event when using external event loop.
NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc)
Process a write event when using external event loop.
void natsLibuv_SetThreadLocalLoop(uv_loop_t *loop)
Register the event loop with the thread running uv_run().
Definition: libuv.h:104
void natsLibuv_Init(void)
Initialize the adapter.
Definition: libuv.h:90
natsStatus natsLibuv_Read(void *userData, bool add)
Start or stop polling on READ events.
Definition: libuv.h:418
natsStatus natsLibuv_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
Attach a connection to the given event loop.
Definition: libuv.h:343
natsStatus natsLibuv_Detach(void *userData)
The connection is closed, it can be safely detached.
Definition: libuv.h:477
natsStatus natsLibuv_Write(void *userData, bool add)
Start or stop polling on WRITE events.
Definition: libuv.h:451
struct __natsConnection natsConnection
A connection to a NATS Server.
Definition: nats.h:152
int natsSock
Definition: nats.h:50
natsStatus
Status returned by most of the APIs.
Definition: status.h:50
@ NATS_ERR
Generic error.
Definition: status.h:53
@ NATS_NO_MEMORY
Definition: status.h:102
@ NATS_ILLEGAL_STATE
Definition: status.h:88
@ NATS_OK
Success.
Definition: status.h:51