NATS C Client with JetStream and Streaming support  3.8.0
The nats.io C Client, Supported by Synadia Communications Inc.
Loading...
Searching...
No Matches
libevent.h
Go to the documentation of this file.
1// Copyright 2016-2018 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14#ifndef LIBEVENT_H_
15#define LIBEVENT_H_
16
17#ifdef __cplusplus
18extern "C" {
19#endif
20
24#include <event.h>
25#include <event2/thread.h>
26#include "../nats.h"
27
28typedef struct
29{
31 struct event_base *loop;
32 struct event *read;
33 struct event *write;
34 struct event *keepActive;
35
36} natsLibeventEvents;
37
38// Forward declarations
39natsStatus natsLibevent_Read(void *userData, bool add);
40natsStatus natsLibevent_Detach(void *userData);
41
/** \brief Initialize the adapter.
 *
 * Tells libevent which threading primitives to use: Windows threads when
 * `_WIN32` is defined, pthreads otherwise.
 */
void
natsLibevent_Init(void)
{
#if defined(_WIN32)
    evthread_use_windows_threads();
#else
    evthread_use_pthreads();
#endif
}
66
67static void
68natsLibevent_ProcessEvent(evutil_socket_t fd, short event, void *arg)
69{
70 natsLibeventEvents *nle = (natsLibeventEvents*) arg;
71
72 if (event & EV_READ)
74
75 if (event & EV_WRITE)
77}
78
79static void
80keepAliveCb(evutil_socket_t fd, short flags, void * arg)
81{
82 // do nothing...
83}
84
98natsLibevent_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
99{
100 struct event_base *libeventLoop = (struct event_base*) loop;
101 natsLibeventEvents *nle = (natsLibeventEvents*) (*userData);
103
104 // This is the first attach (when reconnecting, nle will be non-NULL).
105 if (nle == NULL)
106 {
107 nle = (natsLibeventEvents*) calloc(1, sizeof(natsLibeventEvents));
108 if (nle == NULL)
109 return NATS_NO_MEMORY;
110
111 nle->nc = nc;
112 nle->loop = libeventLoop;
113
114 nle->keepActive = event_new(nle->loop, -1, EV_PERSIST, keepAliveCb, NULL);
115 if (nle->keepActive == NULL)
116 s = NATS_NO_MEMORY;
117
118 if (s == NATS_OK)
119 {
120 struct timeval timeout;
121
122 timeout.tv_sec = 100000;
123 timeout.tv_usec = 0;
124
125 if (event_add(nle->keepActive, &timeout) != 0)
126 s = NATS_ERR;
127 }
128 }
129 else
130 {
131 if (nle->read != NULL)
132 {
133 event_free(nle->read);
134 nle->read = NULL;
135 }
136 if (nle->write != NULL)
137 {
138 event_free(nle->write);
139 nle->write = NULL;
140 }
141 }
142
143 if (s == NATS_OK)
144 {
145 nle->read = event_new(nle->loop, socket, EV_READ|EV_PERSIST,
146 natsLibevent_ProcessEvent, (void*) nle);
147 natsLibevent_Read((void*) nle, true);
148
149 nle->write = event_new(nle->loop, socket, EV_WRITE|EV_PERSIST,
150 natsLibevent_ProcessEvent, (void*) nle);
151 }
152
153 if (s == NATS_OK)
154 *userData = (void*) nle;
155 else
156 natsLibevent_Detach((void*) nle);
157
158 return s;
159}
160
170natsLibevent_Read(void *userData, bool add)
171{
172 natsLibeventEvents *nle = (natsLibeventEvents*) userData;
173 int res;
174
175 if (add)
176 res = event_add(nle->read, NULL);
177 else
178 res = event_del_noblock(nle->read);
179
180 return (res == 0 ? NATS_OK : NATS_ERR);
181}
182
192natsLibevent_Write(void *userData, bool add)
193{
194 natsLibeventEvents *nle = (natsLibeventEvents*) userData;
195 int res;
196
197 if (add)
198 res = event_add(nle->write, NULL);
199 else
200 res = event_del_noblock(nle->write);
201
202 return (res == 0 ? NATS_OK : NATS_ERR);
203}
204
214natsLibevent_Detach(void *userData)
215{
216 natsLibeventEvents *nle = (natsLibeventEvents*) userData;
217
218 if (nle->read != NULL)
219 event_free(nle->read);
220 if (nle->write != NULL)
221 event_free(nle->write);
222 if (nle->keepActive != NULL)
223 {
224 event_active(nle->keepActive, 0, 0);
225 event_free(nle->keepActive);
226 }
227
228 free(nle);
229
230 return NATS_OK;
231}
232
233 // end of libeventFunctions
234
235#ifdef __cplusplus
236}
237#endif
238
239#endif /* LIBEVENT_H_ */
NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc)
Process a read event when using external event loop.
NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc)
Process a write event when using external event loop.
void natsLibevent_Init(void)
Initialize the adapter.
Definition libevent.h:58
natsStatus natsLibevent_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
Attach a connection to the given event loop.
Definition libevent.h:98
natsStatus natsLibevent_Write(void *userData, bool add)
Start or stop polling on WRITE events.
Definition libevent.h:192
natsStatus natsLibevent_Read(void *userData, bool add)
Start or stop polling on READ events.
Definition libevent.h:170
natsStatus natsLibevent_Detach(void *userData)
The connection is closed, it can be safely detached.
Definition libevent.h:214
struct __natsConnection natsConnection
A connection to a NATS Server.
Definition nats.h:152
int natsSock
Definition nats.h:50
natsStatus
Status returned by most of the APIs.
Definition status.h:50
@ NATS_ERR
Generic error.
Definition status.h:53
@ NATS_NO_MEMORY
Definition status.h:102
@ NATS_OK
Success.
Definition status.h:51