NATS C Client with JetStream and Streaming support  3.11.0
The nats.io C Client, Supported by Synadia Communications Inc.
Loading...
Searching...
No Matches
libuv.h
Go to the documentation of this file.
1// Copyright 2016-2018 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14#ifndef LIBUV_H_
15#define LIBUV_H_
16
17#ifdef __cplusplus
18extern "C" {
19#endif
20
24#ifndef _GNU_SOURCE
25#define _GNU_SOURCE
26#endif
27
28#define NATS_LIBUV_INCLUDE
29
30#include <uv.h>
31#include "../nats.h"
32
33#define NATS_LIBUV_ATTACH (1)
34#define NATS_LIBUV_READ (2)
35#define NATS_LIBUV_WRITE (3)
36#define NATS_LIBUV_DETACH (4)
37
38struct __natsLibuvEvent;
39
40typedef struct __natsLibuvEvent
41{
42 int type;
43 bool add;
44 struct __natsLibuvEvent *next;
45
46} natsLibuvEvent;
47
48typedef struct
49{
51 uv_loop_t *loop;
52 uv_poll_t *handle;
53 uv_async_t *scheduler;
54 int events;
55 natsSock socket;
56 uv_mutex_t *lock;
57 natsLibuvEvent *head;
58 natsLibuvEvent *tail;
59
60} natsLibuvEvents;
61
62// Forward declarations
63natsStatus natsLibuv_Detach(void *userData);
64
69static uv_once_t uvOnce = UV_ONCE_INIT;
70static uv_key_t uvLoopThreadKey;
71
72static void
73_initOnce(void)
74{
75 if (uv_key_create(&uvLoopThreadKey) != 0)
76 abort();
77}
78
89void
91{
92 uv_once(&uvOnce, _initOnce);
93}
94
103void
105{
106 uv_key_set(&uvLoopThreadKey, (void*) loop);
107}
108
109static natsStatus
110uvScheduleToEventLoop(natsLibuvEvents *nle, int eventType, bool add)
111{
112 natsLibuvEvent *newEvent = NULL;
113 int res;
114
115 newEvent = (natsLibuvEvent*) malloc(sizeof(natsLibuvEvent));
116 if (newEvent == NULL)
117 return NATS_NO_MEMORY;
118
119 newEvent->type = eventType;
120 newEvent->add = add;
121 newEvent->next = NULL;
122
123 uv_mutex_lock(nle->lock);
124
125 if (nle->head == NULL)
126 nle->head = newEvent;
127
128 if (nle->tail != NULL)
129 nle->tail->next = newEvent;
130
131 nle->tail = newEvent;
132
133 // We need to wake up the event loop thread under our lock because
134 // due to signal coalescing (and the reason we have a list), it is
135 // possible that the detach that we have just added is processed
136 // after we release the lock, freeing the `nle` structure. Calling
137 // `uv_async_send(nle->scheduler)` outside this lock would then
138 // cause a crash or race.
139 res = uv_async_send(nle->scheduler);
140 uv_mutex_unlock(nle->lock);
141
142 return (res == 0 ? NATS_OK : NATS_ERR);
143}
144
145static void
146natsLibuvPoll(uv_poll_t* handle, int status, int events)
147{
148 natsLibuvEvents *nle = (natsLibuvEvents*)handle->data;
149
150 if (status != 0)
151 {
152 // There was an error, try to process as a read event.
153 // If we had an issue with the socket, this will cause
154 // an auto-reconnect.
156 return;
157 }
158
159 if (events & UV_READABLE)
161
162 if (events & UV_WRITABLE)
164}
165
166static void
167uvHandleClosedCb(uv_handle_t *handle)
168{
169 free(handle);
170}
171
172static natsStatus
173uvPollUpdate(natsLibuvEvents *nle, int eventType, bool add)
174{
175 if (eventType == NATS_LIBUV_READ)
176 {
177 if (add)
178 nle->events |= UV_READABLE;
179 else
180 nle->events &= ~UV_READABLE;
181 }
182 else
183 {
184 if (add)
185 nle->events |= UV_WRITABLE;
186 else
187 nle->events &= ~UV_WRITABLE;
188 }
189
190 if (nle->events)
191 {
192 int res = uv_poll_start(nle->handle, nle->events, natsLibuvPoll);
193 return (res == 0 ? NATS_OK : NATS_ERR);
194 }
195 // Both read and write events have been removed, this signal that the socket
196 // should be closed prior to a reconnect or during natsConnection_Close().
197 uv_close((uv_handle_t*) nle->handle, uvHandleClosedCb);
198 nle->handle = NULL;
199 // We have stopped polling for events for this socket and are in the event
200 // loop thread, so we invoke this so that the NATS C client library can
201 // proceed with closing the socket.
202 natsConnection_ProcessCloseEvent(&(nle->socket));
203
204 return NATS_OK;
205}
206
207static natsStatus
208uvAsyncAttach(natsLibuvEvents *nle)
209{
211
212 // Even when this is a reconnect, previous nle->handle has already been
213 // set to NULL (and the memory has or will be freed in uvHandleClosedCb),
214 // so recreate now.
215 nle->handle = (uv_poll_t*) malloc(sizeof(uv_poll_t));
216 if (nle->handle == NULL)
217 s = NATS_NO_MEMORY;
218
219 if (s == NATS_OK)
220 {
221#if UV_VERSION_MAJOR <= 1
222 if (uv_poll_init_socket(nle->loop, nle->handle, nle->socket) != 0)
223#else
224 if (uv_poll_init(nle->loop, nle->handle, nle->socket) != 0)
225#endif
226 s = NATS_ERR;
227 }
228
229 if ((s == NATS_OK)
230 && (nle->handle->data = (void*) nle)
231 && (uv_poll_start(nle->handle, UV_READABLE, natsLibuvPoll) != 0))
232 {
233 s = NATS_ERR;
234 }
235
236 return s;
237}
238
239static void
240natsLibuvEvents_free(natsLibuvEvents *nle, bool processDetachedEvent)
241{
242 natsLibuvEvent *event;
243
244 while ((event = nle->head) != NULL)
245 {
246 nle->head = event->next;
247 free(event);
248 }
249 free(nle->scheduler);
250 if (nle->lock != NULL)
251 {
252 uv_mutex_destroy(nle->lock);
253 free(nle->lock);
254 }
255 if (processDetachedEvent)
257 free(nle);
258}
259
260static void
261uvFinalCloseCb(uv_handle_t* handle)
262{
263 natsLibuvEvents *nle = (natsLibuvEvents*) handle->data;
264 natsLibuvEvents_free(nle, true);
265}
266
267static void
268uvAsyncDetach(natsLibuvEvents *nle)
269{
270 uv_close((uv_handle_t*) nle->scheduler, uvFinalCloseCb);
271}
272
273static void
274uvAsyncCb(uv_async_t *handle)
275{
276 natsLibuvEvents *nle = (natsLibuvEvents*) handle->data;
278 natsLibuvEvent *event = NULL;
279 bool more = false;
280
281 while (1)
282 {
283 uv_mutex_lock(nle->lock);
284
285 event = nle->head;
286 if (event == NULL)
287 {
288 // This is possible, even on entry of this function because
289 // the callback is called when the handle is initialized.
290 uv_mutex_unlock(nle->lock);
291 return;
292 }
293
294 nle->head = event->next;
295 if (event == nle->tail)
296 nle->tail = NULL;
297
298 more = (nle->head != NULL ? true : false);
299
300 uv_mutex_unlock(nle->lock);
301
302 switch (event->type)
303 {
304 case NATS_LIBUV_ATTACH:
305 {
306 s = uvAsyncAttach(nle);
307 break;
308 }
309 case NATS_LIBUV_READ:
310 case NATS_LIBUV_WRITE:
311 {
312 s = uvPollUpdate(nle, event->type, event->add);
313 break;
314 }
315 case NATS_LIBUV_DETACH:
316 {
317 uvAsyncDetach(nle);
318 // We want to make sure that we will exit this loop since by now
319 // the `nle` structure may have been freed. Regardless, this is
320 // supposed to be the last event for this `nle` object.
321 more = false;
322 break;
323 }
324 default:
325 {
326 s = NATS_ERR;
327 break;
328 }
329 }
330
331 free(event);
332
333 if ((s != NATS_OK) || !more)
334 break;
335 }
336
337 if (s != NATS_OK)
338 natsConnection_Close(nle->nc);
339}
340
354natsLibuv_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
355{
356 uv_loop_t *uvLoop = (uv_loop_t*) loop;
357 bool sched = false;
358 natsLibuvEvents *nle = (natsLibuvEvents*) (*userData);
360 bool created = false;
361
362 sched = ((uv_key_get(&uvLoopThreadKey) != loop) ? true : false);
363
364 // This is the first attach (when reconnecting, nle will be non-NULL).
365 if (nle == NULL)
366 {
367 // This has to run from the event loop!
368 if (sched)
369 return NATS_ILLEGAL_STATE;
370
371 nle = (natsLibuvEvents*) calloc(1, sizeof(natsLibuvEvents));
372 if (nle == NULL)
373 return NATS_NO_MEMORY;
374
375 // Indicate that we have created the object here (in case we get a failure).
376 created = true;
377
378 nle->lock = (uv_mutex_t*) malloc(sizeof(uv_mutex_t));
379 if (nle->lock == NULL)
380 s = NATS_NO_MEMORY;
381
382 if ((s == NATS_OK) && (uv_mutex_init(nle->lock) != 0))
383 s = NATS_ERR;
384
385 if ((s == NATS_OK)
386 && ((nle->scheduler = (uv_async_t*) malloc(sizeof(uv_async_t))) == NULL))
387 {
388 s = NATS_NO_MEMORY;
389 }
390
391 if ((s == NATS_OK)
392 && (uv_async_init(uvLoop, nle->scheduler, uvAsyncCb) != 0))
393 {
394 s = NATS_ERR;
395 }
396
397 if (s == NATS_OK)
398 {
399 nle->nc = nc;
400 nle->loop = uvLoop;
401 nle->scheduler->data = (void*) nle;
402 }
403 }
404
405 if (s == NATS_OK)
406 {
407 nle->socket = socket;
408 nle->events = UV_READABLE;
409
410 if (sched)
411 s = uvScheduleToEventLoop(nle, NATS_LIBUV_ATTACH, true);
412 else
413 s = uvAsyncAttach(nle);
414 }
415
416 if (s == NATS_OK)
417 *userData = (void*) nle;
418 else if (created)
419 natsLibuvEvents_free(nle, false);
420
421 return s;
422}
423
433natsLibuv_Read(void *userData, bool add)
434{
435 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
437 bool sched;
438
439 // If we remove, first stop polling immediately, then proceed as usual.
440 if (!add)
441 uv_poll_stop(nle->handle);
442
443 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ? true : false);
444
445 // If this call is made from a different thread than the event loop's
446 // thread, or if there are already scheduled events, then schedule
447 // this new event.
448
449 // We don't need to get the lock for nle->head because if sched is
450 // false, we are in the event loop thread, which is the thread removing
451 // events from the list. Also, all calls to the read/write/etc.. callbacks
452 // are protected by the connection's lock in the NATS library.
453 if (sched || (nle->head != NULL))
454 s = uvScheduleToEventLoop(nle, NATS_LIBUV_READ, add);
455 else
456 s = uvPollUpdate(nle, NATS_LIBUV_READ, add);
457
458 return s;
459}
460
470natsLibuv_Write(void *userData, bool add)
471{
472 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
474 bool sched;
475
476 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ? true : false);
477
478 // See comment in natsLibuvRead
479 if (sched || (nle->head != NULL))
480 s = uvScheduleToEventLoop(nle, NATS_LIBUV_WRITE, add);
481 else
482 s = uvPollUpdate(nle, NATS_LIBUV_WRITE, add);
483
484 return s;
485}
486
496natsLibuv_Detach(void *userData)
497{
498 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
500 bool sched;
501
502 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ? true : false);
503
504 // See comment in natsLibuvRead
505 if (sched || (nle->head != NULL))
506 s = uvScheduleToEventLoop(nle, NATS_LIBUV_DETACH, true);
507 else
508 uvAsyncDetach(nle);
509
510 return s;
511}
512
// end of libuvFunctions
514
515#ifdef __cplusplus
516}
517#endif
518
519#endif /* LIBUV_H_ */
NATS_EXTERN void natsConnection_Close(natsConnection *nc)
Closes the connection.
NATS_EXTERN void natsConnection_ProcessDetachedEvent(natsConnection *nc)
Process a detach event when using external event loop.
NATS_EXTERN void natsConnection_ProcessCloseEvent(natsSock *socket)
Process a socket close event when using external event loop.
NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc)
Process a read event when using external event loop.
NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc)
Process a write event when using external event loop.
void natsLibuv_SetThreadLocalLoop(uv_loop_t *loop)
Register the event loop with the thread running uv_run().
Definition libuv.h:104
void natsLibuv_Init(void)
Initialize the adapter.
Definition libuv.h:90
natsStatus natsLibuv_Read(void *userData, bool add)
Start or stop polling on READ events.
Definition libuv.h:433
natsStatus natsLibuv_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
Attach a connection to the given event loop.
Definition libuv.h:354
natsStatus natsLibuv_Detach(void *userData)
The connection is closed, it can be safely detached.
Definition libuv.h:496
natsStatus natsLibuv_Write(void *userData, bool add)
Start or stop polling on WRITE events.
Definition libuv.h:470
struct __natsConnection natsConnection
A connection to a NATS Server.
Definition nats.h:152
int natsSock
Definition nats.h:50
natsStatus
Status returned by most of the APIs.
Definition status.h:50
@ NATS_ERR
Generic error.
Definition status.h:53
@ NATS_NO_MEMORY
Definition status.h:102
@ NATS_ILLEGAL_STATE
Definition status.h:88
@ NATS_OK
Success.
Definition status.h:51