ソースを参照

Z991239-342 #comment 分离出 evtpoll 结构体

gifur 5 年 前
コミット
18f101855d

+ 2 - 0
libtoolkit/CMakeLists.txt

@@ -12,6 +12,8 @@ if(HAVE_DBGHELP)
 	message(STATUS "have dbghelp")
 endif(HAVE_DBGHELP)
 
+# negative impact: every times you add new file, you must sync once again or the new
+# structure would not be defined.
 file(GLOB ${MODULE_PREFIX}_SRCS RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cpp" "*.h" "*.c")
 # remove the no use file at all platform so far
 # list(REMOVE_ITEM ${MODULE_PREFIX}_SRCS 

+ 14 - 13
libtoolkit/bus-unix.c

@@ -6,6 +6,7 @@
 #include "spinlock.h"
 #include "list.h"
 #include "bus_internal.h"
+#include "evtpoll.h"
 
 #include <winpr/file.h>
 #include <winpr/pipe.h>
@@ -42,8 +43,8 @@ struct bus_endpt_t {
 	struct list_head msg_list;
 	spinlock_t msg_lock;
 	HANDLE msg_sem;
-	HANDLE tx_evt;
-	HANDLE rx_evt;
+	HANDLE tx_evt; //manually
+	HANDLE rx_evt; //manually
 	OVERLAPPED rx_overlapped;
 	int rx_pending;
 	int rx_pending_pkt_len;
@@ -105,7 +106,7 @@ static SOCKET create_socket_handle(const char* ip, int port)
 {
 	SOCKET fd;
 	struct sockaddr_in addr;
-
+	/*Warning: only front three param is effective*/
 	fd = WSASocketA(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
 
 	if (fd == INVALID_SOCKET) {
@@ -116,6 +117,7 @@ static SOCKET create_socket_handle(const char* ip, int port)
 		BOOL f = TRUE;
 		u_long l = TRUE;
 		setsockopt(fd, SOL_SOCKET, SO_DONTLINGER, (char*)&f, sizeof(f));
+		/*non block sock*/
 		_ioctlsocket(fd, FIONBIO, &l);
 	}
 
@@ -411,13 +413,13 @@ static int start_read_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt)
 		wsabuf.buf = (char*)&endpt->rx_pending_pkt_len;
 		wsabuf.len = 4;
 		ret = _recv(endpt->sock_handle, wsabuf.buf, wsabuf.len, 0);
-		//TODO:
 	}
 	else {
 		return -1;
 	}
 
-	if (ret) {
+	if (ret >= 0) {
+		dwBytesTransferred = ret;
 		if (dwBytesTransferred == 0)
 			return -1;
 		if (dwBytesTransferred < 4) {
@@ -432,20 +434,18 @@ static int start_read_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt)
 				iobuffer_destroy(pkt);
 				return rc;
 			}
-
 			iobuffer_push_count(pkt, endpt->rx_pending_pkt_len);
 		}
 		*p_pkt = pkt;
 	}
 	else {
-		if (WSAGetLastError() == WSA_IO_PENDING) {
+		if (errno == EAGAIN) {
 			endpt->rx_pending = 1;
 		}
 		else {
 			return -1;
 		}
 	}
-
 	return 0;
 }
 
@@ -686,7 +686,7 @@ static int bus_endpt_poll_internal(bus_endpt_t* endpt, int* result, int timeout)
 		iobuffer_restore_read_state(pkt, read_state);
 		*result = to_result(pkt_type);
 		if (*result == BUS_RESULT_UNKNOWN) {
-			OutputDebugStringA("bug: unknown pkt type!\n");
+			WLog_ERR(TAG, "bug: unknown pkt type!");
 			return -1;
 		}
 		return 1;
@@ -698,7 +698,7 @@ static int bus_endpt_poll_internal(bus_endpt_t* endpt, int* result, int timeout)
 		if (rc < 0)
 			return rc;
 		if (pkt) {
-			OutputDebugStringA("pkt has read\n");
+			WLog_ERR(TAG, "pkt has read");
 			rc = append_rx_pkt(endpt, pkt);
 			if (rc < 0) {
 				iobuffer_destroy(pkt);
@@ -706,13 +706,14 @@ static int bus_endpt_poll_internal(bus_endpt_t* endpt, int* result, int timeout)
 			}
 		}
 		else {
-			OutputDebugStringA("pending\n");
+			WLog_ERR(TAG, "pending");
 		}
 	}
 
 	// if receive is pending, wait for send or receive complete event
 	if (!pkt) {
 		HANDLE hs[] = { endpt->msg_sem, endpt->rx_evt };
+		WLog_DBG(TAG, "wait msg sem or received event.");
 		ret = WaitForMultipleObjects(array_size(hs), &hs[0], FALSE, (DWORD)timeout);
 		if (ret == WAIT_TIMEOUT) {
 			return 0;
@@ -736,7 +737,7 @@ static int bus_endpt_poll_internal(bus_endpt_t* endpt, int* result, int timeout)
 		}
 	}
 	else {
-		OutputDebugStringA("pkt has readed\n");
+		WLog_ERR(TAG, "pkt has readed");
 	}
 
 	if (pkt) {
@@ -746,7 +747,7 @@ static int bus_endpt_poll_internal(bus_endpt_t* endpt, int* result, int timeout)
 		iobuffer_restore_read_state(pkt, read_state);
 		*result = to_result(type);
 		if (*result == BUS_RESULT_UNKNOWN) {
-			OutputDebugStringA("bug: unknown pkt type!\n");
+			WLog_ERR(TAG, "bug: unknown pkt type!");
 			return -1;
 		}
 	}

+ 676 - 0
libtoolkit/evtpoll.c

