NATS C Client with JetStream and Streaming support  3.11.0
The nats.io C Client, Supported by Synadia Communications Inc.
Loading...
Searching...
No Matches
libevent.h
Go to the documentation of this file.
1// Copyright 2016-2018 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14#ifndef LIBEVENT_H_
15#define LIBEVENT_H_
16
17#ifdef __cplusplus
18extern "C" {
19#endif
20
24#include <event.h>
25#include <event2/thread.h>
26#include "../nats.h"
27
28typedef struct
29{
31 struct event_base *loop;
32 struct event *read;
33 struct event *write;
34 struct event *keepActive;
35 natsSock socketToClose;
36
37} natsLibeventEvents;
38
39// Forward declarations
40natsStatus natsLibevent_Read(void *userData, bool add);
41natsStatus natsLibevent_Detach(void *userData);
42
58void
60{
61#if _WIN32
62 evthread_use_windows_threads();
63#else
64 evthread_use_pthreads();
65#endif
66}
67
68static void
69natsLibevent_ProcessEvent(evutil_socket_t fd, short event, void *arg)
70{
71 natsLibeventEvents *nle = (natsLibeventEvents*) arg;
72
73 if (event & EV_READ)
75
76 if (event & EV_WRITE)
78}
79
80static void
81keepAliveCb(evutil_socket_t fd, short flags, void * arg)
82{
83 // do nothing...
84}
85
86static void
87natsLibeventEvents_free(natsLibeventEvents *nle, bool processDetachedEvents)
88{
89 if (nle->read != NULL)
90 event_free(nle->read);
91 if (nle->write != NULL)
92 event_free(nle->write);
93 if (nle->keepActive != NULL)
94 {
95 event_active(nle->keepActive, 0, 0);
96 event_free(nle->keepActive);
97 }
98 if (processDetachedEvents)
100 free(nle);
101}
102
103// This callback is invoked from the event loop thread and will free the
104// `natsLibeventEvents` object.
105static void
106_freeCb(evutil_socket_t ignoredSocket, short ignoredEvent, void *arg)
107{
108 natsLibeventEvents *nle = (natsLibeventEvents*) arg;
109 natsLibeventEvents_free(nle, true);
110}
111
125natsLibevent_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
126{
127 struct event_base *libeventLoop = (struct event_base*) loop;
128 natsLibeventEvents *nle = (natsLibeventEvents*) (*userData);
130 bool created = false;
131
132 // This is the first attach (when reconnecting, nle will be non-NULL).
133 if (nle == NULL)
134 {
135 nle = (natsLibeventEvents*) calloc(1, sizeof(natsLibeventEvents));
136 if (nle == NULL)
137 return NATS_NO_MEMORY;
138
139 // Indicate that we have created the object here (in case we get a failure).
140 created = true;
141 nle->nc = nc;
142 nle->loop = libeventLoop;
143
144 nle->keepActive = event_new(nle->loop, -1, EV_PERSIST, keepAliveCb, NULL);
145 if (nle->keepActive == NULL)
146 s = NATS_NO_MEMORY;
147
148 if (s == NATS_OK)
149 {
150 struct timeval timeout;
151
152 timeout.tv_sec = 100000;
153 timeout.tv_usec = 0;
154
155 if (event_add(nle->keepActive, &timeout) != 0)
156 s = NATS_ERR;
157 }
158 }
159 else
160 {
161 if (nle->read != NULL)
162 {
163 event_free(nle->read);
164 nle->read = NULL;
165 }
166 if (nle->write != NULL)
167 {
168 event_free(nle->write);
169 nle->write = NULL;
170 }
171 }
172
173 if (s == NATS_OK)
174 {
175 nle->read = event_new(nle->loop, socket, EV_READ|EV_PERSIST,
176 natsLibevent_ProcessEvent, (void*) nle);
177 natsLibevent_Read((void*) nle, true);
178
179 nle->write = event_new(nle->loop, socket, EV_WRITE|EV_PERSIST,
180 natsLibevent_ProcessEvent, (void*) nle);
181 }
182
183 if (s == NATS_OK)
184 *userData = (void*) nle;
185 else if (created)
186 natsLibeventEvents_free(nle, false);
187
188 return s;
189}
190
191static void
192_closeCb(evutil_socket_t ignoredSocket, short ignoredEvent, void *arg)
193{
194 natsLibeventEvents *nle = (natsLibeventEvents*) arg;
195
196 // We have stopped polling for the "READ" event and are now in the
197 // event loop thread and invoke this so that the NATS C client
198 // library can proceed with the close of the socket/connection.
199 natsConnection_ProcessCloseEvent(&(nle->socketToClose));
200}
201
211natsLibevent_Read(void *userData, bool add)
212{
213 natsLibeventEvents *nle = (natsLibeventEvents*) userData;
214 int res;
215
216 if (add)
217 res = event_add(nle->read, NULL);
218 else
219 {
220 int socket = event_get_fd(nle->read);
221 res = event_del_noblock(nle->read);
222 if (res == 0)
223 {
224 // This will schedule a one-time event that guarantees that the
225 // callback `_closeCb` will be invoked from the event loop thread.
226 nle->socketToClose = (natsSock) socket;
227 res = event_base_once(nle->loop, -1, EV_TIMEOUT, _closeCb, (void*) nle, NULL);
228 }
229 }
230
231 return (res == 0 ? NATS_OK : NATS_ERR);
232}
233
243natsLibevent_Write(void *userData, bool add)
244{
245 natsLibeventEvents *nle = (natsLibeventEvents*) userData;
246 int res;
247
248 if (add)
249 res = event_add(nle->write, NULL);
250 else
251 res = event_del_noblock(nle->write);
252
253 return (res == 0 ? NATS_OK : NATS_ERR);
254}
255
265natsLibevent_Detach(void *userData)
266{
267 natsLibeventEvents *nle = (natsLibeventEvents*) userData;
268
269 // This will schedule a one-time event that guarantees that the
270 // callback `_freeCb` will be invoked from the event loop thread.
271 int res = event_base_once(nle->loop, -1, EV_TIMEOUT, _freeCb, (void*) nle, NULL);
272
273 return (res == 0 ? NATS_OK : NATS_ERR);
274}
275
// end of libeventFunctions
277
278#ifdef __cplusplus
279}
280#endif
281
282#endif /* LIBEVENT_H_ */
NATS_EXTERN void natsConnection_ProcessDetachedEvent(natsConnection *nc)
Process a detach event when using external event loop.
NATS_EXTERN void natsConnection_ProcessCloseEvent(natsSock *socket)
Process a socket close event when using external event loop.
NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc)
Process a read event when using external event loop.
NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc)
Process a write event when using external event loop.
void natsLibevent_Init(void)
Initialize the adapter.
Definition libevent.h:59
natsStatus natsLibevent_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
Attach a connection to the given event loop.
Definition libevent.h:125
natsStatus natsLibevent_Write(void *userData, bool add)
Start or stop polling on WRITE events.
Definition libevent.h:243
natsStatus natsLibevent_Read(void *userData, bool add)
Start or stop polling on READ events.
Definition libevent.h:211
natsStatus natsLibevent_Detach(void *userData)
The connection is closed, it can be safely detached.
Definition libevent.h:265
struct __natsConnection natsConnection
A connection to a NATS Server.
Definition nats.h:152
int natsSock
Definition nats.h:50
natsStatus
Status returned by most of the APIs.
Definition status.h:50
@ NATS_ERR
Generic error.
Definition status.h:53
@ NATS_NO_MEMORY
Definition status.h:102
@ NATS_OK
Success.
Definition status.h:51