NATS C Client with JetStream and Streaming support  3.10.0
The nats.io C Client, Supported by Synadia Communications Inc.
Loading...
Searching...
No Matches
libevent.h
Go to the documentation of this file.
1// Copyright 2016-2018 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14#ifndef LIBEVENT_H_
15#define LIBEVENT_H_
16
17#ifdef __cplusplus
18extern "C" {
19#endif
20
24#include <event.h>
25#include <event2/thread.h>
26#include "../nats.h"
27
28typedef struct
29{
31 struct event_base *loop;
32 struct event *read;
33 struct event *write;
34 struct event *keepActive;
35
36} natsLibeventEvents;
37
38// Forward declarations
39natsStatus natsLibevent_Read(void *userData, bool add);
40natsStatus natsLibevent_Detach(void *userData);
41
57void
59{
60#if _WIN32
61 evthread_use_windows_threads();
62#else
63 evthread_use_pthreads();
64#endif
65}
66
67static void
68natsLibevent_ProcessEvent(evutil_socket_t fd, short event, void *arg)
69{
70 natsLibeventEvents *nle = (natsLibeventEvents*) arg;
71
72 if (event & EV_READ)
74
75 if (event & EV_WRITE)
77}
78
79static void
80keepAliveCb(evutil_socket_t fd, short flags, void * arg)
81{
82 // do nothing...
83}
84
98natsLibevent_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
99{
100 struct event_base *libeventLoop = (struct event_base*) loop;
101 natsLibeventEvents *nle = (natsLibeventEvents*) (*userData);
103
104 // This is the first attach (when reconnecting, nle will be non-NULL).
105 if (nle == NULL)
106 {
107 nle = (natsLibeventEvents*) calloc(1, sizeof(natsLibeventEvents));
108 if (nle == NULL)
109 return NATS_NO_MEMORY;
110
111 nle->nc = nc;
112 nle->loop = libeventLoop;
113
114 nle->keepActive = event_new(nle->loop, -1, EV_PERSIST, keepAliveCb, NULL);
115 if (nle->keepActive == NULL)
116 s = NATS_NO_MEMORY;
117
118 if (s == NATS_OK)
119 {
120 struct timeval timeout;
121
122 timeout.tv_sec = 100000;
123 timeout.tv_usec = 0;
124
125 if (event_add(nle->keepActive, &timeout) != 0)
126 s = NATS_ERR;
127 }
128 }
129 else
130 {
131 if (nle->read != NULL)
132 {
133 event_free(nle->read);
134 nle->read = NULL;
135 }
136 if (nle->write != NULL)
137 {
138 event_free(nle->write);
139 nle->write = NULL;
140 }
141 }
142
143 if (s == NATS_OK)
144 {
145 nle->read = event_new(nle->loop, socket, EV_READ|EV_PERSIST,
146 natsLibevent_ProcessEvent, (void*) nle);
147 natsLibevent_Read((void*) nle, true);
148
149 nle->write = event_new(nle->loop, socket, EV_WRITE|EV_PERSIST,
150 natsLibevent_ProcessEvent, (void*) nle);
151 }
152
153 if (s == NATS_OK)
154 *userData = (void*) nle;
155 else
156 natsLibevent_Detach((void*) nle);
157
158 return s;
159}
160
161static void
162_closeCb(evutil_socket_t fd, short event, void *arg)
163{
164 natsSock socket = (natsSock) fd;
165
166 // We have stopped polling for the "READ" event and are now in the
167 // event loop thread and invoke this so that the NATS C client
168 // library can proceed with the close of the socket/connection.
170}
171
181natsLibevent_Read(void *userData, bool add)
182{
183 natsLibeventEvents *nle = (natsLibeventEvents*) userData;
184 int res;
185
186 if (add)
187 res = event_add(nle->read, NULL);
188 else
189 {
190 int socket = event_get_fd(nle->read);
191 res = event_del_noblock(nle->read);
192 if (res == 0)
193 {
194 // This will schedule a one-time event that guarantees that the
195 // callback `_closeCb` will be invoked from the event loop thread.
196 res = event_base_once(nle->loop, socket, EV_TIMEOUT, _closeCb, (void*) nle, NULL);
197 }
198 }
199
200 return (res == 0 ? NATS_OK : NATS_ERR);
201}
202
212natsLibevent_Write(void *userData, bool add)
213{
214 natsLibeventEvents *nle = (natsLibeventEvents*) userData;
215 int res;
216
217 if (add)
218 res = event_add(nle->write, NULL);
219 else
220 res = event_del_noblock(nle->write);
221
222 return (res == 0 ? NATS_OK : NATS_ERR);
223}
224
234natsLibevent_Detach(void *userData)
235{
236 natsLibeventEvents *nle = (natsLibeventEvents*) userData;
237
238 if (nle->read != NULL)
239 event_free(nle->read);
240 if (nle->write != NULL)
241 event_free(nle->write);
242 if (nle->keepActive != NULL)
243 {
244 event_active(nle->keepActive, 0, 0);
245 event_free(nle->keepActive);
246 }
247
248 free(nle);
249
250 return NATS_OK;
251}
252
// end of libeventFunctions
254
255#ifdef __cplusplus
256}
257#endif
258
259#endif /* LIBEVENT_H_ */
NATS_EXTERN void natsConnection_ProcessCloseEvent(natsSock *socket)
Process a socket close event when using external event loop.
NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc)
Process a read event when using external event loop.
NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc)
Process a write event when using external event loop.
void natsLibevent_Init(void)
Initialize the adapter.
Definition libevent.h:58
natsStatus natsLibevent_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
Attach a connection to the given event loop.
Definition libevent.h:98
natsStatus natsLibevent_Write(void *userData, bool add)
Start or stop polling on WRITE events.
Definition libevent.h:212
natsStatus natsLibevent_Read(void *userData, bool add)
Start or stop polling on READ events.
Definition libevent.h:181
natsStatus natsLibevent_Detach(void *userData)
The connection is closed, it can be safely detached.
Definition libevent.h:234
struct __natsConnection natsConnection
A connection to a NATS Server.
Definition nats.h:160
int natsSock
Definition nats.h:58
natsStatus
Status returned by most of the APIs.
Definition status.h:50
@ NATS_ERR
Generic error.
Definition status.h:53
@ NATS_NO_MEMORY
Definition status.h:102
@ NATS_OK
Success.
Definition status.h:51