@@ -0,0 +1,676 @@
+#include <assert.h>
+#include <unistd.h>
+#include <errno.h>
+#include "evtpoll.h"
+#include "memutil.h"
+#include "spinlock.h"
+#include "list.h"
+#include "array.h"
+#include "refcnt.h"
+
+#include <winpr/wlog.h>
+#define TAG TOOLKIT_TAG("evtpoll")
+
+#ifndef _WIN32
+
+struct event_epoll_s {
+	int epfd;
+	/*insterest*/
+	spinlock_t interest_list_lock;
+	struct list_head interest_list;
+};
+
+struct evtpoll_interest_entry_s {
+	struct list_head entry;
+	int type;
+	int events;
+	int pending;
+	void* key;
+	void* io;
+
+	spinlock_t lock;
+	struct evtpoll_interest_s* owner;
+};
+
+struct evtpoll_interest_s {
+	struct list_head node; // for evtpoll_t::interest_list
+
+	struct list_head entry_list; //no used
+
+	int fd;
+	int events;
+	int type; //no used
+	evtpoll_t* owner;
+
+	array_header_t* entries;
+	spinlock_t lock;
+	volatile int entry_counts;
+};
+typedef struct evtpoll_interest_entry_s evtpoll_interest_entry_t;
+typedef struct evtpoll_interest_s evtpoll_interest_t;
+
+static int evtpoll__interest_empty(evtpoll_t* ep)
+{
+	int ret;
+	assert(ep);
+	spinlock_enter(&ep->interest_list_lock, -1);
+	ret = list_empty(&ep->interest_list);
+	spinlock_leave(&ep->interest_list_lock);
+	return ret;
+}
+
+evtpoll_t* evtpoll_create()
+{
+	evtpoll_t* ep = ZALLOC_T(evtpoll_t);
+	if (!ep)
+		return NULL;
+	/*
+	since Linux 2.6.8 the argument is ignored but must be g0 for compatibility,
+	bcz kernel would allocate space dynamically.
+	*/
+	ep->epfd = epoll_create(MAX_EPOLL_EVENT);
+	if (ep->epfd == -1) {
+		WLog_ERR(TAG, "epoll fd create failed: %s", strerror(errno));
+		goto on_error;
+	}
+	spinlock_init(&ep->interest_list_lock);
+	INIT_LIST_HEAD(&ep->interest_list);
+	return ep;
+on_error:
+	free(ep);
+	return NULL;
+}
+
+void evtpoll_destroy(evtpoll_t* evt_poll)
+{
+	assert(evtpoll__interest_empty(evt_poll));
+	close(evt_poll->epfd);
+	free(evt_poll);
+}
+
+int evtpoll_get_epoll_fd(const evtpoll_t* const evt_poll)
+{
+	return evt_poll->epfd;
+}
+
+int evtpoll_get_raw_fd(evtpoll_t* ep)
+{
+	return ep->epfd;
+}
+
+void evtpoll__interest_entry_set(
+	evtpoll_interest_entry_t* entry,
+	int events,
+	int type,
+	int pending,
+	void* key,
+	void* io)
+{
+	assert(entry);
+	assert(entry->owner);
+	entry->type = type;
+	entry->events = events;
+	entry->io = io;
+	entry->pending = pending;
+	entry->key = key;
+}
+
+void evtpoll__interest_entry_reset(evtpoll_interest_entry_t* entry) {
+	evtpoll__interest_entry_set(entry, 0, 0, 0, NULL, NULL);
+}
+
+static evtpoll_interest_entry_t* evtpoll__interest_entry_create(evtpoll_interest_t* interest)
+{
+	evtpoll_interest_entry_t* entry = MALLOC_T(evtpoll_interest_entry_t);
+	assert(entry);
+	entry->owner = interest;
+	evtpoll__interest_entry_reset(entry);
+	spinlock_init(&entry->lock);
+	return entry;
+}
+
+
+static int evtpoll__interest_entry_is_ready(const evtpoll_interest_entry_t* const entry)
+{
+	if (entry->pending != 0) {
+		return 0;
+	}
+	if (!entry->io) {
+		return 0;
+	}
+	if (entry->type == 0 || entry->events == 0) {
+		return 0;
+	}
+	return 1;
+}
+
+void evtpoll__interest_entry_destroy(evtpoll_interest_entry_t* entry)
+{
+	free(entry);
+}
+
+void evtpoll__interest_lock(evtpoll_interest_t* interest)
+{
+	spinlock_enter(&interest->lock, 0);
+}
+
+void evtpoll__interest_unlock(evtpoll_interest_t* interest)
+{
+	spinlock_leave(&interest->lock);
+}
+
+void evtpoll___add_interest_list(evtpoll_interest_t* inst)
+{
+	evtpoll_t* ep = inst->owner;
+	assert(ep);
+	spinlock_enter(&ep->interest_list_lock, -1);
+	list_add(&inst->node, &ep->interest_list);
+	spinlock_leave(&ep->interest_list_lock);
+}
+
+void evtpoll___del_interest_list(evtpoll_interest_t* inst)
+{
+	evtpoll_t* ep = inst->owner;
+	assert(ep);
+	spinlock_enter(&ep->interest_list_lock, -1);
+	list_del(&inst->node);
+	inst->node.next = inst->node.prev = NULL;
+	spinlock_leave(&ep->interest_list_lock);
+}
+
+void evtpoll__interest_free(evtpoll_interest_t* inst)
+{
+	evtpoll___del_interest_list(inst);
+}
+
+
+static int evtpoll__ctl(evtpoll_t* ep, int event_mask, int ctrl_mod, int fd, void* data)
+{
+	int ret = -1;
+	struct epoll_event ee;
+	if (ep) {
+		ee.events = event_mask;
+		if (data) {
+			ee.data.ptr = data;
+		}
+		else {
+			ee.data.fd = fd;
+		}
+		ret = epoll_ctl(evtpoll_get_epoll_fd(ep), ctrl_mod, fd, &ee);
+		if (ret == -1) {
+			WLog_ERR(TAG, "epoll ctl failed: %s", strerror(errno));
+		}
+		else {
+			WLog_INFO(TAG, "fd(%d), epoll_ctl: OUT:%d, IN:%d, OTHERS:0x%08X, ADD:%d, MOD:%d, data:0x%X ret:%d",
+				fd,
+				(event_mask & EPOLLOUT) > 0 ? 1 : 0, (event_mask & EPOLLIN) > 0 ? 1 : 0,
+				(event_mask & ~(EPOLLOUT | EPOLLIN)),
+				ctrl_mod == EPOLL_CTL_ADD ? 1 : 0, ctrl_mod == EPOLL_CTL_MOD ? 1 : 0, data, ret);
+		}
+	}
+	return ret;
+}
+
+static int evtpoll__ctl_remove(evtpoll_t* ep, int event_mask, int fd, void* data)
+{
+	int ret = -1;
+	struct epoll_event ee;
+	ee.events = event_mask;
+	if (data)
+		ee.data.ptr = data;
+	else
+		ee.data.fd = fd;
+	ret = epoll_ctl(evtpoll_get_epoll_fd(ep), EPOLL_CTL_DEL, fd, &ee);
+	if (ret == 0 || (ret == -1 && errno == ENOENT)) {
+		return 0;
+	}
+	WLog_ERR(TAG, "epoll remove ctrl failed: %s", strerror(errno));
+	return -1;
+}
+
+/*!
+ * @brief
+		 one 'evtpoll_interest_t' maps to one fd.
+		 each interest instance has {DEFAULT_INTEREST_OP_COUNT} evtpoll_interest_entry_t type object.
+ * @param[in]
+ * @return : SUCC: new evtpoll_interest_t type pointer object; NULL: failed
+ */
+static evtpoll_interest_t* evtpoll__interest_create(int fd)
+{
+	int i;
+	evtpoll_interest_t* inst = ZALLOC_T(evtpoll_interest_t);
+	if (!inst) {
+		return NULL;
+	}
+	inst->fd = fd;
+	INIT_LIST_HEAD(&inst->entry_list);
+	inst->entry_counts = 0;
+	inst->owner = NULL;
+	inst->type = 0;
+	inst->events = 0;
+
+	inst->entries = array_make(DEFAULT_INTEREST_OP_COUNT, sizeof(evtpoll_interest_entry_t*));
+	for (i = 0; i < DEFAULT_INTEREST_OP_COUNT; ++i) {
+		ARRAY_IDX(inst->entries, i, evtpoll_interest_entry_t*) = evtpoll__interest_entry_create(inst);
+	}
+	return inst;
+}
+
+static void evtpoll__interest_destroy(evtpoll_interest_t* inst)
+{
+	int i;
+	assert(inst);
+	assert(list_empty(&inst->entry_list));
+	for (i = 0; i < inst->entries->nelts; ++i)
+		evtpoll__interest_entry_destroy(ARRAY_IDX(inst->entries, i, evtpoll_interest_entry_t*));
+	array_free(inst->entries);
+	free(inst);
+}
+
+
+static evtpoll_interest_t* evtpoll__find_interest(const evtpoll_t* const cst_ep, int fd)
+{
+	struct list_head* pos;
+	struct list_head* tmp;
+	if (list_empty(&cst_ep->interest_list))
+		return NULL;
+	list_for_each_safe(pos, tmp, &cst_ep->interest_list) {
+		evtpoll_interest_t* node = list_entry(pos, evtpoll_interest_t, node);
+		if (node->fd == fd) {
+			return node;
+		}
+	}
+	return NULL;
+}
+
+static void evtpoll__interest_clear(evtpoll_t* ep)
+{
+	struct list_head* pos;
+	struct list_head* tmp;
+	if (list_empty(&ep->interest_list))
+		return;
+	spinlock_enter(&ep->interest_list_lock, -1);
+	list_for_each_safe(pos, tmp, &ep->interest_list) {
+		evtpoll_interest_t* node = list_entry(pos, evtpoll_interest_t, node);
+		list_del(pos);
+		evtpoll__interest_destroy(node);
+	}
+	spinlock_leave(&ep->interest_list_lock);
+	return;
+}
+
+static int evtpoll__register_interest(evtpoll_t* ep, evtpoll_interest_t* inst)
+{
+	int ret = -1;
+	assert(ep);
+	assert(inst);
+	evtpoll_interest_t* exist;
+#if 1
+	//×¢²áºó²»Á¢¼´½øÐмàÌý
+	ret = 0;
+#else
+	ret = evtpoll__ctl(ep, EV_READ_WRITE_WITH_LT_FULL, EPOLL_CTL_ADD, inst->fd, NULL);
+	if (ret != 0)
+		return ret;
+#endif
+	evtpoll___add_interest_list(inst);
+#if 1 //DEBUG
+	exist = evtpoll__find_interest(ep, inst->fd);
+	assert(exist);
+	assert(exist == inst);
+#endif
+	return ret;
+}
+
+static int evtpoll__unregister_interest(evtpoll_t* ep, int fd)
+{
+	int ret = 0;
+	evtpoll_interest_t* exist;
+	exist = evtpoll__find_interest(ep, fd);
+	if (exist) {
+		evtpoll__interest_free(exist);
+#if 1//DEBUG
+		exist = evtpoll__find_interest(ep, fd);
+		assert(!exist);
+#endif
+		evtpoll__interest_destroy(exist);
+	}
+	return ret;
+}
+
+int evtpoll_attach(evtpoll_t* ep, int interest_fd)
+{
+	int ret = 0;
+	assert(interest_fd > 0);
+	evtpoll_interest_t* exist;
+	WLog_DBG(TAG, "==> attach interest fd(%d) ", interest_fd);
+	exist = evtpoll__find_interest(ep, interest_fd);
+	if (exist) {
+		WLog_WARN(TAG, "detect interest fd %d already exist.", interest_fd);
+		return -1;
+	}
+	exist = evtpoll__interest_create(interest_fd);
+	if (!exist) {
+		WLog_ERR(TAG, "create epueue insterest fd %d failed!", interest_fd);
+		return -1;
+	}
+	exist->owner = ep;
+	ret = evtpoll__register_interest(ep, exist);
+	if (ret != 0) {
+		evtpoll__interest_destroy(exist);
+		return -1;
+	}
+	WLog_DBG(TAG, "<== normal attach interest fd(%d) succ.", interest_fd);
+	return 0;
+}
+
+static void evtpoll__detach(evtpoll_interest_t* inst)
+{
+	evtpoll__unregister_interest(inst->owner, inst);
+}
+
+void evtpoll_detach(evtpoll_t* ep, int interest_fd)
+{
+	evtpoll_interest_t* exist;
+	exist = evtpoll__find_interest(ep, interest_fd);
+	if (exist) {
+		evtpoll__detach(exist);
+	}
+}
+
+static int evtpoll__subscribe_precheck(evtpoll_interest_entry_t* entry, int exist, void* data)
+{
+	if (exist) {
+		if (entry->pending) {
+			WLog_ERR(TAG, "the entry is still pending...");
+			return -1;
+		}
+		else if (entry->io == data) {
+			WLog_WARN(TAG, "entry already exists and seems same, return previously.");
+			return 0;
+		}
+	}
+	if (data != NULL) {
+		const int io_type = ioqueue_overlapped_get_type(data);
+		WLog_INFO(TAG, "epueue ov type for subscribing: 0x%X", io_type);
+	}
+	else if (!entry->io) {
+		WLog_ERR(TAG, "no io data for event!");
+		return -1;
+	}
+	return 1;
+}
+
+static int evtpoll__subscribe_read(evtpoll_t* ep, evtpoll_interest_t* inst, void* data)
+{
+	int ret;
+	int io_type = 0;
+	int has = 0;
+	int op = EPOLL_CTL_ADD;
+	int events = EPOLLIN;
+	evtpoll_interest_entry_t* entry;
+	if (inst->events & EV_READ) {
+		has = 1;
+	}
+
+	entry = ARRAY_IDX(inst->entries, EV_INTEREST_ENTRY_IN_IDX, evtpoll_interest_entry_t*);
+	assert(entry);
+
+	ret = evtpoll__subscribe_precheck(entry, has, data);
+	if (ret < 1) {
+		return ret;
+	}
+
+	ret = 0;
+	io_type = ioqueue_overlapped_get_type(data);
+
+	if (has) {
+		evtpoll__interest_entry_set(entry, EV_READ, io_type, 0, NULL, data);
+		WLog_INFO(TAG, "fd(%d): read entry existed, only update the ov and returned.", inst->fd);
+	}
+	else {
+
+		if (inst->events & EV_WRITE) {
+			op = EPOLL_CTL_MOD;
+			events |= EPOLLOUT;
+		}
+		ret = evtpoll__ctl(ep, events, op, inst->fd, inst);
+		if (ret) {
+			WLog_ERR(TAG, "fd(%d): set read register failed !", inst->fd);
+		}
+		else {
+			inst->events |= EV_READ;
+			evtpoll__interest_entry_set(entry, EV_READ, io_type, 0, NULL, data);
+		}
+	}
+
+	return ret;
+}
+
+static int evtpoll__unsubscribe_read(evtpoll_t* ep, evtpoll_interest_t* inst)
+{
+	int ret;
+	int events = EPOLLIN;
+	int op = EPOLL_CTL_DEL;
+	evtpoll_interest_entry_t* entry = NULL;
+	if (!(inst->events & EV_READ)) {
+		WLog_WARN(TAG, "read event is not existed");
+		return 0;
+	}
+	if (inst->events & EV_WRITE) {
+		op = EPOLL_CTL_MOD;
+		events = EPOLLOUT;
+	}
+
+	ret = evtpoll__ctl(ep, events, op, inst->fd, inst);
+	if (ret) {
+		WLog_ERR(TAG, "un read register failed !");
+	}
+	else {
+		inst->events &= ~(EV_READ);
+		entry = ARRAY_IDX(inst->entries, EV_INTEREST_ENTRY_IN_IDX, evtpoll_interest_entry_t*);
+		evtpoll__interest_entry_reset(entry);
+	}
+	return ret;
+}
+
+static int evtpoll__subscribe_write(evtpoll_t* ep, evtpoll_interest_t* inst, void* data)
+{
+	int ret;
+	int io_type = -1;
+	int has = 0;
+	int op = EPOLL_CTL_ADD;
+	int events = EPOLLOUT;
+	evtpoll_interest_entry_t* entry = NULL;
+	if (inst->events & EV_WRITE) {
+		has = 1;
+	}
+
+	entry = ARRAY_IDX(inst->entries, EV_INTEREST_ENTRY_OUT_IDX, evtpoll_interest_entry_t*);
+	assert(entry);
+
+	ret = evtpoll__subscribe_precheck(entry, has, data);
+	if (ret < 1) {
+		return ret;
+	}
+
+	ret = 0;
+	io_type = ioqueue_overlapped_get_type(data);
+
+	if (has) {
+		evtpoll__interest_entry_set(entry, EV_WRITE, io_type, 0, NULL, data);
+		WLog_INFO(TAG, "fd(%d): write entry existed, only update the ov and returned.", inst->fd);
+	}
+	else {
+
+		if (inst->events & EV_READ) {
+			op = EPOLL_CTL_MOD;
+			events |= EPOLLIN;
+		}
+		ret = evtpoll__ctl(ep, events, op, inst->fd, inst);
+		if (ret) {
+			WLog_ERR(TAG, "fd(%d): set write register failed !", inst->fd);
+		}
+		else {
+			inst->events |= EV_WRITE;
+			evtpoll__interest_entry_set(entry, EV_WRITE, io_type, 0, NULL, data);
+		}
+	}
+
+	return ret;
+}
+
+static int evtpoll__unsubscribe_write(evtpoll_t* ep, evtpoll_interest_t* inst)
+{
+	int ret;
+	int events = EPOLLOUT;
+	int op = EPOLL_CTL_DEL;
+	if (!(inst->events & EV_WRITE)) {
+		WLog_WARN(TAG, "write event is not existed");
+		return 0;
+	}
+	if (inst->events & EV_READ) {
+		op = EPOLL_CTL_MOD;
+		events = EPOLLIN;
+	}
+
+	ret = evtpoll__ctl(ep, events, op, inst->fd, inst);
+	if (ret) {
+		WLog_ERR(TAG, "un read register failed !");
+	}
+	else {
+		evtpoll_interest_entry_t* entry =
+			ARRAY_IDX(inst->entries, EV_INTEREST_ENTRY_OUT_IDX, evtpoll_interest_entry_t*);
+		evtpoll__interest_entry_reset(entry);
+		inst->events &= ~(EV_WRITE);
+	}
+	return ret;
+}
+
+int evtpoll_subscribe(evtpoll_t* ep, int event_mask, int fd, void* rdata, void* wdata)
+{
+	int ret = 0;
+	evtpoll_interest_t* inst;
+	inst = evtpoll__find_interest(ep, fd);
+	if (!inst) {
+		WLog_ERR(TAG, "the fd %d have not been registered", fd);
+		return -1;
+	}
+	if (event_mask & EPOLLET) {
+		WLog_WARN(TAG, "EPOLLET event type is not supported now.");
+		event_mask &= ~EPOLLET;
+	}
+	if (event_mask & ~(EV_READ_WRITE_WITH_LT_PURE)) {
+		WLog_ERR(TAG, "event type exclude read or write is not supported now.");
+		return  -1;
+	}
+	if (!ret && (event_mask & EV_READ)) {
+		ret = evtpoll__subscribe_read(ep, inst, rdata);
+		if (ret > 0) {
+			WLog_WARN(TAG, "read event has been registered");
+			ret = 0;
+		}
+	}
+
+	if (!ret && (event_mask & EV_WRITE)) {
+		ret = evtpoll__subscribe_write(ep, inst, wdata);
+		if (ret > 0) {
+			WLog_WARN(TAG, "write event has been registered");
+			ret = 0;
+		}
+	}
+
+	return ret;
+}
+
+int evtpoll_unsubscribe(evtpoll_t* ep, int event_mask, int fd, int only_reset)
+{
+	int ret;
+	evtpoll_interest_t* exist;
+	exist = evtpoll__find_interest(ep, fd);
+	if (!exist) {
+		WLog_ERR(TAG, "the fd %d have not been registered", fd);
+		return -1;
+	}
+	if (event_mask & ~(EV_READ_WRITE_WITH_LT_PURE)) {
+		WLog_WARN(TAG, "event type exclude read or write is not supported now.");
+		return  -1;
+	}
+	ret = 0;
+	if (!ret && (event_mask & EV_READ)) {
+		if (only_reset) {
+			evtpoll_interest_entry_t* entry =
+				ARRAY_IDX(exist->entries, EV_INTEREST_ENTRY_IN_IDX, evtpoll_interest_entry_t*);
+			evtpoll__interest_entry_reset(entry);
+		}
+		else {
+			ret = evtpoll__unsubscribe_read(ep, exist);
+		}
+	}
+
+	if (!ret && (event_mask & EV_WRITE)) {
+		if (only_reset) {
+			evtpoll_interest_entry_t* entry =
+				ARRAY_IDX(exist->entries, EV_INTEREST_ENTRY_OUT_IDX, evtpoll_interest_entry_t*);
+			evtpoll__interest_entry_reset(entry);
+		}
+		else {
+			ret = evtpoll__unsubscribe_write(ep, exist);
+		}
+	}
+	return ret;
+}
+
+static int evtpoll__deal_inner(evtpoll_interest_t* inst, int idx, void** p_io)
+{
+	int ret = -1;
+	evtpoll_interest_entry_t* entry = ARRAY_IDX(inst->entries, idx, evtpoll_interest_entry_t*);
+	assert(entry);
+	if (evtpoll__interest_entry_is_ready(entry)) {
+		WLog_DBG(TAG, "interest entry is ready.");
+		entry->pending = 1;
+		*p_io = entry->io;
+		evtpoll__interest_entry_reset(entry);
+		ret = 1;
+	}
+	else {
+		WLog_DBG(TAG, "interest entry is not ready.");
+	}
+	return ret;
+}
+
+int evtpoll_deal(evtpoll_t* ep, struct epoll_event* event, void** p_io)
+{
+	int ret = 0;
+	evtpoll_interest_t* inst = NULL;
+	evtpoll_interest_t* exist = NULL;
+	evtpoll_interest_entry_t* entry = NULL;
+
+	if (!(event->events & (EPOLLIN | EPOLLOUT))) {
+		WLog_DBG(TAG, "no event mask.");
+		return -1;
+	}
+
+	inst = (evtpoll_interest_t*)event->data.ptr;
+	assert(inst);
+#if 1 //DEBUG
+	assert(inst->owner == ep);
+	exist = evtpoll__find_interest(ep, inst->fd);
+	assert(exist);
+	assert(exist == inst);
+#endif
+
+	if (event->events & EPOLLIN) { // read
+		ret = evtpoll__deal_inner(inst, EV_INTEREST_ENTRY_IN_IDX, p_io);
+		event->events &= ~(EPOLLIN);
+		return ret;
+	}
+	if (event->events & EPOLLOUT) { // write
+		ret = evtpoll__deal_inner(inst, EV_INTEREST_ENTRY_OUT_IDX, p_io);
+		event->events &= ~(EPOLLOUT);
+		return ret;
+	}
+	return -1;
+}
+
+#endif //NOT _WIN32

