NATS C Client with JetStream and Streaming support  3.9.1
The nats.io C Client, Supported by Synadia Communications Inc.
libevent.h
Go to the documentation of this file.
1 // Copyright 2016-2018 The NATS Authors
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 #ifndef LIBEVENT_H_
15 #define LIBEVENT_H_
16 
17 #ifdef __cplusplus
18 extern "C" {
19 #endif
20 
24 #include <event.h>
25 #include <event2/thread.h>
26 #include "../nats.h"
27 
28 typedef struct
29 {
30  natsConnection *nc;
31  struct event_base *loop;
32  struct event *read;
33  struct event *write;
34  struct event *keepActive;
35 
36 } natsLibeventEvents;
37 
38 // Forward declarations
39 natsStatus natsLibevent_Read(void *userData, bool add);
40 natsStatus natsLibevent_Detach(void *userData);
41 
/** \brief Initialize the adapter.
 *
 * Selects the thread implementation libevent should use: Windows threads
 * when `_WIN32` is defined, pthreads otherwise.
 */
void
natsLibevent_Init(void)
{
#ifdef _WIN32
    evthread_use_windows_threads();
#else
    evthread_use_pthreads();
#endif
}
66 
67 static void
68 natsLibevent_ProcessEvent(evutil_socket_t fd, short event, void *arg)
69 {
70  natsLibeventEvents *nle = (natsLibeventEvents*) arg;
71 
72  if (event & EV_READ)
74 
75  if (event & EV_WRITE)
77 }
78 
79 static void
80 keepAliveCb(evutil_socket_t fd, short flags, void * arg)
81 {
82  // do nothing...
83 }
84 
98 natsLibevent_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
99 {
100  struct event_base *libeventLoop = (struct event_base*) loop;
101  natsLibeventEvents *nle = (natsLibeventEvents*) (*userData);
102  natsStatus s = NATS_OK;
103 
104  // This is the first attach (when reconnecting, nle will be non-NULL).
105  if (nle == NULL)
106  {
107  nle = (natsLibeventEvents*) calloc(1, sizeof(natsLibeventEvents));
108  if (nle == NULL)
109  return NATS_NO_MEMORY;
110 
111  nle->nc = nc;
112  nle->loop = libeventLoop;
113 
114  nle->keepActive = event_new(nle->loop, -1, EV_PERSIST, keepAliveCb, NULL);
115  if (nle->keepActive == NULL)
116  s = NATS_NO_MEMORY;
117 
118  if (s == NATS_OK)
119  {
120  struct timeval timeout;
121 
122  timeout.tv_sec = 100000;
123  timeout.tv_usec = 0;
124 
125  if (event_add(nle->keepActive, &timeout) != 0)
126  s = NATS_ERR;
127  }
128  }
129  else
130  {
131  if (nle->read != NULL)
132  {
133  event_free(nle->read);
134  nle->read = NULL;
135  }
136  if (nle->write != NULL)
137  {
138  event_free(nle->write);
139  nle->write = NULL;
140  }
141  }
142 
143  if (s == NATS_OK)
144  {
145  nle->read = event_new(nle->loop, socket, EV_READ|EV_PERSIST,
146  natsLibevent_ProcessEvent, (void*) nle);
147  natsLibevent_Read((void*) nle, true);
148 
149  nle->write = event_new(nle->loop, socket, EV_WRITE|EV_PERSIST,
150  natsLibevent_ProcessEvent, (void*) nle);
151  }
152 
153  if (s == NATS_OK)
154  *userData = (void*) nle;
155  else
156  natsLibevent_Detach((void*) nle);
157 
158  return s;
159 }
160 
170 natsLibevent_Read(void *userData, bool add)
171 {
172  natsLibeventEvents *nle = (natsLibeventEvents*) userData;
173  int res;
174 
175  if (add)
176  res = event_add(nle->read, NULL);
177  else
178  res = event_del_noblock(nle->read);
179 
180  return (res == 0 ? NATS_OK : NATS_ERR);
181 }
182 
192 natsLibevent_Write(void *userData, bool add)
193 {
194  natsLibeventEvents *nle = (natsLibeventEvents*) userData;
195  int res;
196 
197  if (add)
198  res = event_add(nle->write, NULL);
199  else
200  res = event_del_noblock(nle->write);
201 
202  return (res == 0 ? NATS_OK : NATS_ERR);
203 }
204 
214 natsLibevent_Detach(void *userData)
215 {
216  natsLibeventEvents *nle = (natsLibeventEvents*) userData;
217 
218  if (nle->read != NULL)
219  event_free(nle->read);
220  if (nle->write != NULL)
221  event_free(nle->write);
222  if (nle->keepActive != NULL)
223  {
224  event_active(nle->keepActive, 0, 0);
225  event_free(nle->keepActive);
226  }
227 
228  free(nle);
229 
230  return NATS_OK;
231 }
232  // end of libeventFunctions
234 
235 #ifdef __cplusplus
236 }
237 #endif
238 
239 #endif /* LIBEVENT_H_ */
NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc)
Process a read event when using external event loop.
NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc)
Process a write event when using external event loop.
void natsLibevent_Init(void)
Initialize the adapter.
Definition: libevent.h:58
natsStatus natsLibevent_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
Attach a connection to the given event loop.
Definition: libevent.h:98
natsStatus natsLibevent_Write(void *userData, bool add)
Start or stop polling on WRITE events.
Definition: libevent.h:192
natsStatus natsLibevent_Read(void *userData, bool add)
Start or stop polling on READ events.
Definition: libevent.h:170
natsStatus natsLibevent_Detach(void *userData)
The connection is closed; it can be safely detached.
Definition: libevent.h:214
struct __natsConnection natsConnection
A connection to a NATS Server.
Definition: nats.h:152
int natsSock
Definition: nats.h:50
natsStatus
Status returned by most of the APIs.
Definition: status.h:50
@ NATS_ERR
Generic error.
Definition: status.h:53
@ NATS_NO_MEMORY
Definition: status.h:102
@ NATS_OK
Success.
Definition: status.h:51