NATS C Client with JetStream and Streaming support  3.9.1
The nats.io C Client, Supported by Synadia Communications Inc.
libuv.h
Go to the documentation of this file.
1 // Copyright 2016-2018 The NATS Authors
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 #ifndef LIBUV_H_
15 #define LIBUV_H_
16 
17 #ifdef __cplusplus
18 extern "C" {
19 #endif
20 
24 #ifndef _GNU_SOURCE
25 #define _GNU_SOURCE
26 #endif
27 
28 #define NATS_LIBUV_INCLUDE
29 
30 #include <uv.h>
31 #include "../nats.h"
32 
33 #define NATS_LIBUV_ATTACH (1)
34 #define NATS_LIBUV_READ (2)
35 #define NATS_LIBUV_WRITE (3)
36 #define NATS_LIBUV_DETACH (4)
37 
38 struct __natsLibuvEvent;
39 
40 typedef struct __natsLibuvEvent
41 {
42  int type;
43  bool add;
44  struct __natsLibuvEvent *next;
45 
46 } natsLibuvEvent;
47 
48 typedef struct
49 {
50  natsConnection *nc;
51  uv_loop_t *loop;
52  uv_poll_t *handle;
53  uv_async_t *scheduler;
54  int events;
55  natsSock socket;
56  uv_mutex_t *lock;
57  natsLibuvEvent *head;
58  natsLibuvEvent *tail;
59 
60 } natsLibuvEvents;
61 
62 // Forward declarations
63 natsStatus natsLibuv_Detach(void *userData);
64 
69 static uv_once_t uvOnce = UV_ONCE_INIT;
70 static uv_key_t uvLoopThreadKey;
71 
72 static void
73 _initOnce(void)
74 {
75  if (uv_key_create(&uvLoopThreadKey) != 0)
76  abort();
77 }
78 
89 void
91 {
92  uv_once(&uvOnce, _initOnce);
93 }
94 
103 void
105 {
106  uv_key_set(&uvLoopThreadKey, (void*) loop);
107 }
108 
109 static natsStatus
110 uvScheduleToEventLoop(natsLibuvEvents *nle, int eventType, bool add)
111 {
112  natsLibuvEvent *newEvent = NULL;
113  int res;
114 
115  newEvent = (natsLibuvEvent*) malloc(sizeof(natsLibuvEvent));
116  if (newEvent == NULL)
117  return NATS_NO_MEMORY;
118 
119  newEvent->type = eventType;
120  newEvent->add = add;
121  newEvent->next = NULL;
122 
123  uv_mutex_lock(nle->lock);
124 
125  if (nle->head == NULL)
126  nle->head = newEvent;
127 
128  if (nle->tail != NULL)
129  nle->tail->next = newEvent;
130 
131  nle->tail = newEvent;
132 
133  uv_mutex_unlock(nle->lock);
134 
135  res = uv_async_send(nle->scheduler);
136 
137  return (res == 0 ? NATS_OK : NATS_ERR);
138 }
139 
140 static void
141 natsLibuvPoll(uv_poll_t* handle, int status, int events)
142 {
143  natsLibuvEvents *nle = (natsLibuvEvents*)handle->data;
144 
145  if (status != 0)
146  {
147  // There was an error, try to process as a read event.
148  // If we had an issue with the socket, this will cause
149  // an auto-reconnect.
151  return;
152  }
153 
154  if (events & UV_READABLE)
156 
157  if (events & UV_WRITABLE)
159 }
160 
161 static natsStatus
162 uvPollUpdate(natsLibuvEvents *nle, int eventType, bool add)
163 {
164  int res;
165 
166  if (eventType == NATS_LIBUV_READ)
167  {
168  if (add)
169  nle->events |= UV_READABLE;
170  else
171  nle->events &= ~UV_READABLE;
172  }
173  else
174  {
175  if (add)
176  nle->events |= UV_WRITABLE;
177  else
178  nle->events &= ~UV_WRITABLE;
179  }
180 
181  if (nle->events)
182  res = uv_poll_start(nle->handle, nle->events, natsLibuvPoll);
183  else
184  res = uv_poll_stop(nle->handle);
185 
186  if (res != 0)
187  return NATS_ERR;
188 
189  return NATS_OK;
190 }
191 
192 static void
193 uvHandleClosedCb(uv_handle_t *handle)
194 {
195  free(handle);
196 }
197 
198 static natsStatus
199 uvAsyncAttach(natsLibuvEvents *nle)
200 {
201  natsStatus s = NATS_OK;
202 
203  // We are reconnecting, destroy the old handle, create a new one
204  if (nle->handle != NULL)
205  {
206  uv_close((uv_handle_t*) nle->handle, uvHandleClosedCb);
207  nle->handle = NULL;
208  }
209 
210  nle->handle = (uv_poll_t*) malloc(sizeof(uv_poll_t));
211  if (nle->handle == NULL)
212  s = NATS_NO_MEMORY;
213 
214  if (s == NATS_OK)
215  {
216 #if UV_VERSION_MAJOR <= 1
217  if (uv_poll_init_socket(nle->loop, nle->handle, nle->socket) != 0)
218 #else
219  if (uv_poll_init(nle->loop, nle->handle, nle->socket) != 0)
220 #endif
221  s = NATS_ERR;
222  }
223 
224  if ((s == NATS_OK)
225  && (nle->handle->data = (void*) nle)
226  && (uv_poll_start(nle->handle, UV_READABLE, natsLibuvPoll) != 0))
227  {
228  s = NATS_ERR;
229  }
230 
231  return s;
232 }
233 
234 static void
235 finalCloseCb(uv_handle_t* handle)
236 {
237  natsLibuvEvents *nle = (natsLibuvEvents*)handle->data;
238  natsLibuvEvent *event;
239 
240  while ((event = nle->head) != NULL)
241  {
242  nle->head = event->next;
243  free(event);
244  }
245  free(nle->handle);
246  free(nle->scheduler);
247  uv_mutex_destroy(nle->lock);
248  free(nle->lock);
249  free(nle);
250 }
251 
252 static void
253 closeSchedulerCb(uv_handle_t* scheduler)
254 {
255  natsLibuvEvents *nle = (natsLibuvEvents*) scheduler->data;
256 
257  uv_close((uv_handle_t*) nle->handle, finalCloseCb);
258 }
259 
260 static void
261 uvAsyncDetach(natsLibuvEvents *nle)
262 {
263  uv_close((uv_handle_t*) nle->scheduler, closeSchedulerCb);
264 }
265 
266 static void
267 uvAsyncCb(uv_async_t *handle)
268 {
269  natsLibuvEvents *nle = (natsLibuvEvents*) handle->data;
270  natsStatus s = NATS_OK;
271  natsLibuvEvent *event = NULL;
272  bool more = false;
273 
274  while (1)
275  {
276  uv_mutex_lock(nle->lock);
277 
278  event = nle->head;
279  if (event == NULL)
280  {
281  // This is possible, even on entry of this function because
282  // the callback is called when the handle is initialized.
283  uv_mutex_unlock(nle->lock);
284  return;
285  }
286 
287  nle->head = event->next;
288  if (event == nle->tail)
289  nle->tail = NULL;
290 
291  more = (nle->head != NULL ? true : false);
292 
293  uv_mutex_unlock(nle->lock);
294 
295  switch (event->type)
296  {
297  case NATS_LIBUV_ATTACH:
298  {
299  s = uvAsyncAttach(nle);
300  break;
301  }
302  case NATS_LIBUV_READ:
303  case NATS_LIBUV_WRITE:
304  {
305  s = uvPollUpdate(nle, event->type, event->add);
306  break;
307  }
308  case NATS_LIBUV_DETACH:
309  {
310  uvAsyncDetach(nle);
311  break;
312  }
313  default:
314  {
315  s = NATS_ERR;
316  break;
317  }
318  }
319 
320  free(event);
321 
322  if ((s != NATS_OK) || !more)
323  break;
324  }
325 
326  if (s != NATS_OK)
327  natsConnection_Close(nle->nc);
328 }
329 
343 natsLibuv_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
344 {
345  uv_loop_t *uvLoop = (uv_loop_t*) loop;
346  bool sched = false;
347  natsLibuvEvents *nle = (natsLibuvEvents*) (*userData);
348  natsStatus s = NATS_OK;
349 
350  sched = ((uv_key_get(&uvLoopThreadKey) != loop) ? true : false);
351 
352  // This is the first attach (when reconnecting, nle will be non-NULL).
353  if (nle == NULL)
354  {
355  // This has to run from the event loop!
356  if (sched)
357  return NATS_ILLEGAL_STATE;
358 
359  nle = (natsLibuvEvents*) calloc(1, sizeof(natsLibuvEvents));
360  if (nle == NULL)
361  return NATS_NO_MEMORY;
362 
363  nle->lock = (uv_mutex_t*) malloc(sizeof(uv_mutex_t));
364  if (nle->lock == NULL)
365  s = NATS_NO_MEMORY;
366 
367  if ((s == NATS_OK) && (uv_mutex_init(nle->lock) != 0))
368  s = NATS_ERR;
369 
370  if ((s == NATS_OK)
371  && ((nle->scheduler = (uv_async_t*) malloc(sizeof(uv_async_t))) == NULL))
372  {
373  s = NATS_NO_MEMORY;
374  }
375 
376  if ((s == NATS_OK)
377  && (uv_async_init(uvLoop, nle->scheduler, uvAsyncCb) != 0))
378  {
379  s = NATS_ERR;
380  }
381 
382  if (s == NATS_OK)
383  {
384  nle->nc = nc;
385  nle->loop = uvLoop;
386  nle->scheduler->data = (void*) nle;
387  }
388  }
389 
390  if (s == NATS_OK)
391  {
392  nle->socket = socket;
393  nle->events = UV_READABLE;
394 
395  if (sched)
396  s = uvScheduleToEventLoop(nle, NATS_LIBUV_ATTACH, true);
397  else
398  s = uvAsyncAttach(nle);
399  }
400 
401  if (s == NATS_OK)
402  *userData = (void*) nle;
403  else
404  natsLibuv_Detach((void*) nle);
405 
406  return s;
407 }
408 
418 natsLibuv_Read(void *userData, bool add)
419 {
420  natsLibuvEvents *nle = (natsLibuvEvents*) userData;
421  natsStatus s = NATS_OK;
422  bool sched;
423 
424  sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ? true : false);
425 
426  // If this call is made from a different thread than the event loop's
427  // thread, or if there are already scheduled events, then schedule
428  // this new event.
429 
430  // We don't need to get the lock for nle->head because if sched is
431  // false, we are in the event loop thread, which is the thread removing
432  // events from the list. Also, all calls to the read/write/etc.. callbacks
433  // are protected by the connection's lock in the NATS library.
434  if (sched || (nle->head != NULL))
435  s = uvScheduleToEventLoop(nle, NATS_LIBUV_READ, add);
436  else
437  s = uvPollUpdate(nle, NATS_LIBUV_READ, add);
438 
439  return s;
440 }
441 
451 natsLibuv_Write(void *userData, bool add)
452 {
453  natsLibuvEvents *nle = (natsLibuvEvents*) userData;
454  natsStatus s = NATS_OK;
455  bool sched;
456 
457  sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ? true : false);
458 
459  // See comment in natsLibuvRead
460  if (sched || (nle->head != NULL))
461  s = uvScheduleToEventLoop(nle, NATS_LIBUV_WRITE, add);
462  else
463  s = uvPollUpdate(nle, NATS_LIBUV_WRITE, add);
464 
465  return s;
466 }
467 
477 natsLibuv_Detach(void *userData)
478 {
479  natsLibuvEvents *nle = (natsLibuvEvents*) userData;
480  natsStatus s = NATS_OK;
481  bool sched;
482 
483  sched = ((uv_key_get(&uvLoopThreadKey) != nle->loop) ? true : false);
484 
485  // See comment in natsLibuvRead
486  if (sched || (nle->head != NULL))
487  s = uvScheduleToEventLoop(nle, NATS_LIBUV_DETACH, true);
488  else
489  uvAsyncDetach(nle);
490 
491  return s;
492 }
493  // end of libuvFunctions
495 
496 #ifdef __cplusplus
497 }
498 #endif
499 
500 #endif /* LIBUV_H_ */
NATS_EXTERN void natsConnection_Close(natsConnection *nc)
Closes the connection.
NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc)
Process a read event when using external event loop.
NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc)
Process a write event when using external event loop.
void natsLibuv_SetThreadLocalLoop(uv_loop_t *loop)
Register the event loop with the thread running uv_run().
Definition: libuv.h:104
void natsLibuv_Init(void)
Initialize the adapter.
Definition: libuv.h:90
natsStatus natsLibuv_Read(void *userData, bool add)
Start or stop polling on READ events.
Definition: libuv.h:418
natsStatus natsLibuv_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
Attach a connection to the given event loop.
Definition: libuv.h:343
natsStatus natsLibuv_Detach(void *userData)
The connection is closed, it can be safely detached.
Definition: libuv.h:477
natsStatus natsLibuv_Write(void *userData, bool add)
Start or stop polling on WRITE events.
Definition: libuv.h:451
struct __natsConnection natsConnection
A connection to a NATS Server.
Definition: nats.h:152
int natsSock
Definition: nats.h:50
natsStatus
Status returned by most of the APIs.
Definition: status.h:50
@ NATS_ERR
Generic error.
Definition: status.h:53
@ NATS_NO_MEMORY
Definition: status.h:102
@ NATS_ILLEGAL_STATE
Definition: status.h:88
@ NATS_OK
Success.
Definition: status.h:51