+ 59 - 0
libtoolkit/evtpoll.h

@@ -0,0 +1,59 @@
+#ifndef _TOOLKIT_EVENT_POLL_H_
+#define _TOOLKIT_EVENT_POLL_H_
+
+#ifndef _WIN32
+
+#include "config.h"
+#include <sys/epoll.h>
+
+/*
+edge-triggered(ET): use this flag, the app should use nonblock fd
+level-triggered(LT): default
+*/
+
+#define MAX_EPOLL_EVENT 1024
+
+#define EV_READ EPOLLIN
+#define EV_ACCEPT EPOLLIN
+#define EV_WRITE EPOLLOUT
+/*to disable the associated fd after the receipt of an event*/
+#define EV_READ_ONCE (EPOLLIN | EPOLLONESHOT)
+#define EV_WRITE_ONCE (EPOLLOUT | EPOLLONESHOT)
+#define EV_READ_WRITE_WITH_LT_FULL (EPOLLOUT | EPOLLIN | EPOLLERR | EPOLLHUP)
+#define EV_READ_WRITE_WITH_LT_PURE (EPOLLOUT | EPOLLIN)
+
+#define DEFAULT_INTEREST_OP_COUNT		5
+#define EV_INTEREST_ENTRY_IN_IDX             0
+#define EV_INTEREST_ENTRY_OUT_IDX          1
+#define EV_INTEREST_ENTRY_ERR_IDX           2
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+	typedef struct event_epoll_s evtpoll_t;
+
+	evtpoll_t* evtpoll_create();
+
+	void evtpoll_destroy(evtpoll_t* evt_poll);
+
+	int evtpoll_get_epoll_fd(const evtpoll_t* const evt_poll);
+
+	int evtpoll_deal(evtpoll_t* ep, struct epoll_event* event, void** p_io);
+	
+	int evtpoll_attach(evtpoll_t* ep, int interest_fd);
+
+	void evtpoll_detach(evtpoll_t* ep, int interest_fd);
+
+	int evtpoll_subscribe(evtpoll_t* ep, int event_mask, int fd, void* rdata, void* wdata);
+
+	int evtpoll_unsubscribe(evtpoll_t* ep, int event_mask, int fd, int only_reset);
+
+#ifdef __cplusplus
+} // extern "C" {
+#endif
+
+
+#endif //NOT _WIN32
+
+#endif //_TOOLKIT_EVENT_POLL_H_

