NATS C Client with JetStream and Streaming support  3.8.0
The nats.io C Client, Supported by Synadia Communications Inc.
Loading...
Searching...
No Matches
libuv.h
Go to the documentation of this file.
1// Copyright 2016-2018 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14#ifndef LIBUV_H_
15#define LIBUV_H_
16
17#ifdef __cplusplus
18extern "C" {
19#endif
20
24#ifndef _GNU_SOURCE
25#define _GNU_SOURCE
26#endif
27
28#define NATS_LIBUV_INCLUDE
29
30#include <uv.h>
31#include "../nats.h"
32
// Kinds of requests that can be queued for the event loop thread.
// The value is stored in natsLibuvEvent.type and dispatched on in
// uvAsyncCb().
#define NATS_LIBUV_ATTACH   (1)
#define NATS_LIBUV_READ     (2)
#define NATS_LIBUV_WRITE    (3)
#define NATS_LIBUV_DETACH   (4)
37
struct __natsLibuvEvent;

// A request queued for the event loop thread. Requests are chained in a
// singly linked list (see the head/tail members of natsLibuvEvents).
typedef struct __natsLibuvEvent
{
    int                     type;   // One of the NATS_LIBUV_* values.
    bool                    add;    // READ/WRITE only: start (true) or stop (false) polling.
    struct __natsLibuvEvent *next;  // Next queued request, NULL for the last one.

} natsLibuvEvent;
47
48typedef struct
49{
51 uv_loop_t *loop;
52 uv_poll_t *handle;
53 uv_async_t *scheduler;
54 int events;
55 natsSock socket;
56 uv_mutex_t *lock;
57 natsLibuvEvent *head;
58 natsLibuvEvent *tail;
59
60} natsLibuvEvents;
61
62// Forward declarations
63natsStatus natsLibuv_Detach(void *userData);
64
69static uv_once_t uvOnce = UV_ONCE_INIT;
70static uv_key_t uvLoopThreadKey;
71
72static void
73_initOnce(void)
74{
75 if (uv_key_create(&uvLoopThreadKey) != 0)
76 abort();
77}
78
89void
91{
92 uv_once(&uvOnce, _initOnce);
93}
94
103void
105{
106 uv_key_set(&uvLoopThreadKey, (void*) loop);
107}
108
109static natsStatus
110uvScheduleToEventLoop(natsLibuvEvents *nle, int eventType, bool add)
111{
112 natsLibuvEvent *newEvent = NULL;
113 int res;
114
115 newEvent = (natsLibuvEvent*) malloc(sizeof(natsLibuvEvent));
116 if (newEvent == NULL)
117 return NATS_NO_MEMORY;
118
119 newEvent->type = eventType;
120 newEvent->add = add;
121 newEvent->next = NULL;
122
123 uv_mutex_lock(nle->lock);
124
125 if (nle->head == NULL)
126 nle->head = newEvent;
127
128 if (nle->tail != NULL)
129 nle->tail->next = newEvent;
130
131 nle->tail = newEvent;
132
133 uv_mutex_unlock(nle->lock);
134
135 res = uv_async_send(nle->scheduler);
136
137 return (res == 0 ? NATS_OK : NATS_ERR);
138}
139
140static void
141natsLibuvPoll(uv_poll_t* handle, int status, int events)
142{
143 natsLibuvEvents *nle = (natsLibuvEvents*)handle->data;
144
145 if (status != 0)
146 {
147 // There was an error, try to process as a read event.
148 // If we had an issue with the socket, this will cause
149 // an auto-reconnect.
151 return;
152 }
153
154 if (events & UV_READABLE)
156
157 if (events & UV_WRITABLE)
159}
160
161static natsStatus
162uvPollUpdate(natsLibuvEvents *nle, int eventType, bool add)
163{
164 int res;
165
166 if (eventType == NATS_LIBUV_READ)
167 {
168 if (add)
169 nle->events |= UV_READABLE;
170 else
171 nle->events &= ~UV_READABLE;
172 }
173 else
174 {
175 if (add)
176 nle->events |= UV_WRITABLE;
177 else
178 nle->events &= ~UV_WRITABLE;
179 }
180
181 if (nle->events)
182 res = uv_poll_start(nle->handle, nle->events, natsLibuvPoll);
183 else
184 res = uv_poll_stop(nle->handle);
185
186 if (res != 0)
187 return NATS_ERR;
188
189 return NATS_OK;
190}
191
192static void
193uvHandleClosedCb(uv_handle_t *handle)
194{
195 free(handle);
196}
197
198static natsStatus
199uvAsyncAttach(natsLibuvEvents *nle)
200{
202
203 // We are reconnecting, destroy the old handle, create a new one
204 if (nle->handle != NULL)
205 {
206 uv_close((uv_handle_t*) nle->handle, uvHandleClosedCb);
207 nle->handle = NULL;
208 }
209
210 nle->handle = (uv_poll_t*) malloc(sizeof(uv_poll_t));
211 if (nle->handle == NULL)
212 s = NATS_NO_MEMORY;
213
214 if (s == NATS_OK)
215 {
216#if UV_VERSION_MAJOR <= 1
217 if (uv_poll_init_socket(nle->loop, nle->handle, nle->socket) != 0)
218#else
219 if (uv_poll_init(nle->loop, nle->handle, nle->socket) != 0)
220#endif
221 s = NATS_ERR;
222 }
223
224 if ((s == NATS_OK)
225 && (nle->handle->data = (void*) nle)
226 && (uv_poll_start(nle->handle, UV_READABLE, natsLibuvPoll) != 0))
227 {
228 s = NATS_ERR;
229 }
230
231 return s;
232}
233
234static void
235finalCloseCb(uv_handle_t* handle)
236{
237 natsLibuvEvents *nle = (natsLibuvEvents*)handle->data;
238 natsLibuvEvent *event;
239
240 while ((event = nle->head) != NULL)
241 {
242 nle->head = event->next;
243 free(event);
244 }
245 free(nle->handle);
246 free(nle->scheduler);
247 uv_mutex_destroy(nle->lock);
248 free(nle->lock);
249 free(nle);
250}
251
252static void
253closeSchedulerCb(uv_handle_t* scheduler)
254{
255 natsLibuvEvents *nle = (natsLibuvEvents*) scheduler->data;
256
257 uv_close((uv_handle_t*) nle->handle, finalCloseCb);
258}
259
260static void
261uvAsyncDetach(natsLibuvEvents *nle)
262{
263 uv_close((uv_handle_t*) nle->scheduler, closeSchedulerCb);
264}
265
266static void
267uvAsyncCb(uv_async_t *handle)
268{
269 natsLibuvEvents *nle = (natsLibuvEvents*) handle->data;
271 natsLibuvEvent *event = NULL;
272 bool more = false;
273
274 while (1)
275 {
276 uv_mutex_lock(nle->lock);
277
278 event = nle->head;
279 if (event == NULL)
280 {
281 // This is possible, even on entry of this function because
282 // the callback is called when the handle is initialized.
283 uv_mutex_unlock(nle->lock);
284 return;
285 }
286
287 nle->head = event->next;
288 if (event == nle->tail)
289 nle->tail = NULL;
290
291 more = (nle->head != NULL ? true : false);
292
293 uv_mutex_unlock(nle->lock);
294
295 switch (event->type)
296 {
297 case NATS_LIBUV_ATTACH:
298 {
299 s = uvAsyncAttach(nle);
300 break;
301 }
302 case NATS_LIBUV_READ:
303 case NATS_LIBUV_WRITE:
304 {
305 s = uvPollUpdate(nle, event->type, event->add);
306 break;
307 }
308 case NATS_LIBUV_DETACH:
309 {
310 uvAsyncDetach(nle);
311 break;
312 }
313 default:
314 {
315 s = NATS_ERR;
316 break;
317 }
318 }
319
320 free(event);
321
322 if ((s != NATS_OK) || !more)
323 break;
324 }
325
326 if (s != NATS_OK)
327 natsConnection_Close(nle->nc);
328}
329
343natsLibuv_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
344{
345 uv_loop_t *uvLoop = (uv_loop_t*) loop;
346 bool sched = false;
347 natsLibuvEvents *nle = (natsLibuvEvents*) (*userData);
349
350 sched = ((uv_key_get(&uvLoopThreadKey) != loop) ? true : false);
351
352 // This is the first attach (when reconnecting, nle will be non-NULL).
353 if (nle == NULL)
354 {
355 // This has to run from the event loop!
356 if (sched)
357 return NATS_ILLEGAL_STATE;
358
359 nle = (natsLibuvEvents*) calloc(1, sizeof(natsLibuvEvents));
360 if (nle == NULL)
361 return NATS_NO_MEMORY;
362
363 nle->lock = (uv_mutex_t*) malloc(sizeof(uv_mutex_t));
364 if (nle->lock == NULL)
365 s = NATS_NO_MEMORY;
366
367 if ((s == NATS_OK) && (uv_mutex_init(nle->lock) != 0))
368 s = NATS_ERR;
369
370 if ((s == NATS_OK)
371 && ((nle->scheduler = (uv_async_t*) malloc(sizeof(uv_async_t))) == NULL))
372 {
373 s = NATS_NO_MEMORY;
374 }
375
376 if ((s == NATS_OK)
377 && (uv_async_init(uvLoop, nle->scheduler, uvAsyncCb) != 0))
378 {
379 s = NATS_ERR;
380 }
381
382 if (s == NATS_OK)
383 {
384 nle->nc = nc;
385 nle->loop = uvLoop;
386 nle->scheduler->data = (void*) nle;
387 }
388 }
389
390 if (s == NATS_OK)
391 {
392 nle->socket = socket;
393 nle->events = UV_READABLE;
394
395 if (sched)
396 s = uvScheduleToEventLoop(nle, NATS_LIBUV_ATTACH, true);
397 else
398 s = uvAsyncAttach(nle);
399 }
400
401 if (s == NATS_OK)
402 *userData = (void*) nle;
403 else
404 natsLibuv_Detach((void*) nle);
405
406 return s;
407}
408
418natsLibuv_Read(void *userData, bool add)
419{
420 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
422 bool sched;
423
424 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ? true : false);
425
426 // If this call is made from a different thread than the event loop's
427 // thread, or if there are already scheduled events, then schedule
428 // this new event.
429
430 // We don't need to get the lock for nle->head because if sched is
431 // false, we are in the event loop thread, which is the thread removing
432 // events from the list. Also, all calls to the read/write/etc.. callbacks
433 // are protected by the connection's lock in the NATS library.
434 if (sched || (nle->head != NULL))
435 s = uvScheduleToEventLoop(nle, NATS_LIBUV_READ, add);
436 else
437 s = uvPollUpdate(nle, NATS_LIBUV_READ, add);
438
439 return s;
440}
441
451natsLibuv_Write(void *userData, bool add)
452{
453 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
455 bool sched;
456
457 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ? true : false);
458
459 // See comment in natsLibuvRead
460 if (sched || (nle->head != NULL))
461 s = uvScheduleToEventLoop(nle, NATS_LIBUV_WRITE, add);
462 else
463 s = uvPollUpdate(nle, NATS_LIBUV_WRITE, add);
464
465 return s;
466}
467
477natsLibuv_Detach(void *userData)
478{
479 natsLibuvEvents *nle = (natsLibuvEvents*) userData;
481 bool sched;
482
483 sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ? true : false);
484
485 // See comment in natsLibuvRead
486 if (sched || (nle->head != NULL))
487 s = uvScheduleToEventLoop(nle, NATS_LIBUV_DETACH, true);
488 else
489 uvAsyncDetach(nle);
490
491 return s;
492}
493
494 // end of libuvFunctions
495
496#ifdef __cplusplus
497}
498#endif
499
500#endif /* LIBUV_H_ */
NATS_EXTERN void natsConnection_Close(natsConnection *nc)
Closes the connection.
NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc)
Process a read event when using external event loop.
NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc)
Process a write event when using external event loop.
void natsLibuv_SetThreadLocalLoop(uv_loop_t *loop)
Register the event loop with the thread running uv_run().
Definition libuv.h:104
void natsLibuv_Init(void)
Initialize the adapter.
Definition libuv.h:90
natsStatus natsLibuv_Read(void *userData, bool add)
Start or stop polling on READ events.
Definition libuv.h:418
natsStatus natsLibuv_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
Attach a connection to the given event loop.
Definition libuv.h:343
natsStatus natsLibuv_Detach(void *userData)
The connection is closed, it can be safely detached.
Definition libuv.h:477
natsStatus natsLibuv_Write(void *userData, bool add)
Start or stop polling on WRITE events.
Definition libuv.h:451
struct __natsConnection natsConnection
A connection to a NATS Server.
Definition nats.h:152
int natsSock
Definition nats.h:50
natsStatus
Status returned by most of the APIs.
Definition status.h:50
@ NATS_ERR
Generic error.
Definition status.h:53
@ NATS_NO_MEMORY
Definition status.h:102
@ NATS_ILLEGAL_STATE
Definition status.h:88
@ NATS_OK
Success.
Definition status.h:51