25#include <event2/thread.h>
31 struct event_base *loop;
34 struct event *keepActive;
62 evthread_use_windows_threads();
64 evthread_use_pthreads();
69natsLibevent_ProcessEvent(evutil_socket_t fd,
short event,
void *arg)
71 natsLibeventEvents *nle = (natsLibeventEvents*) arg;
81keepAliveCb(evutil_socket_t fd,
short flags,
void * arg)
87natsLibeventEvents_free(natsLibeventEvents *nle,
bool processDetachedEvents)
89 if (nle->read != NULL)
90 event_free(nle->read);
91 if (nle->write != NULL)
92 event_free(nle->write);
93 if (nle->keepActive != NULL)
95 event_active(nle->keepActive, 0, 0);
96 event_free(nle->keepActive);
98 if (processDetachedEvents)
106_freeCb(evutil_socket_t ignoredSocket,
short ignoredEvent,
void *arg)
108 natsLibeventEvents *nle = (natsLibeventEvents*) arg;
109 natsLibeventEvents_free(nle,
true);
127 struct event_base *libeventLoop = (
struct event_base*) loop;
128 natsLibeventEvents *nle = (natsLibeventEvents*) (*userData);
130 bool created =
false;
135 nle = (natsLibeventEvents*) calloc(1,
sizeof(natsLibeventEvents));
142 nle->loop = libeventLoop;
144 nle->keepActive = event_new(nle->loop, -1, EV_PERSIST, keepAliveCb, NULL);
145 if (nle->keepActive == NULL)
150 struct timeval timeout;
152 timeout.tv_sec = 100000;
155 if (event_add(nle->keepActive, &timeout) != 0)
161 if (nle->read != NULL)
163 event_free(nle->read);
166 if (nle->write != NULL)
168 event_free(nle->write);
175 nle->read = event_new(nle->loop, socket, EV_READ|EV_PERSIST,
176 natsLibevent_ProcessEvent, (
void*) nle);
179 nle->write = event_new(nle->loop, socket, EV_WRITE|EV_PERSIST,
180 natsLibevent_ProcessEvent, (
void*) nle);
184 *userData = (
void*) nle;
186 natsLibeventEvents_free(nle,
false);
192_closeCb(evutil_socket_t ignoredSocket,
short ignoredEvent,
void *arg)
194 natsLibeventEvents *nle = (natsLibeventEvents*) arg;
213 natsLibeventEvents *nle = (natsLibeventEvents*) userData;
217 res = event_add(nle->read, NULL);
220 int socket = event_get_fd(nle->read);
221 res = event_del_noblock(nle->read);
226 nle->socketToClose = (
natsSock) socket;
227 res = event_base_once(nle->loop, -1, EV_TIMEOUT, _closeCb, (
void*) nle, NULL);
245 natsLibeventEvents *nle = (natsLibeventEvents*) userData;
249 res = event_add(nle->write, NULL);
251 res = event_del_noblock(nle->write);
267 natsLibeventEvents *nle = (natsLibeventEvents*) userData;
271 int res = event_base_once(nle->loop, -1, EV_TIMEOUT, _freeCb, (
void*) nle, NULL);
NATS_EXTERN void natsConnection_ProcessDetachedEvent(natsConnection *nc)
Process a detach event when using an external event loop.
NATS_EXTERN void natsConnection_ProcessCloseEvent(natsSock *socket)
Process a socket close event when using an external event loop.
NATS_EXTERN void natsConnection_ProcessReadEvent(natsConnection *nc)
Process a read event when using an external event loop.
NATS_EXTERN void natsConnection_ProcessWriteEvent(natsConnection *nc)
Process a write event when using an external event loop.
void natsLibevent_Init(void)
Initialize the adapter.
Definition libevent.h:59
natsStatus natsLibevent_Attach(void **userData, void *loop, natsConnection *nc, natsSock socket)
Attach a connection to the given event loop.
Definition libevent.h:125
natsStatus natsLibevent_Write(void *userData, bool add)
Start or stop polling on WRITE events.
Definition libevent.h:243
natsStatus natsLibevent_Read(void *userData, bool add)
Start or stop polling on READ events.
Definition libevent.h:211
natsStatus natsLibevent_Detach(void *userData)
The connection is closed; it can be safely detached.
Definition libevent.h:265
struct __natsConnection natsConnection
A connection to a NATS Server.
Definition nats.h:152
int natsSock
Definition nats.h:50
natsStatus
Status returned by most of the APIs.
Definition status.h:50
@ NATS_ERR
Generic error.
Definition status.h:53
@ NATS_NO_MEMORY
Definition status.h:102
@ NATS_OK
Success.
Definition status.h:51