+ 6 - 0
libtoolkit/fileutil.h

@@ -36,11 +36,17 @@
 #define SPLIT_SLASH_STR BACK_SLASH_STR
 #define MB_SPLIT_SLASH_STR MB_BACK_SLASH_STR
 #define W_SPLIT_SLASH_STR   W_BACK_SLASH_STR
+
+#define LINE_BREAK_STR "\r\n"
+
 #else
 #define SPLIT_SLASH SLASH
 #define SPLIT_SLASH_STR SLASH_STR
 #define MB_SPLIT_SLASH_STR MB_SLASH_STR
 #define W_SPLIT_SLASH_STR   W_SLASH_STR
+
+#define LINE_BREAK_STR "\n"
+
 #endif
 
 #ifndef _WIN32

+ 26 - 679
libtoolkit/ioqueue-unix.c

@@ -7,8 +7,7 @@
 #include "sockutil.h"
 #include "bus.h"
 #include "array.h"
-
-#include <sys/epoll.h>
+#include "evtpoll.h"
 
 #include <winpr/file.h>
 #include <winpr/handle.h>
@@ -19,27 +18,6 @@
 
 #define TAG TOOLKIT_TAG("ioqueue_unix")
 
-/*
-edge-triggered(ET): use this flag, the app should use nonblock fd
-level-triggered(LT): default
-*/
-
-#define MAX_EPOLL_EVENT 1024
-
-#define EV_READ EPOLLIN
-#define EV_ACCEPT EPOLLIN
-#define EV_WRITE EPOLLOUT
-/*to disable the associated fd after the receipt of an event*/
-#define EV_READ_ONCE (EPOLLIN | EPOLLONESHOT)
-#define EV_WRITE_ONCE (EPOLLOUT | EPOLLONESHOT)
-#define EV_READ_WRITE_WITH_LT_FULL (EPOLLOUT | EPOLLIN | EPOLLERR | EPOLLHUP)
-#define EV_READ_WRITE_WITH_LT_PURE (EPOLLOUT | EPOLLIN)
-
-#define DEFAULT_INTEREST_OP_COUNT		5
-#define EV_INTEREST_ENTRY_IN_IDX             0
-#define EV_INTEREST_ENTRY_OUT_IDX          1
-#define EV_INTEREST_ENTRY_ERR_IDX           2
-
 #ifndef SO_UPDATE_CONNECT_CONTEXT
 #define SO_UPDATE_CONNECT_CONTEXT 0x7010
 #endif
