NATS C Client with JetStream and Streaming support  3.10.0
The nats.io C Client, Supported by Synadia Communications Inc.
Loading...
Searching...
No Matches
libuv.h
Go to the documentation of this file.
1// Copyright 2016-2018 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14#ifndef LIBUV_H_
15#define LIBUV_H_
16
17#ifdef __cplusplus
18extern "C" {
19#endif
20
24#ifndef _GNU_SOURCE
25#define _GNU_SOURCE
26#endif
27
28#define NATS_LIBUV_INCLUDE
29
30#include <uv.h>
31#include "../nats.h"
32
33#define NATS_LIBUV_ATTACH (1)
34#define NATS_LIBUV_READ (2)
35#define NATS_LIBUV_WRITE (3)
36#define NATS_LIBUV_DETACH (4)
37
38struct __natsLibuvEvent;
39
40typedef struct __natsLibuvEvent
41{
42 int type;
43 bool add;
44 struct __natsLibuvEvent *next;
45
46} natsLibuvEvent;
47
48typedef struct
49{
51 uv_loop_t *loop;
52 uv_poll_t *handle;
53 uv_async_t *scheduler;
54 int events;
55 natsSock socket;
56 uv_mutex_t *lock;
57 natsLibuvEvent *head;
58 natsLibuvEvent *tail;
59
60} natsLibuvEvents;
61
62// Forward declarations
63natsStatus natsLibuv_Detach(void *userData);
64
69static uv_once_t uvOnce = UV_ONCE_INIT;
70static uv_key_t uvLoopThreadKey;
71
72static void
73_initOnce(void)
74{
75 if (uv_key_create(&uvLoopThreadKey) != 0)
76 abort();
77}
78
89void
91{
92 uv_once(&uvOnce, _initOnce);
93}
94
103void
105{
106 uv_key_set(&uvLoopThreadKey, (void*) loop);
107}
108
109static natsStatus
110uvScheduleToEventLoop(natsLibuvEvents *nle, int eventType, bool add)
111{
112 natsLibuvEvent *newEvent = NULL;
113 int res;
114
115 newEvent = (natsLibuvEvent*) malloc(sizeof(natsLibuvEvent));
116 if (newEvent == NULL)
117 return NATS_NO_MEMORY;
118
119 newEvent->type = eventType;
120 newEvent->add = add;
121 newEvent->next = NULL;
122
123 uv_mutex_lock(nle->lock);
124
125 if (nle->head == NULL)
126 nle->head = newEvent;
127
128 if (nle->tail != NULL)
129 nle->tail->next = newEvent;
130
131 nle->tail = newEvent;
132
133 // We need to wake up the event loop thread under our lock because
134 // due to signal coalescing (and the reason we have a list), it is
135 // possible that the detach that we have just added is processed
136 // after we release the lock, freeing the `nle` structure. Calling
137 // `uv_async_send(nle->scheduler)` outside this lock would then
138 // cause a crash or race.
139 res = uv_async_send(nle->scheduler);
140 uv_mutex_unlock(nle->lock);
141
142 return (res == 0 ? NATS_OK : NATS_ERR);
143}
144
145static void
146natsLibuvPoll(uv_poll_t* handle, int status, int events)
147{
148 natsLibuvEvents *nle = (natsLibuvEvents*)handle->data;
149
150 if (status != 0)
151 {
152 // There was an error, try to process as a read event.
153 // If we had an issue with the socket, this will cause
154 // an auto-reconnect.
156 return;
157 }
158
159 if (events & UV_READABLE)
161
162 if (events & UV_WRITABLE)
164}
165
166static void
167uvHandleClosedCb(uv_handle_t *handle)
168{
169 free(handle);
170}
171
172static natsStatus
173uvPollUpdate(natsLibuvEvents *nle, int eventType, bool add)
174{
175 if (eventType == NATS_LIBUV_READ)
176 {
177 if (add)
178 nle->events |= UV_READABLE;
179 else
180 nle->events &= ~UV_READABLE;
181 }
182 else
183 {
184 if (add)
185 nle->events |= UV_WRITABLE;
186 else
187 nle->events &= ~UV_WRITABLE;
188 }
189
190 if (nle->events)
191 {
192 int res = uv_poll_start(nle->handle, nle->events, natsLibuvPoll);
193 return (res == 0 ? NATS_OK : NATS_ERR);
194 }
195 // Both read and write events have been removed, this signal that the socket
196 // should be closed prior to a reconnect or during natsConnection_Close().
197 uv_close((uv_handle_t*) nle->handle, uvHandleClosedCb);
198 nle->handle = NULL;
199 // We have stopped polling for events for this socket and are in the event
200 // loop thread, so we invoke this so that the NATS C client library can
201 // proceed with closing the socket.
202 natsConnection_ProcessCloseEvent(&(nle->socket));
203
204 return NATS_OK;
205}
206
207static natsStatus
208uvAsyncAttach(natsLibuvEvents *nle)
209{
211
212 // Even when this is a reconnect, previous nle->handle has already been
213 // set to NULL (and the memory has or will be freed in uvHandleClosedCb),
214 // so recreate now.
215 nle->handle = (uv_poll_t*) malloc(sizeof(uv_poll_t));
216 if (nle->handle == NULL)
217 s = NATS_NO_MEMORY;
218
219 if (s == NATS_OK)
220 {
221#if UV_VERSION_MAJOR <= 1
222 if (uv_poll_init_socket(nle->loop, nle->handle, nle->socket) != 0)
223#else
224 if (uv_poll_init(nle->loop, nle->handle, nle->socket) != 0)
225#endif
226 s = NATS_ERR;
227 }
228
229 if ((s == NATS_OK)
230 && (nle->handle->data = (void*) nle)
231 && (uv_poll_start(nle->handle, UV_READABLE, natsLibuvPoll) != 0))
232 {
233 s = NATS_ERR;
234 }
235
236 return s;
237}
238
239static void
240uvFinalCloseCb(uv_handle_t* handle)
241{
242 natsLibuvEvents *nle = (natsLibuvEvents*) handle->data;
243 natsLibuvEvent *event;
244
245 while ((event = nle->head) != NULL)
246 {
247 nle->head = event->next;
248 free(event);
249 }
250 free(nle->scheduler);
251 uv_mutex_destroy(nle->lock);
252 free(nle->lock);
253 free(nle);
254}
255
256static void
257uvAsyncDetach(natsLibuvEvents *nle)
258{
259 uv_close((uv_handle_t*) nle->scheduler, uvFinalCloseCb);
260}
261
262static void
263uvAsyncCb(uv_async_t *handle)
264{
265 natsLibuvEvents *nle = (natsLibuvEvents*) handle->data;
267 natsLibuvEvent *event = NULL;
268 bool more = false;
269
270 while (1)
271 {
272 uv_mutex_lock(nle->lock);
273
274 event = nle->head;
275 if (event == NULL)
276 {
277 // This is possible, even on entry of this function because
278 // the callback is called when the handle is initialized.
279 uv_mutex_unlock(nle->lock);
280 return;
281 }
282
283 nle->head = event->next;
284 if (event == nle->tail)
285 nle->tail = NULL;
286
287 more = (nle->head != NULL ? true : false);
288
289 uv_mutex_unlock(nle->lock);
290
291 switch (event->type)
292 {
293 case NATS_LIBUV_ATTACH:
294 {
295 s = uvAsyncAttach(nle);
296 break;
297 }
298 case NATS_LIBUV_READ:
299 case NATS_LIBUV_WRITE:
300 {
301 s = uvPollUpdate(nle, event->type, event->add);
302 break;
303 }
304 case NATS_LIBUV_DETACH:
305 {
306 uvAsyncDetach(nle);
307 // We want to make sure that we will exit this loop since by now
308 // the `nle` structure may have been freed. Regardless, this is
309 // supposed to be the last event for this `nle` object.
310 more = false;
311 break;
312 }
313 default:
314 {
315 s = NATS_ERR;
316 break;
317 }
318 }
319
320 free(event);
321
322 if ((s != NATS_OK) || !more)
323 break;
324 }
325
326 if (s != NATS_OK)
327 natsConnection_Close(nle->nc);
328}
329
343natsLibuv_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
344{
345 uv_loop_t *uvLoop = (uv_loop_t*) loop;
346 bool sched = false;
347 natsLibuvEvents *nle = (natsLibuvEvents*) (*userData);
349
350 sched = ((uv_key_get(&uvLoopThreadKey) != loop) ? true : false);
351
352 // This is the first attach (when reconnecting, nle will be non-NULL).
353 if (nle == NULL)
354 {
355 // This has to run from the event loop!
356 if (sched)
357 return NATS_ILLEGAL_STATE;
358
359 nle = (natsLibuvEvents*) calloc(1, sizeof(natsLibuvEvents));
360 if (nle == NULL)
361 return NATS_NO_MEMORY;
362
363 nle->lock = (uv_mutex_t*) malloc(sizeof(uv_mutex_t));
364 if (nle->lock == NULL)
365 s = NATS_NO_MEMORY;
366
367 if ((s == NATS_OK) && (uv_mutex_init(nle->lock) != 0))
368 s = NATS_ERR;
369
370 if ((s == NATS_OK)
371 && ((nle->scheduler = (uv_async_t*) malloc(sizeof(uv_async_t))) == NULL))
372 {
373 s = NATS_NO_MEMORY;
374 }
375
376 if ((s == NATS_OK)
377 && (uv_async_init(uvLoop, nle->scheduler, uvAsyncCb) != 0))
378 {
379 s = NATS_ERR;
380 }
381
382 if (s == NATS_OK)
383 {
384 nle->nc = nc;
385 nle->loop = uvLoop;
386 nle->scheduler->data = (void*) nle;
387 }
388 }
389
390 if (s == NATS_OK)
391 {
392 nle->socket = socket;
393 nle->events = UV_READABLE;
394
395 if (sched)
396 s = uvScheduleToEventLoop(nle, NATS_LIBUV_ATTACH, true);
397 else
398 s = uvAsyncAttach(nle);
399 }
400
401 if (s == NATS_OK)
402 *userData = (void*) nle;
403 else
404 natsLibuv_Detach((void*) nle);
405
406 return s;
407}
408
418natsLibuv_Read(void *userData, bool add)
419{
420 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
422 bool sched;
423
424 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ? true : false);
425
426 // If this call is made from a different thread than the event loop's
427 // thread, or if there are already scheduled events, then schedule
428 // this new event.
429
430 // We don't need to get the lock for nle->head because if sched is
431 // false, we are in the event loop thread, which is the thread removing
432 // events from the list. Also, all calls to the read/write/etc.. callbacks
433 // are protected by the connection's lock in the NATS library.
434 if (sched || (nle->head != NULL))
435 s = uvScheduleToEventLoop(nle, NATS_LIBUV_READ, add);
436 else
437 s = uvPollUpdate(nle, NATS_LIBUV_READ, add);
438
439 return s;
440}
441
451natsLibuv_Write(void *userData, bool add)
452{
453 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
455 bool sched;
456
457 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ? true : false);
458
459 // See comment in natsLibuvRead
460 if (sched || (nle->head != NULL))
461 s = uvScheduleToEventLoop(nle, NATS_LIBUV_WRITE, add);
462 else
463 s = uvPollUpdate(nle, NATS_LIBUV_WRITE, add);
464
465 return s;
466}
467
477natsLibuv_Detach(void *userData)
478{
479 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
481 bool sched;
482
483 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ? true : false);
484
485 // See comment in natsLibuvRead
486 if (sched || (nle->head != NULL))
487 s = uvScheduleToEventLoop(nle, NATS_LIBUV_DETACH, true);
488 else
489 uvAsyncDetach(nle);
490
491 return s;
492}
493
// end of libuvFunctions
495
496#ifdef __cplusplus
497}
498#endif
499
500#endif /* LIBUV_H_ */
NATS_EXTERN void natsConnection_Close(natsConnection *nc)
Closes the connection.
NATS_EXTERN void natsConnection_ProcessCloseEvent(natsSock *socket)
Process a socket close event when using external event loop.
NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc)
Process a read event when using external event loop.
NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc)
Process a write event when using external event loop.
void natsLibuv_SetThreadLocalLoop(uv_loop_t *loop)
Register the event loop with the thread running uv_run().
Definition libuv.h:104
void natsLibuv_Init(void)
Initialize the adapter.
Definition libuv.h:90
natsStatus natsLibuv_Read(void *userData, bool add)
Start or stop polling on READ events.
Definition libuv.h:418
natsStatus natsLibuv_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
Attach a connection to the given event loop.
Definition libuv.h:343
natsStatus natsLibuv_Detach(void *userData)
The connection is closed, it can be safely detached.
Definition libuv.h:477
natsStatus natsLibuv_Write(void *userData, bool add)
Start or stop polling on WRITE events.
Definition libuv.h:451
struct __natsConnection natsConnection
A connection to a NATS Server.
Definition nats.h:160
int natsSock
Definition nats.h:58
natsStatus
Status returned by most of the APIs.
Definition status.h:50
@ NATS_ERR
Generic error.
Definition status.h:53
@ NATS_NO_MEMORY
Definition status.h:102
@ NATS_ILLEGAL_STATE
Definition status.h:88
@ NATS_OK
Success.
Definition status.h:51