@@ -60,7 +38,8 @@ level-triggered(LT): default
 struct ioqueue_t {
 	HANDLE iocp; //for build successfully temporary!!!!!!!!!!!!!!! TODO: delete it!!
 
-	int epfd;
+	evtpoll_t* ep;
+	
 	void *user_data;
 	/* timer */
 	spinlock_t tm_queue_lock;
@@ -73,9 +52,6 @@ struct ioqueue_t {
 	struct list_head connect_list;
 	spinlock_t handler_list_lock;
 	struct list_head handler_list;
-	/*insterest*/
-	spinlock_t interest_list_lock;
-	struct list_head interest_list;
 	LONG stop;
 };
 
@@ -226,622 +202,6 @@ typedef struct ioqueue_connectpipe_overlapped_t {
 	ioqueue_on_pipe_accept_callback on_accept_callback;
 }ioqueue_connectpipe_overlapped_t;
 
-//////////////////////////////////////////////////////////////////////////
-
-struct ioqueue_interest_entry_s {
-	struct list_head entry;
-	ioqueue_overlapped_t* io;
-	int type;
-	int events;
-	int pending;
-	void* key;
-	spinlock_t lock;
-	struct ioqueue_interest_s* owner;
-};
-
-
-struct ioqueue_interest_s {
-	struct list_head node; // for ioqueue_t::interest_list
-
-	struct list_head entry_list; //no used
-
-	int fd;
-	int events;
-	int type; //no used
-	ioqueue_t* owner;
-
-	array_header_t* entries;
-	spinlock_t lock;
-	volatile int entry_counts;
-};
-typedef struct ioqueue_interest_entry_s ioqueue_interest_entry_t;
-typedef struct ioqueue_interest_s ioqueue_interest_t;
-
-static __inline int ioqueue__get_raw_fd(ioqueue_t* ioq)
-{
-	return ioq->epfd;
-}
-
-static __inline void ioqueue__interest_entry_set(
-	ioqueue_interest_entry_t* entry, 
-	int events, 
-	int type,
-	int pending, 
-	void* key, 
-	ioqueue_overlapped_t* io)
-{
-	assert(entry);
-	assert(entry->owner);
-	entry->type = type;
-	entry->events = events;
-	entry->io = io;
-	entry->pending = pending;
-	entry->key = key;
-}
-
-static __inline void ioqueue__interest_entry_reset(ioqueue_interest_entry_t* entry) {
-	ioqueue__interest_entry_set(entry, 0, 0, 0, NULL, NULL);
-}
-
-static ioqueue_interest_entry_t* ioqueue__interest_entry_create(ioqueue_interest_t* interest)
-{
-	ioqueue_interest_entry_t* entry = MALLOC_T(ioqueue_interest_entry_t);
-	assert(entry);
-	entry->owner = interest;
-	ioqueue__interest_entry_reset(entry);
-	spinlock_init(&entry->lock);
-	return entry;
-}
-
-
-static __inline int ioqueue__interest_entry_is_ready(const ioqueue_interest_entry_t* const entry)
-{
-	if (entry->pending != 0) {
-		return 0;
-	}
-	if (!entry->io) {
-		return 0;
-	}
-	if (entry->type == 0 || entry->events == 0) {
-		return 0;
-	}
-	return 1;
-}
-
-static __inline void ioqueue__interest_entry_destroy(ioqueue_interest_entry_t* entry)
-{
-	free(entry);
-}
-
-static __inline void ioqueue__interest_lock(ioqueue_interest_t* interest)
-{
-	spinlock_enter(&interest->lock, 0);
-}
-
-static __inline void ioqueue__interest_unlock(ioqueue_interest_t* interest)
-{
-	spinlock_leave(&interest->lock);
-}
-
-static __inline void ioqueue___add_interest_list(ioqueue_interest_t* inst)
-{
-	ioqueue_t* ioq = inst->owner;
-	assert(ioq);
-	spinlock_enter(&ioq->interest_list_lock, -1);
-	list_add(&inst->node, &ioq->interest_list);
-	spinlock_leave(&ioq->interest_list_lock);
-}
-
-static __inline void ioqueue___del_interest_list(ioqueue_interest_t* inst)
-{
-	ioqueue_t* ioq = inst->owner;
-	assert(ioq);
-	spinlock_enter(&ioq->interest_list_lock, -1);
-	list_del(&inst->node);
-	inst->node.next = inst->node.prev = NULL;
-	spinlock_leave(&ioq->interest_list_lock);
-}
-
-static __inline void ioqueue__interest_free(ioqueue_interest_t* inst)
-{
-	ioqueue___del_interest_list(inst);
-}
-
-
-static int ioqueue__ctl(ioqueue_t* ioq, int event_mask, int ctrl_mod, int fd, void* data)
-{
-	int ret = -1;
-	struct epoll_event ee;
-	if (ioq) {
-		ee.events = event_mask;
-		if (data) {
-			ee.data.ptr = data;
-		}
-		else {
-			ee.data.fd = fd;
-		}
-		ret = epoll_ctl(ioqueue__get_raw_fd(ioq), ctrl_mod, fd, &ee);
-		if (ret == -1) {
-			WLog_ERR(TAG, "epoll ctl failed: %s", strerror(errno));
-		}
-		else {
-			WLog_INFO(TAG, "fd(%d), epoll_ctl: OUT:%d, IN:%d, OTHERS:0x%08X, ADD:%d, MOD:%d, data:0x%X ret:%d",
-				fd,
-				(event_mask & EPOLLOUT) > 0 ? 1 : 0, (event_mask & EPOLLIN) > 0 ? 1 : 0,
-				(event_mask & ~(EPOLLOUT | EPOLLIN)),
-				ctrl_mod == EPOLL_CTL_ADD ? 1 : 0, ctrl_mod == EPOLL_CTL_MOD ? 1 : 0, data, ret);
-		}
-	}
-	return ret;
-}
-
-static int ioqueue__ctl_remove(ioqueue_t* ioq, int event_mask, int fd, void* data)
-{
-	int ret = -1;
-	struct epoll_event ee;
-	ee.events = event_mask;
-	if (data)
-		ee.data.ptr = data;
-	else
-		ee.data.fd = fd;
-	ret = epoll_ctl(ioqueue__get_raw_fd(ioq), EPOLL_CTL_DEL, fd, &ee);
-	if (ret == 0 || (ret == -1 && errno == ENOENT)) {
-		return 0;
-	}
-	WLog_ERR(TAG, "epoll remove ctrl failed: %s", strerror(errno));
-	return -1;
-}
-
-/*!
- * @brief
-		 one 'ioqueue_interest_t' maps to one fd.
-		 each interest instance has {DEFAULT_INTEREST_OP_COUNT} ioqueue_interest_entry_t type object.
- * @param[in]  
- * @return : SUCC: new ioqueue_interest_t type pointer object; NULL: failed
- */
-static ioqueue_interest_t* ioqueue__interest_create(int fd)
-{
-	int i;
-	ioqueue_interest_t* inst = ZALLOC_T(ioqueue_interest_t);
-	if (!inst) {
-		return NULL;
-	}
-	inst->fd = fd;
-	INIT_LIST_HEAD(&inst->entry_list);
-	inst->entry_counts = 0;
-	inst->owner = NULL;
-	inst->type = 0;
-	inst->events = 0;
-
-	inst->entries = array_make(DEFAULT_INTEREST_OP_COUNT, sizeof(ioqueue_interest_entry_t*));
-	for (i = 0; i < DEFAULT_INTEREST_OP_COUNT; ++i) {
-		ARRAY_IDX(inst->entries, i, ioqueue_interest_entry_t*) = ioqueue__interest_entry_create(inst);
-	}
-	return inst;
-}
-
-static void ioqueue__interest_destroy(ioqueue_interest_t* inst)
-{
-	int i;
-	assert(inst);
-	assert(list_empty(&inst->entry_list));
-	for (i = 0; i < inst->entries->nelts; ++i)
-		ioqueue__interest_entry_destroy(ARRAY_IDX(inst->entries, i, ioqueue_interest_entry_t*));
-	array_free(inst->entries);
-	free(inst);
-}
-
-
-static ioqueue_interest_t* ioqueue__find_interest(const ioqueue_t* const cst_ioq, int fd)
-{
-	struct list_head* pos;
-	struct list_head* tmp;
-	if (list_empty(&cst_ioq->interest_list))
-		return NULL;
-	list_for_each_safe(pos, tmp, &cst_ioq->interest_list) {
-		ioqueue_interest_t* node = list_entry(pos, ioqueue_interest_t, node);
-		if (node->fd == fd) {
-			return node;
-		}
-	}
-	return NULL;
-}
-
-static void ioqueue__interest_clear(ioqueue_t* ioq)
-{
-	struct list_head* pos;
-	struct list_head* tmp;
-	if (list_empty(&ioq->interest_list))
-		return;
-	spinlock_enter(&ioq->interest_list_lock, -1);
-	list_for_each_safe(pos, tmp, &ioq->interest_list) {
-		ioqueue_interest_t* node = list_entry(pos, ioqueue_interest_t, node);
-		list_del(pos);
-		ioqueue__interest_destroy(node);
-	}
-	spinlock_leave(&ioq->interest_list_lock);
-	return;
-}
-
-static int ioqueue__register_interest(ioqueue_t* ioq, ioqueue_interest_t* inst)
-{
-	int ret = -1;
-	assert(ioq);
-	assert(inst);
-	ioqueue_interest_t* exist;
-#if 1
-	//注册后不立即进行监听
-	ret = 0;
-#else
-	ret = ioqueue__ctl(ioq, EV_READ_WRITE_WITH_LT_FULL, EPOLL_CTL_ADD, inst->fd, NULL);
-	if (ret != 0)
-		return ret;
-#endif
-	ioqueue___add_interest_list(inst);
-#if 1 //DEBUG
-	exist = ioqueue__find_interest(ioq, inst->fd);
-	assert(exist);
-	assert(exist == inst);
-#endif
-	return ret;
-}
-
-static int ioqueue__unregister_interest(ioqueue_t* ioq, int fd)
-{
-	int ret = 0;
-	ioqueue_interest_t* exist;
-	exist = ioqueue__find_interest(ioq, fd);
-	if (exist) {
-		ioqueue__interest_free(exist);
-#if 1//DEBUG
-		exist = ioqueue__find_interest(ioq, fd);
-		assert(!exist);
-#endif
-		ioqueue__interest_destroy(exist);
-	}
-	return ret;
-}
-
-static int ioqueue__attach(ioqueue_t* ioq, int interest_fd)
-{
-	int ret = 0;
-	assert(interest_fd > 0);
-	ioqueue_interest_t* exist;
-	WLog_DBG(TAG, "==> attach interest fd(%d) ", interest_fd);
-	exist = ioqueue__find_interest(ioq, interest_fd);
-	if (exist) {
-		WLog_WARN(TAG, "detect interest fd %d already exist.", interest_fd);
-		return -1;
-	}
-	exist = ioqueue__interest_create(interest_fd);
-	if (!exist) {
-		WLog_ERR(TAG, "create ioqueue insterest fd %d failed!", interest_fd);
-		return -1;
-	}
-	exist->owner = ioq;
-	ret = ioqueue__register_interest(ioq, exist);
-	if (ret != 0) {
-		ioqueue__interest_destroy(exist);
-		return -1;
-	}
-	WLog_DBG(TAG, "<== normal attach interest fd(%d) succ.", interest_fd);
-	return 0;
-}
-
-static void ioqueue__detach(ioqueue_interest_t* inst)
-{
-	ioqueue__unregister_interest(inst->owner, inst);
-}
-
-static int ioqueue__subscribe_update(ioqueue_interest_entry_t* entry, int exist, ioqueue_overlapped_t* data)
-{
-	if (exist) {
-		if (entry->pending) {
-			WLog_ERR(TAG, "the entry is still pending...");
-			return -1;
-		}
-		else if (entry->io == data) {
-			WLog_WARN(TAG, "entry already exists and seems same, return previously.");
-			return 0;
-		}
-	}
-	if (data != NULL) {
-		int io_type = ioqueue_overlapped_get_type(data);
-		WLog_INFO(TAG, "ioqueue ov type for subscribing: 0x%X", io_type);
-	}
-	else if (!entry->io) {
-		WLog_ERR(TAG, "no io data for event!");
-		return -1;
-	}
-	return 1;
-}
-
-static int ioqueue__subscribe_read(ioqueue_t* ioq, ioqueue_interest_t* inst, ioqueue_overlapped_t* data)
-{
-	int ret;
-	int io_type = 0;
-	int has = 0;
-	int op = EPOLL_CTL_ADD;
-	int events = EPOLLIN;
-	ioqueue_interest_entry_t* entry;
-	if (inst->events & EV_READ) {
-		has = 1;
-	}
-
-	entry = ARRAY_IDX(inst->entries, EV_INTEREST_ENTRY_IN_IDX, ioqueue_interest_entry_t*);
-	assert(entry);
-
-	ret = ioqueue__subscribe_update(entry, has, data);
-	if (ret < 1) {
-		return ret;
-	}
-
-	ret = 0;
-	io_type = ioqueue_overlapped_get_type(data);
-
-	if (has) {
-		ioqueue__interest_entry_set(entry, EV_READ, io_type, 0, NULL, data);
-		WLog_INFO(TAG, "fd(%d): read entry existed, only update the ov and returned.", inst->fd);
-	}
-	else {
-
-		if (inst->events & EV_WRITE) {
-			op = EPOLL_CTL_MOD;
-			events |= EPOLLOUT;
-		}
-		ret = ioqueue__ctl(ioq, events, op, inst->fd, inst);
-		if (ret) {
-			WLog_ERR(TAG, "fd(%d): set read register failed !", inst->fd);
-		}
-		else {
-			inst->events |= EV_READ;
-			ioqueue__interest_entry_set(entry, EV_READ, io_type, 0, NULL, data);
-		}
-	}
-
-	return ret;
-}
-
-static int ioqueue__unsubscribe_read(ioqueue_t* ioq, ioqueue_interest_t* inst)
-{
-	int ret;
-	int events = EPOLLIN;
-	int op = EPOLL_CTL_DEL;
-	ioqueue_interest_entry_t* entry = NULL;
-	if (!(inst->events & EV_READ)) {
-		WLog_WARN(TAG, "read event is not existed");
-		return 0;
-	}
-	if (inst->events & EV_WRITE) {
-		op = EPOLL_CTL_MOD;
-		events = EPOLLOUT;
-	}
-
-	ret = ioqueue__ctl(ioq, events, op, inst->fd, inst);
-	if (ret) {
-		WLog_ERR(TAG, "un read register failed !");
-	} else {
-		inst->events &= ~(EV_READ);
-		entry = ARRAY_IDX(inst->entries, EV_INTEREST_ENTRY_IN_IDX, ioqueue_interest_entry_t*);
-		ioqueue__interest_entry_reset(entry);
-	}
-	return ret;
-}
-
-static int ioqueue__subscribe_write(ioqueue_t* ioq, ioqueue_interest_t* inst, ioqueue_overlapped_t* data)
-{
-	int ret;
-	int io_type = -1;
-	int has = 0;
-	int op = EPOLL_CTL_ADD;
-	int events = EPOLLOUT;
-	ioqueue_overlapped_t* old_ov = NULL;
-	ioqueue_interest_entry_t* entry = NULL;
-	if (inst->events & EV_WRITE) {
-		has = 1;
-	}
-
-	entry = ARRAY_IDX(inst->entries, EV_INTEREST_ENTRY_OUT_IDX, ioqueue_interest_entry_t*);
-	assert(entry);
-
-	ret = ioqueue__subscribe_update(entry, has, data);
-	if (ret < 1) {
-		return ret;
-	}
-
-	ret = 0;
-	io_type = ioqueue_overlapped_get_type(data);
-	
-	if (has) {
-		ioqueue__interest_entry_set(entry, EV_WRITE, io_type, 0, NULL, data);
-		WLog_INFO(TAG, "fd(%d): write entry existed, only update the ov and returned.", inst->fd);
-	}
-	else {
-
-		if (inst->events & EV_READ) {
-			op = EPOLL_CTL_MOD;
-			events |= EPOLLIN;
-		}
-		ret = ioqueue__ctl(ioq, events, op, inst->fd, inst);
-		if (ret) {
-			WLog_ERR(TAG, "fd(%d): set write register failed !", inst->fd);
-		}
-		else {
-			inst->events |= EV_WRITE;
-			ioqueue__interest_entry_set(entry, EV_WRITE, io_type, 0, NULL, data);
-		}
-	}
-
-	return ret;
-}
-
-static int ioqueue__unsubscribe_write(ioqueue_t* ioq, ioqueue_interest_t* inst)
-{
-	int ret;
-	int events = EPOLLOUT;
-	int op = EPOLL_CTL_DEL;
-	if (!(inst->events & EV_WRITE)) {
-		WLog_WARN(TAG, "write event is not existed");
-		return 0;
-	}
-	if (inst->events & EV_READ) {
-		op = EPOLL_CTL_MOD;
-		events = EPOLLIN;
-	}
-
-	ret = ioqueue__ctl(ioq, events, op, inst->fd, inst);
-	if (ret) {
-		WLog_ERR(TAG, "un read register failed !");
-	}
-	else {
-		ioqueue_interest_entry_t* entry = 
-			ARRAY_IDX(inst->entries, EV_INTEREST_ENTRY_OUT_IDX, ioqueue_interest_entry_t*);
-		ioqueue__interest_entry_reset(entry);
-		inst->events &= ~(EV_WRITE);
-	}
-	return ret;
-}
-
-static int ioqueue__subscribe(ioqueue_t* ioq, int event_mask, int fd, ioqueue_overlapped_t* fdata, ioqueue_overlapped_t* wdata)
-{	
-	int ret = 0;
-	ioqueue_interest_t* inst;
-	inst = ioqueue__find_interest(ioq, fd);
-	if (!inst) {
-		WLog_ERR(TAG, "the fd %d have not been registered", fd);
-		return -1;
-	}
-	if (event_mask & EPOLLET) {
-		WLog_WARN(TAG, "EPOLLET event type is not supported now.");
-		event_mask &= ~EPOLLET;
-	}
-	if (event_mask & ~(EV_READ_WRITE_WITH_LT_PURE)) {
-		WLog_ERR(TAG, "event type exclude read or write is not supported now.");
-		return  -1;
-	}
-	if (!ret && (event_mask & EV_READ)) {
-		ret = ioqueue__subscribe_read(ioq, inst, fdata);
-		if (ret > 0) {
-			WLog_WARN(TAG, "read event has been registered");
-			ret = 0;
-		}
-	}
-
-	if (!ret && (event_mask & EV_WRITE)) {
-		ret = ioqueue__subscribe_write(ioq, inst, wdata);
-		if (ret > 0) {
-			WLog_WARN(TAG, "write event has been registered");
-			ret = 0;
-		}
-	}
-
-	return ret;
-}
-
-static int ioqueue__unsubscribe(ioqueue_t* ioq, int event_mask, int fd, int only_reset)
-{
-	int ret;
-	ioqueue_interest_t* exist;
-	exist = ioqueue__find_interest(ioq, fd);
-	if (!exist) {
-		WLog_ERR(TAG, "the fd %d have not been registered", fd);
-		return -1;
-	}
-	if (event_mask & ~(EV_READ_WRITE_WITH_LT_PURE)) {
-		WLog_WARN(TAG, "event type exclude read or write is not supported now.");
-		return  -1;
-	}
-	ret = 0;
-	if (!ret && (event_mask & EV_READ)) {
-		if (only_reset) {
-			ioqueue_interest_entry_t* entry = 
-				ARRAY_IDX(exist->entries, EV_INTEREST_ENTRY_IN_IDX, ioqueue_interest_entry_t*);
-			ioqueue__interest_entry_reset(entry);
-		}
-		else {
-			ret = ioqueue__unsubscribe_read(ioq, exist);
-		}
-	}
-
-	if (!ret && (event_mask & EV_WRITE)) {
-		if (only_reset) {
-			ioqueue_interest_entry_t* entry = 
-				ARRAY_IDX(exist->entries, EV_INTEREST_ENTRY_OUT_IDX, ioqueue_interest_entry_t*);
-			ioqueue__interest_entry_reset(entry);
-		}
-		else {
-			ret = ioqueue__unsubscribe_write(ioq, exist);
-		}
-	}
-	return ret;
-}
-
-static int ioqueue__deal_inner(ioqueue_interest_t* inst, int idx, ioqueue_overlapped_t** p_io)
-{
-	int ret = -1;
-	ioqueue_interest_entry_t* entry = ARRAY_IDX(inst->entries, idx, ioqueue_interest_entry_t*);
-	assert(entry);
-	if (ioqueue__interest_entry_is_ready(entry)) {
-		WLog_DBG(TAG, "interest entry is ready.");
-		entry->pending = 1;
-		*p_io = entry->io;
-		ioqueue__interest_entry_reset(entry);
-		ret = 1;
-	}
-	else {
-		WLog_DBG(TAG, "interest entry is not ready.");
-	}
-	return ret;
-}
-
-static int ioqueue__deal(ioqueue_t* ioq, struct epoll_event* event, ioqueue_overlapped_t** p_io)
-{
-	int ret = 0;
-	ioqueue_interest_t* inst = NULL;
-	ioqueue_interest_t* exist = NULL;
-	ioqueue_interest_entry_t* entry = NULL;
-
-	if (!(event->events & (EPOLLIN | EPOLLOUT))) {
-		WLog_DBG(TAG, "no event mask.");
-		return -1;
-	}
-		
-	inst = (ioqueue_interest_t*)event->data.ptr;
-	assert(inst);
-#if 1 //DEBUG
-	assert(inst->owner == ioq);
-	exist = ioqueue__find_interest(ioq, inst->fd);
-	assert(exist);
-	assert(exist == inst);
-#endif
-
-	if (event->events & EPOLLIN) { // read
-		ret = ioqueue__deal_inner(inst, EV_INTEREST_ENTRY_IN_IDX, p_io);
-		event->events &= ~(EPOLLIN);
-		return ret;
-	} 
-	if (event->events & EPOLLOUT) { // write
-		ret = ioqueue__deal_inner(inst, EV_INTEREST_ENTRY_OUT_IDX, p_io);
-		event->events &= ~(EPOLLOUT);
-		return ret;
-	}
-	return -1;
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-static int reuse_addr(SOCKET sock)
-{
-	BOOL reuseaddr = 1;
-	return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char*)&reuseaddr, sizeof(reuseaddr));
-}
-
-static int nonblock_sock(SOCKET sock)
-{
-	unsigned long v = 1;
-	return _ioctlsocket(sock, FIONBIO, &v);
-}
-
 static int is_os_gte_xp() /* is os version greater and equal than xp */
 {
 	static int yes = -1;
@@ -947,28 +307,25 @@ TOOLKIT_API ioqueue_t *ioqueue_create()
 	ioqueue_t *ioq = ZALLOC_T(ioqueue_t);
 	if (!ioq)
 		return NULL;
-	/*
-	since Linux 2.6.8 the argument is ignored but must be g0 for compatibility,
-	bcz kernel would allocate space dynamically.
-	*/
-	ioq->epfd = epoll_create(MAX_EPOLL_EVENT);
-		if (ioq->epfd == -1) {
-			WLog_ERR(TAG, "epoll fd create failed: %s", strerror(errno));
-			goto on_error_0;
-		}
-	if (timer_heap_create(&ioq->tm_queue) != 0)
+
+	ioq->ep = evtpoll_create();
+	if (!ioq->ep) {
+		goto on_error_0;
+	}
+
+	if (timer_heap_create(&ioq->tm_queue) != 0) {
 		goto on_error_3;
+	}
 	spinlock_init(&ioq->tm_queue_lock);
 	spinlock_init(&ioq->connect_list_lock);
 	INIT_LIST_HEAD(&ioq->connect_list);
 	spinlock_init(&ioq->handler_list_lock);
 	INIT_LIST_HEAD(&ioq->handler_list);
-	spinlock_init(&ioq->interest_list_lock);
-	INIT_LIST_HEAD(&ioq->interest_list);
+
 	return ioq;
 
 on_error_3:
-	close(ioq->epfd);
+	evtpoll_destroy(ioq->ep);
 on_error_0:
 	free(ioq);
 	return NULL;
@@ -979,9 +336,8 @@ TOOLKIT_API void ioqueue_destroy(ioqueue_t *ioq)
 	assert(ioq);
 	assert(ioqueue_handler_empty(ioq));
 	assert(ioqueue_msg_empty(ioq));
-	assert(ioqueue_interest_empty(ioq));
 	timer_queue_destroy(ioq->tm_queue);
-	close(ioq->epfd);
+	evtpoll_destroy(ioq->ep);
 	free(ioq);
 }
 
@@ -1001,16 +357,6 @@ TOOLKIT_API int ioqueue_msg_empty(ioqueue_t *ioq)
 	return ioq->msg_cnt == 0;
 }
 
-TOOLKIT_API int ioqueue_interest_empty(ioqueue_t* ioq)
-{
-	int ret;
-	assert(ioq);
-	spinlock_enter(&ioq->interest_list_lock, -1);
-	ret = list_empty(&ioq->interest_list);
-	spinlock_leave(&ioq->interest_list_lock);
-	return ret;
-}
-
 TOOLKIT_API int ioqueue_msg_add_handler(ioqueue_t *ioq, int msg_type, int priority, ioqueue_on_msg_callback cb)
 {
 	assert(ioq);
@@ -1285,7 +631,7 @@ static void dispatch_network(BOOL ret, DWORD dwBytesTransfer, ioqueue_overlapped
 							overlapped->base.ov.Offset = 0;
 							overlapped->base.ov.OffsetHigh = 0;
 							WLog_WARN(TAG, "OV_SENDN: Must be due with this situation %d < %d", overlapped->sended_bytes, overlapped->total_bytes);
-							ioqueue__subscribe(ioqueue_tcpsock_get_owned_ioqueue(tcpsock), EV_WRITE, tcpsock->u.sock, NULL, io_ctx);
+							evtpoll_subscribe(ioqueue_tcpsock_get_owned_ioqueue(tcpsock)->ep, EV_WRITE, tcpsock->u.sock, NULL, io_ctx);
 							//if (rc != 0 && WSAGetLastError() != WSA_IO_PENDING) {
 							//	dec_pending_io(handle_ctx);
 							//	overlapped->on_send_callback(handle_ctx, io_ctx, overlapped->original_buf, 
@@ -1293,7 +639,7 @@ static void dispatch_network(BOOL ret, DWORD dwBytesTransfer, ioqueue_overlapped
 							//}
 						} else {
 							ioqueue_tcpsock_t* tcpsock = overlapped->base.handle_ctx;
-							ioqueue__unsubscribe(ioqueue_tcpsock_get_owned_ioqueue(tcpsock), EV_WRITE, tcpsock->u.sock, 0);
+							evtpoll_unsubscribe(ioqueue_tcpsock_get_owned_ioqueue(tcpsock)->ep, EV_WRITE, tcpsock->u.sock, 0);
 							overlapped->on_send_callback(handle_ctx, io_ctx, overlapped->original_buf, 
 								overlapped->sended_bytes, base_ov->user_data, err);
 						}
@@ -1361,7 +707,7 @@ static void dispatch_network(BOOL ret, DWORD dwBytesTransfer, ioqueue_overlapped
 							overlapped->base.ov.OffsetHigh = 0;
 							overlapped->dwFlags = 0;
 							WLog_WARN(TAG, "OV_RECVN: Must be due with this situation %d < %d", overlapped->recved_bytes, overlapped->total_bytes);
-							ioqueue__subscribe(ioqueue_tcpsock_get_owned_ioqueue(tcpsock),  EV_READ, tcpsock->u.sock, io_ctx, NULL);
+							evtpoll_subscribe(ioqueue_tcpsock_get_owned_ioqueue(tcpsock)->ep,  EV_READ, tcpsock->u.sock, io_ctx, NULL);
 							//if (rc != 0 && WSAGetLastError() != WSA_IO_PENDING) {
 							//	dec_pending_io(handle_ctx);
 							//	overlapped->on_recv_callback(handle_ctx, io_ctx, overlapped->original_buf, 
@@ -1590,7 +936,7 @@ TOOLKIT_API int ioqueue_poll(ioqueue_t* q, int timeout)
 		BOOL ret;
 		DWORD dwBytesTransfer = 0;
 		//有时会出现惊群的问题!!
-		int epfd = ioqueue__get_raw_fd(ioq);
+		int epfd = evtpoll_get_epoll_fd(ioq->ep);
 		nfds = epoll_wait(epfd, events, MAX_EPOLL_EVENT, t ? 0 : timeout);
 		if ((nfds < 0 && EINTR != errno) || nfds == 0) {
 			t = 0;
@@ -1607,7 +953,7 @@ TOOLKIT_API int ioqueue_poll(ioqueue_t* q, int timeout)
 
 			while (n >= 0) {
 				ioqueue_overlapped_t* io_ctx = NULL;
-				n = ioqueue__deal(ioq, &events[i], &io_ctx);
+				n = evtpoll_deal(ioq->ep, &events[i], &io_ctx);
 				if (n > 0) {
 					assert(io_ctx);
 					pre_dispatch_network(&ret, &dwBytesTransfer, io_ctx, epfd);
@@ -1618,7 +964,8 @@ TOOLKIT_API int ioqueue_poll(ioqueue_t* q, int timeout)
 			}
 		}
 		if (nfds < 0) {
-			WLog_WARN(TAG, "nfds < 0, error: %s(%d)", strerror(errno), errno);
+			WLog_ERR(TAG, "nfds < 0, error: %s(%d)", strerror(errno), errno);
+			break;
 		}
 	} while (t > 0);
 
@@ -1719,7 +1066,7 @@ TOOLKIT_API int ioqueue_acceptor_create(ioqueue_t *ioq,
 	service.sin_addr.s_addr = ip ? inet_addr(ip) : htonl(INADDR_ANY);
 	if (bind(acceptor->u.sock, (struct sockaddr*)&service, sizeof(struct sockaddr)) != 0)
 		goto on_error;
-	if (ioqueue__attach(ioq, acceptor->u.sock)) {
+	if (evtpoll_attach(ioq->ep, acceptor->u.sock)) {
 		goto on_error;
 	}
 	acceptor->type = HANDLE_TYPE_ACCEPTOR;
@@ -1790,7 +1137,7 @@ TOOLKIT_API int ioqueue_acceptor_async_accept(ioqueue_acceptor_t* acceptor,
 	overlapped->base.handle_ctx = acceptor;
 	inc_pending_io(acceptor);
 	overlapped->on_accept_callback = on_accept_callback;
-	ret = ioqueue__subscribe(ioq, EV_ACCEPT, acceptor->u.sock, ov, NULL);
+	ret = evtpoll_subscribe(ioq->ep, EV_ACCEPT, acceptor->u.sock, ov, NULL);
 	if (ret == 0) {
 		WLog_INFO(TAG, "acceptor subscribes accept event succ.");
 		return 0;
@@ -1850,7 +1197,7 @@ TOOLKIT_API int ioqueue_acceptor_create_client(ioqueue_acceptor_t* acceptor, SOC
 	tcpsock->user_data = NULL;
 	fastlock_init(tcpsock->ov_pending_list_lock);
 	INIT_LIST_HEAD(&tcpsock->ov_pending_list);
-	if (0 != ioqueue__attach(ioq, s)) {
+	if (0 != evtpoll_attach(ioq->ep, s)) {
 		return -1;
 	}
 	add_handler_list(tcpsock, ioq);
@@ -2166,7 +1513,7 @@ TOOLKIT_API int ioqueue_tcpsock_async_sendn(ioqueue_tcpsock_t *tcpsock,
 	overlapped->total_bytes = len;
 	inc_pending_io(tcpsock);
 	WLog_INFO(TAG, "ioqueue_tcpsock_async_sendn fired! sock: %d", tcpsock->u.sock);
-	rc = ioqueue__subscribe(ioq, EV_WRITE, tcpsock->u.sock, NULL, ov);
+	rc = evtpoll_subscribe(ioq->ep, EV_WRITE, tcpsock->u.sock, NULL, ov);
 	if (!rc) {
 		return 0;
 	}
@@ -2310,7 +1657,7 @@ TOOLKIT_API int ioqueue_tcpsock_async_recvn(ioqueue_tcpsock_t *tcpsock,
 	overlapped->total_bytes = len;
 	overlapped->dwFlags = 0;
 	inc_pending_io(tcpsock);
-	rc = ioqueue__subscribe(ioq, EV_READ, tcpsock->u.sock, ov, NULL);
+	rc = evtpoll_subscribe(ioq->ep, EV_READ, tcpsock->u.sock, ov, NULL);
 	if (!rc) {
 		return 0;
 	}

+ 0 - 12
libtoolkit/ioqueue-win.c

@@ -202,18 +202,6 @@ typedef struct ioqueue_connectpipe_overlapped_t {
 	ioqueue_on_pipe_accept_callback on_accept_callback;
 }ioqueue_connectpipe_overlapped_t;
 
-static int reuse_addr(SOCKET sock)
-{
-	BOOL reuseaddr = 1;
-	return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char*)&reuseaddr, sizeof(reuseaddr));
-}
-
-static int nonblock_sock(SOCKET sock)
-{
-	unsigned long v = 1;
-	return _ioctlsocket(sock, FIONBIO, &v);
-}
-
 static int is_os_gte_xp() /* is os version greater and equal than xp */
 {
 	static int yes = -1;

+ 0 - 3
libtoolkit/ioqueue.h

@@ -777,9 +777,6 @@ TOOLKIT_API int ioqueue_pipe_acceptor_close_pending_handle(ioqueue_pipe_acceptor
 
 #ifndef _WIN32
 
-TOOLKIT_API int ioqueue_interest_empty(ioqueue_t* ioq);
-//TOOLKIT_API int ioqueue_attach(ioqueue_t* ioq, int interest_fd);
-
 TOOLKIT_API int ioqueue_overlapped_get_mask(const ioqueue_overlapped_t* const io);
 TOOLKIT_API void ioqueue_overlapped_set_mask(ioqueue_overlapped_t* io, int mask_value);
 TOOLKIT_API int ioqueue_overlapped_get_type(const ioqueue_overlapped_t* const io);

+ 1 - 6
libtoolkit/log_periodic.c

@@ -266,13 +266,8 @@ static int periodicfilefactory_record_log(void *self,
 				st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, log_level2str(level));
 			plog->first_record = 0;
 		} else {
-#ifdef _WIN32
-			n = sprintf(tmp, "\r\n[%02d:%02d:%02d.%03d][%u][%s] ",
+			n = sprintf(tmp, LINE_BREAK_STR "[%02d:%02d:%02d.%03d][%05u][%s] ",
 				st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, GetCurrentThreadId(), log_level2str(level));
-#else
-			n = sprintf(tmp, "\n[%02d:%02d:%02d.%03d][%s] ",
-				st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, log_level2str(level));
-#endif // _WIN32
 		}
 		_fwrite_nolock(tmp, n, 1, plog->fp);
 		_fwrite_nolock(s, sn, 1, plog->fp);

+ 12 - 0
libtoolkit/sockutil.h

@@ -88,6 +88,18 @@ static int trecv_n_v3(int fd,
 	return trecv_n_v(fd, &wbuf[0], 3, timeout);
 }
 
+static int nonblock_sock(SOCKET sock)
+{
+	unsigned long v = 1;
+	return _ioctlsocket(sock, FIONBIO, &v);
+}
+
+static int reuse_addr(SOCKET sock)
+{
+	BOOL reuseaddr = 1;
+	return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char*)&reuseaddr, sizeof(reuseaddr));
+}
+
 TOOLKIT_API int disable_udp_connreset(SOCKET sock);
 
 TOOLKIT_API int tsendto(int fd, const char *buf, int n, const struct sockaddr* to, int tolen, long timeout);