浏览代码

Z991239-351 #comment 优化 evtpoll 机制,winpr 进程等待优化

gifur 5 年之前
父节点
当前提交
e425778dbb

+ 147 - 63
libtoolkit/bus-unix.c

@@ -2,7 +2,6 @@
 #include "bus.h"
 #include "sockutil.h"
 #include "url.h"
-#include "memutil.h"
 #include "spinlock.h"
 #include "list.h"
 #include "bus_internal.h"
@@ -13,6 +12,8 @@
 #include <winpr/synch.h>
 #include <winpr/string.h>
 
+#include <sys/eventfd.h>
+
 #define TAG TOOLKIT_TAG("bus_unix")
 
 #define BUS_RESULT_DATA		    1				// ==BUS_TYPE_PACKET, callback: callback.on_pkt,  no use
@@ -26,7 +27,7 @@ typedef struct msg_t {
 	struct list_head entry;
 	int type;
 	int nparam;
-	int* params;
+	param_size_t* params;
 	HANDLE evt;
 	int evt_result;
 }msg_t;
@@ -41,10 +42,12 @@ struct bus_endpt_t {
 	char* url;
 	/*define here or iom_t area, this is definitely a problem for now.*/
 	evtpoll_t* ep;
+	int msg_fd;
+	event_epoll_data_t* msg_sem;
 	bus_endpt_callback callback;
 	struct list_head msg_list;
 	spinlock_t msg_lock;
-	HANDLE msg_sem;
+	
 	HANDLE tx_evt; //manually
 	HANDLE rx_evt; //manually
 	OVERLAPPED rx_overlapped;
@@ -61,6 +64,15 @@ static void free_msg(msg_t* msg)
 	free(msg);
 }
 
+static __inline int bus_endpoint__data_is_handle(const bus_endpt_t* endpt, void* data)
+{
+	if ((endpt->type == TYPE_TCP && data == &endpt->sock_handle) ||
+		(endpt->type == TYPE_PIPE && data == &endpt->pipe_handle)) {
+		return 1;
+	}
+	return 0;
+}
+
 static int to_result(int pkt_type)
 {
 	switch (pkt_type) {
@@ -154,7 +166,7 @@ static int tcp_send_buf(bus_endpt_t* endpt, const char* buf, int n)
 {
 	DWORD left = n;
 	DWORD offset = 0;
-	WLog_DBG(TAG, "fd(%d): tcp send buf len: %d", endpt->sock_handle, n);
+	WLog_DBG(TAG, "==> fd(%d): tcp send buf len: %d", endpt->sock_handle, n);
 	while (left > 0) {
 		BOOL ret;
 		WSABUF wsabuf;
@@ -206,9 +218,11 @@ static int tcp_send_buf(bus_endpt_t* endpt, const char* buf, int n)
 			left -= dwBytesTransfer;
 		}
 		else {
+			WLog_DBG(TAG, "<== error fd(%d): tcp send buf left: %d", endpt->sock_handle, left);
 			return -1;
 		}
 	}
+	WLog_DBG(TAG, "<== fd(%d): tcp send buf len: %d", endpt->sock_handle, n);
 	return 0;
 }
 
@@ -390,13 +404,8 @@ static int start_read_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt)
 	iobuffer_t* pkt = NULL;
 
 	*p_pkt = NULL;
-	WLog_DBG(TAG, "==> start_read_pkt");
-
+	WLog_DBG(TAG, "==>endpt(%d): start_read_pkt", endpt->epid);
 	ResetEvent(endpt->rx_evt);
-	/*we control it.*/
-	//memset(&endpt->rx_overlapped, 0, sizeof(OVERLAPPED));
-	//ioqueue_overlapped_set_mask(&endpt->rx_overlapped, sizeof(OVERLAPPED));
-	endpt->rx_overlapped.hEvent = endpt->rx_evt;
 	endpt->rx_pending_pkt_uc_len = 0;
 
 	if (endpt->type == TYPE_PIPE) {
@@ -437,7 +446,7 @@ static int start_read_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt)
 		return 0;
 	}
 	else if(errno == EAGAIN || errno == EWOULDBLOCK) {
-		WLog_DBG(TAG, "set rx pending flag.");
+		WLog_DBG(TAG, "endpt(%d): set rx pending flag.", endpt->epid);
 		endpt->rx_pending = 1;
 		return 0;
 	}
@@ -486,6 +495,7 @@ static int read_left_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt)
 
 	iobuffer_push_count(pkt, endpt->rx_pending_pkt_len);
 	*p_pkt = pkt;
+	WLog_DBG(TAG, "endpt(%d): reset rx_pending", endpt->epid);
 	endpt->rx_pending = 0;
 	return 0;
 }
@@ -529,6 +539,8 @@ TOOLKIT_API int bus_endpt_create(const char* url, int epid, const bus_endpt_call
 
 	endpt = ZALLOC_T(bus_endpt_t);
 	endpt->sock_handle = -1;
+	endpt->msg_fd = -1;
+	endpt->ep = NULL;
 	endpt->url = tmp_url;
 
 	if (_stricmp(uf.scheme, "tcp") == 0) {
@@ -559,7 +571,12 @@ TOOLKIT_API int bus_endpt_create(const char* url, int epid, const bus_endpt_call
 	endpt->tx_evt = CreateEventA(NULL, TRUE, FALSE, NULL);
 	endpt->rx_evt = CreateEventA(NULL, TRUE, FALSE, NULL);
 	endpt->rx_buf_queue = iobuffer_queue_create();
-	endpt->msg_sem = CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL);
+	endpt->msg_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
+	if (endpt->msg_fd == -1) {
+		WLog_ERR(TAG, "create event fd failed: %s(%d)", strerror(errno), errno);
+		goto on_error;
+	}
+
 	INIT_LIST_HEAD(&endpt->msg_list);
 	spinlock_init(&endpt->msg_lock);
 	memcpy(&endpt->callback, callback, sizeof(bus_endpt_callback));
@@ -590,19 +607,30 @@ TOOLKIT_API int bus_endpt_create(const char* url, int epid, const bus_endpt_call
 	if (ans_buf)
 		iobuffer_destroy(ans_buf);
 
+	if (-1 == evtpoll_attach(endpt->ep, endpt->msg_fd)) {
+		goto on_error;
+	}
+	if (evtpoll_subscribe(endpt->ep, EV_READ, endpt->msg_fd, &endpt->msg_fd, NULL)) {
+		WLog_ERR(TAG, "epoll subscribe bus endpt eventfd failed.");
+		goto on_error_msg;
+	}
+
 	/* subscribe read event [3/27/2020 Gifur] */
-	ioqueue_overlapped_set_mask(&endpt->rx_overlapped, sizeof(OVERLAPPED));
 	if (evtpoll_attach(endpt->ep, endpt->sock_handle)) {
 		WLog_ERR(TAG, "epoll attch bus endpt failed.");
-		goto on_error;
+		goto on_error_msg;
 	}
-	if (evtpoll_subscribe(endpt->ep, EV_READ, endpt->sock_handle, &endpt->rx_overlapped, NULL)) {
+	if (evtpoll_subscribe(endpt->ep, EV_READ, endpt->sock_handle, &endpt->sock_handle, NULL)) {
 		WLog_ERR(TAG, "epoll subscribe bus endpt failed.");
-		goto on_error;
+		goto on_error_handle;
 	}
 	*p_endpt = endpt;
 	return 0;
 
+on_error_handle:
+	evtpoll_detach(endpt->ep, endpt->sock_handle);
+on_error_msg:
+	evtpoll_detach(endpt->ep, endpt->msg_fd);
 on_error:
 	if (endpt->type == TYPE_TCP) {
 		closesocket(endpt->sock_handle);
@@ -610,8 +638,8 @@ on_error:
 	else if (endpt->type == TYPE_PIPE) {
 		CloseHandle(endpt->pipe_handle);
 	}
-	if (endpt->msg_sem)
-		CloseHandle(endpt->msg_sem);
+	if (endpt->msg_fd > 0)
+		close(endpt->msg_fd);
 	if (endpt->tx_evt)
 		CloseHandle(endpt->tx_evt);
 	if (endpt->rx_evt)
@@ -626,6 +654,9 @@ on_error:
 		iobuffer_destroy(buf);
 	if (ans_buf)
 		iobuffer_destroy(ans_buf);
+	if (endpt->ep != NULL) {
+		evtpoll_destroy(endpt->ep);
+	}
 	return -1;
 }
 
@@ -673,8 +704,8 @@ on_error:
 	else if (endpt->type == TYPE_PIPE) {
 		CloseHandle(endpt->pipe_handle);
 	}
-	if (endpt->msg_sem)
-		CloseHandle(endpt->msg_sem);
+	if (endpt->msg_fd)
+		close(endpt->msg_fd);
 	if (endpt->tx_evt)
 		CloseHandle(endpt->tx_evt);
 	if (endpt->rx_evt)
@@ -734,30 +765,25 @@ static int bus_endpt_poll_internal(bus_endpt_t* endpt, int* result, int timeout)
 				int i;
 				struct epoll_event events[MAX_EPOLL_EVENT];
 				struct epoll_event* pe;
-				nfds = epoll_wait(evtpoll_get_epoll_fd(endpt->ep) , events, MAX_EPOLL_EVENT, timeout);
+				nfds = evtpoll_wait(endpt->ep, events, MAX_EPOLL_EVENT, timeout);
 				if (nfds == 0) {
-					assert(timeout != -1);
 					WLog_DBG(TAG, "epoll wait timeout.");
 					return 0; //timeout
 				}
-				if (nfds == -1 && errno != EINTR) {
-					WLog_ERR(TAG, "epoll wait error: %s(%d)", strerror(errno), errno);
+				if (nfds == -1) {
 					return -1;
 				}
 				WLog_DBG(TAG, "epoll wait return nfd: %d", nfds);
 				for (i = 0; i < nfds; ++i) {
-					void* io_ctx = NULL;
+					void* pdata = NULL;
 					pe = events + i;
 					WLog_INFO(TAG, "loop events[%d]::fd(0x%08X) OUT:%d, IN:%d", i, pe->data.fd,
 						pe->events & EPOLLOUT ? 1 : 0, pe->events & EPOLLIN ? 1 : 0);
 					assert(pe->events & EPOLLIN);
-					ret = evtpoll_deal(endpt->ep, pe, &io_ctx);
-					if (ret > 0) {
-						assert(io_ctx);
-#if 1 //DEBUG
-						uint32_t io_type = ioqueue_overlapped_get_type(io_ctx);
-						assert(io_type == EV_BUS_ENDPOINT);
-#endif
+					ret = evtpoll_deal(endpt->ep, pe, &pdata, 0);
+					if (!ret) {
+						assert(pdata);
+						if(bus_endpoint__data_is_handle(endpt, pdata))
 						{
 							rc = read_left_pkt(endpt, &pkt);
 							if (rc < 0)
@@ -770,6 +796,21 @@ static int bus_endpt_poll_internal(bus_endpt_t* endpt, int* result, int timeout)
 								}
 							}
 						}
+						else if (pdata == &endpt->msg_fd) {
+							uint64_t rdata;
+							WLog_DBG(TAG, "message arrive.");
+							do 
+							{
+								ret = read(endpt->msg_fd, &rdata, sizeof rdata);
+							} while (ret < 0 && errno == EINTR);
+
+							if (ret < 0) {
+								WLog_ERR(TAG, "read msg fd failed: %s", strerror(errno));
+								abort();
+							}
+							*result = BUS_RESULT_MSG;
+							return 1;
+						}
 					}
 				}
 			}
@@ -799,22 +840,45 @@ static int recv_until(bus_endpt_t* endpt, int type, iobuffer_t** p_ansbuf)
 	int rc;
 	iobuffer_t* ans_pkt = NULL;
 	int ans_type;
-
+	WLog_DBG(TAG, "==>endpt(%d): recv until type: 0x%08X", endpt->epid, type);
 	for (;;) {
 		if (!endpt->rx_pending) {
 			rc = start_read_pkt(endpt, &ans_pkt);
 			if (rc < 0) {
-				DWORD dwError = WSAGetLastError();
 				break;
 			}
+		} else {
+			WLog_DBG(TAG, "endpt(%d) is pending", endpt->epid);
 		}
 		if (!ans_pkt) {
-			DWORD ret = WaitForSingleObject(endpt->rx_evt, INFINITE);
-			if (ret != WAIT_OBJECT_0)
+			int nfds;
+			int ret;
+			int i, flag = 0;
+			struct epoll_event events[MAX_EPOLL_EVENT];
+			struct epoll_event* pe;
+			nfds = evtpoll_wait(endpt->ep, events, MAX_EPOLL_EVENT, -1);
+			if (nfds == 0) {
+				continue;
+			}
+			if (nfds == -1) {
 				return -1;
-			rc = read_left_pkt(endpt, &ans_pkt);
-			if (rc < 0)
-				break;
+			}
+			WLog_DBG(TAG, "epoll wait return nfd: %d", nfds);
+			for (i = 0; i < nfds; ++i) {
+				void* pdata = NULL;
+				pe = events + i;
+				ret = evtpoll_deal(endpt->ep, pe, &pdata, 0);
+				if (!ret && bus_endpoint__data_is_handle(endpt, pdata)) {
+					flag = 1;
+					break;
+				}
+			}
+			if (flag) {
+				rc = read_left_pkt(endpt, &ans_pkt);
+				if (rc < 0) {
+					break;
+				}
+			}
 		}
 		if (ans_pkt) {
 			int read_state = iobuffer_get_read_state(ans_pkt);
@@ -836,7 +900,6 @@ static int recv_until(bus_endpt_t* endpt, int type, iobuffer_t** p_ansbuf)
 			}
 		}
 	}
-
 	return rc;
 }
 
@@ -868,14 +931,12 @@ static int recv_until_state(bus_endpt_t* endpt, int* p_state)
 	rc = recv_until(endpt, BUS_TYPE_ENDPT_GET_STATE, &ans_pkt);
 	if (rc < 0)
 		return rc;
-
 	iobuffer_read(ans_pkt, IOBUF_T_I4, &type, 0);
 	iobuffer_read(ans_pkt, IOBUF_T_I4, &epid, 0);
 	iobuffer_read(ans_pkt, IOBUF_T_I4, &state, 0);
 	iobuffer_destroy(ans_pkt);
-
+	WLog_DBG(TAG, "state address: 0x%08X", p_state);
 	*p_state = state;
-
 	return rc;
 }
 
@@ -925,7 +986,7 @@ TOOLKIT_API int bus_endpt_send_info(bus_endpt_t* endpt, int epid, int type, iobu
 	int write_state;
 
 	assert(endpt);
-
+	WLog_DBG(TAG, "endpt(%d) send info: %d, %d.", endpt->epid, epid, type);
 	read_state = iobuffer_get_read_state(pkt);
 	write_state = iobuffer_get_write_state(pkt);
 
@@ -1093,9 +1154,10 @@ TOOLKIT_API int bus_endpt_get_state(bus_endpt_t* endpt, int epid, int* p_state)
 	iobuffer_write(buf, IOBUF_T_I4, &v, 0);
 
 	rc = send_pkt_raw(endpt, buf);
-	if (rc < 0)
+	if (rc < 0) {
+		WLog_ERR(TAG, "send pkt raw failed.");
 		goto on_error;
-
+	}
 	rc = recv_until_state(endpt, p_state);
 
 on_error:
@@ -1106,18 +1168,19 @@ on_error:
 	return rc;
 }
 
-TOOLKIT_API int bus_endpt_post_msg(bus_endpt_t* endpt, int msg, int nparam, int params[])
+TOOLKIT_API int bus_endpt_post_msg(bus_endpt_t* endpt, int msg, int nparam, param_size_t params[])
 {
 	msg_t* e;
-
+	int rc;
+	uint64_t wdata = 0;
 	assert(endpt);
-
+	WLog_DBG(TAG, "==> endpt(%d) post msg: %d", endpt->epid, msg);
 	e = MALLOC_T(msg_t);
 	e->type = msg;
 	e->nparam = nparam;
 	if (nparam) {
-		e->params = (int*)malloc(sizeof(int) * nparam);
-		memcpy(e->params, params, sizeof(int) * nparam);
+		e->params = (param_size_t*)malloc(sizeof(param_size_t) * nparam);
+		memcpy(e->params, params, sizeof(param_size_t) * nparam);
 	}
 	else {
 		e->params = NULL;
@@ -1126,40 +1189,55 @@ TOOLKIT_API int bus_endpt_post_msg(bus_endpt_t* endpt, int msg, int nparam, int
 	spinlock_enter(&endpt->msg_lock, -1);
 	list_add_tail(&e->entry, &endpt->msg_list);
 	spinlock_leave(&endpt->msg_lock);
-	ReleaseSemaphore(endpt->msg_sem, 1, NULL);
-
+	wdata = 1;
+	rc = write(endpt->msg_fd, &wdata, sizeof wdata);
+	if (rc == -1) {
+		WLog_ERR(TAG, "write to eventfd failed: %s(%d)", strerror(errno), errno);
+		return -1;
+	}
 	return 0;
 }
 
-TOOLKIT_API int bus_endpt_send_msg(bus_endpt_t* endpt, int msg, int nparam, int params[])
+TOOLKIT_API int bus_endpt_send_msg(bus_endpt_t* endpt, int msg, int nparam, param_size_t params[])
 {
 	msg_t e;
-
+	int rc;
+	uint64_t wdata = 0;
 	assert(endpt);
-
+	WLog_DBG(TAG, "==> endpt(%d) send msg: %d, %d", endpt->epid, msg, nparam);
 	e.type = msg;
 	e.nparam = nparam;
 	if (nparam) {
-		e.params = (int*)malloc(sizeof(int) * nparam);
-		memcpy(e.params, params, sizeof(int) * nparam);
+		e.params = (param_size_t*)malloc(sizeof(param_size_t) * nparam);
+		memcpy(e.params, params, sizeof(param_size_t) * nparam);
 	}
 	else {
 		e.params = NULL;
 	}
 	e.evt_result = 0;
 	e.evt = CreateEventA(NULL, TRUE, FALSE, NULL);
+	assert(e.evt != NULL);
 	spinlock_enter(&endpt->msg_lock, -1);
 	list_add_tail(&e.entry, &endpt->msg_list);
 	spinlock_leave(&endpt->msg_lock);
-	ReleaseSemaphore(endpt->msg_sem, 1, NULL);
-
+	wdata = 1;
+	rc = write(endpt->msg_fd, &wdata, sizeof wdata);
+	if (rc == -1) {
+		WLog_ERR(TAG, "write to eventfd failed: %s(%d)", strerror(errno), errno);
+		CloseHandle(e.evt);
+		if (nparam) {
+			free(e.params);
+		}
+		WLog_DBG(TAG, "<== error endpt(%d) send msg: %d", endpt->epid, msg);
+		return -1;
+	}
 	WaitForSingleObject(e.evt, INFINITE);
 	CloseHandle(e.evt);
 
 	if (nparam) {
 		free(e.params);
 	}
-
+	WLog_DBG(TAG, "<== endpt(%d) send msg: %d, res: %d", endpt->epid, msg, e.evt_result);
 	return e.evt_result;
 }
 
@@ -1219,17 +1297,23 @@ TOOLKIT_API int bus_endpt_poll(bus_endpt_t* endpt, int timeout)
 					msg->params, 
 					msg->evt ? &msg->evt_result : NULL,
 					endpt->callback.user_data);
-				if (msg->evt)
+				if (msg->evt) {
+					WLog_DBG(TAG, "after recv msg, send finished evt.");
 					SetEvent(msg->evt);
-				else
+				}
+				else {
+					WLog_DBG(TAG, "free msg");
 					free_msg(msg);
+				}
 			}
 			else {
 				if (msg->evt) {
 					msg->evt_result = -1;
+					WLog_DBG(TAG, "after on msg failed, send finished evt.");
 					SetEvent(msg->evt);
 				}
 				else {
+					WLog_DBG(TAG, "free msg");
 					free_msg(msg);
 				}
 			}

+ 4 - 3
libtoolkit/bus.h

@@ -12,6 +12,7 @@
 #pragma once
 
 #include "config.h"
+#include "memutil.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -38,7 +39,7 @@ typedef struct bus_endpt_callback {
 	void (*on_pkt)(bus_endpt_t *endpt, int epid, int type, iobuffer_t **p_pkt, void *user_data);
 	void (*on_inf)(bus_endpt_t *endpt, int epid, int type, iobuffer_t **p_pkt, void *user_data);
 	void (*on_evt)(bus_endpt_t *endpt, int epid, int type, iobuffer_t **p_pkt, void *user_data);
-	void (*on_msg)(bus_endpt_t *endpt, int msg, int nparam, int params[], int *result, void *user_data);
+	void (*on_msg)(bus_endpt_t *endpt, int msg, int nparam, param_size_t params[], int *result, void *user_data);
 	void (*on_sys)(bus_endpt_t *endpt, int epid, int state, void *user_data);
 	void *user_data;
 }bus_endpt_callback;
@@ -51,8 +52,8 @@ TOOLKIT_API int bus_endpt_send_pkt(bus_endpt_t *endpt, int epid, int type, iobuf
 TOOLKIT_API int bus_endpt_send_info(bus_endpt_t *endpt, int epid, int type, iobuffer_t *pkt);
 TOOLKIT_API int bus_endpt_bcast_evt(bus_endpt_t *endpt, int type, iobuffer_t *pkt);
 TOOLKIT_API int bus_endpt_get_state(bus_endpt_t *endpt, int epid, int *state);
-TOOLKIT_API int bus_endpt_post_msg(bus_endpt_t *endpt, int msg, int nparam, int params[]);
-TOOLKIT_API int bus_endpt_send_msg(bus_endpt_t *endpt, int msg, int nparam, int params[]);
+TOOLKIT_API int bus_endpt_post_msg(bus_endpt_t *endpt, int msg, int nparam, param_size_t params[]);
+TOOLKIT_API int bus_endpt_send_msg(bus_endpt_t *endpt, int msg, int nparam, param_size_t params[]);
 TOOLKIT_API int bus_endpt_get_epid(bus_endpt_t *endpt);
 TOOLKIT_API const char *bus_endpt_get_url(bus_endpt_t *endpt);
 TOOLKIT_API int bus_endpt_set_quit_flag(bus_endpt_t *endpt);

+ 2 - 2
libtoolkit/bus_daemon-unix.c

@@ -242,9 +242,9 @@ static endpt_session_t *find_session(bus_daemon_t *daemon, int epid)
 	daemon_unlock(daemon);
 #ifndef NDEBUG
 	if (!session)
-		WLog_ERR(TAG, "find session failed.");
+		WLog_ERR(TAG, "find session(%d) failed.", epid);
 	else
-		WLog_DBG(TAG, "find session.");
+		WLog_DBG(TAG, "find session(%d) here it is.", epid);
 #endif // !NDEBUG
 	return session;
 }

+ 148 - 97
libtoolkit/evtpoll.c

@@ -6,7 +6,6 @@
 #include "array.h"
 #include "refcnt.h"
 #include "evtpoll.h"
-#include "ioqueue.h"
 #include <winpr/wlog.h>
 #define TAG TOOLKIT_TAG("evtpoll")
 
@@ -14,6 +13,12 @@
 
 #include <unistd.h>
 
+struct event_epoll_data_s {
+	int type;
+	int fd;
+	void* owner;
+};
+
 struct event_epoll_s {
 	int epfd;
 	/*insterest*/
@@ -23,11 +28,11 @@ struct event_epoll_s {
 
 struct evtpoll_interest_entry_s {
 	struct list_head entry;
-	int type;
+	//int type;
 	int events;
 	int pending;
-	void* key;
-	void* io;
+	//void* key;
+	void* data;
 
 	spinlock_t lock;
 	struct evtpoll_interest_s* owner;
@@ -35,7 +40,6 @@ struct evtpoll_interest_entry_s {
 
 struct evtpoll_interest_s {
 	struct list_head node; // for evtpoll_t::interest_list
-
 	struct list_head entry_list; //no used
 
 	int fd;
@@ -60,6 +64,52 @@ static int evtpoll__interest_empty(evtpoll_t* ep)
 	return ret;
 }
 
+event_epoll_data_t* evtpoll_data_new()
+{
+	event_epoll_data_t* data = ZALLOC_T(event_epoll_data_t);
+	return data;
+}
+
+void evtpoll_data_destroy(event_epoll_data_t* epoll_data)
+{
+	if (epoll_data) {
+		free(epoll_data);
+	}
+}
+
+int evtpoll_data_set(event_epoll_data_t* epoll_data, int type, int fd, void* data)
+{
+	assert(epoll_data);
+	epoll_data->fd = fd;
+	epoll_data->type = type;
+	epoll_data->owner = data;
+}
+
+int evtpoll_data_get_fd(const event_epoll_data_t* const epoll_data)
+{
+	assert(epoll_data);
+	return epoll_data->fd;
+}
+
+int evtpoll_data_get_type(const event_epoll_data_t* const epoll_data)
+{
+	assert(epoll_data);
+	return epoll_data->type;
+}
+
+void evtpoll_data_set_data(event_epoll_data_t* epoll_data, void* data)
+{
+	assert(epoll_data);
+	epoll_data->owner = data;
+}
+
+void* evtpoll_data_get_data(const event_epoll_data_t* const epoll_data)
+{
+	assert(epoll_data);
+	return epoll_data->owner;
+}
+
+
 evtpoll_t* evtpoll_create()
 {
 	evtpoll_t* ep = ZALLOC_T(evtpoll_t);
@@ -105,15 +155,15 @@ void evtpoll__interest_entry_set(
 	int type,
 	int pending,
 	void* key,
-	void* io)
+	void* data)
 {
 	assert(entry);
 	assert(entry->owner);
-	entry->type = type;
+	//entry->type = type;
 	entry->events = events;
-	entry->io = io;
+	entry->data = data;
 	entry->pending = pending;
-	entry->key = key;
+	//entry->key = key;
 }
 
 void evtpoll__interest_entry_reset(evtpoll_interest_entry_t* entry) {
@@ -136,10 +186,10 @@ static int evtpoll__interest_entry_is_ready(const evtpoll_interest_entry_t* cons
 	if (entry->pending != 0) {
 		return 0;
 	}
-	if (!entry->io) {
+	if (!entry->data) {
 		return 0;
 	}
-	if (entry->type == 0 || entry->events == 0) {
+	if (entry->events == 0) {
 		return 0;
 	}
 	return 1;
@@ -185,14 +235,14 @@ void evtpoll__interest_free(evtpoll_interest_t* inst)
 }
 
 
-static int evtpoll__ctl(evtpoll_t* ep, int event_mask, int ctrl_mod, int fd, void* data)
+static int evtpoll__ctl(evtpoll_t* ep, int event_mask, int ctrl_mod, int fd, evtpoll_interest_t* inst)
 {
 	int ret = -1;
 	struct epoll_event ee;
 	assert(ep);
 	ee.events = event_mask;
-	if (data) {
-		ee.data.ptr = data;
+	if (inst) {
+		ee.data.ptr = inst;
 	}
 	else {//data is union type.
 		ee.data.fd = fd;
@@ -202,11 +252,11 @@ static int evtpoll__ctl(evtpoll_t* ep, int event_mask, int ctrl_mod, int fd, voi
 		WLog_ERR(TAG, "epoll ctl failed: %s", strerror(errno));
 	}
 	else {
-		WLog_INFO(TAG, "fd(%d), epoll_ctl: OUT:%d, IN:%d, OTHERS:0x%08X, ADD:%d, MOD:%d, data:0x%X ret:%d",
+		WLog_INFO(TAG, "epoll_ctl(%d): OUT:%d, IN:%d, OTHERS:0x%08X, ADD:%d, MOD:%d, data:0x%X ret:%d",
 			fd,
 			(event_mask & EPOLLOUT) > 0 ? 1 : 0, (event_mask & EPOLLIN) > 0 ? 1 : 0,
 			(event_mask & ~(EPOLLOUT | EPOLLIN)),
-			ctrl_mod == EPOLL_CTL_ADD ? 1 : 0, ctrl_mod == EPOLL_CTL_MOD ? 1 : 0, data, ret);
+			ctrl_mod == EPOLL_CTL_ADD ? 1 : 0, ctrl_mod == EPOLL_CTL_MOD ? 1 : 0, inst, ret);
 	}
 	return ret;
 }
@@ -374,6 +424,9 @@ void evtpoll_detach(evtpoll_t* ep, int interest_fd)
 	evtpoll_interest_t* exist;
 	exist = evtpoll__find_interest(ep, interest_fd);
 	if (exist) {
+		if (exist->events != 0) {
+			evtpoll_unsubscribe(ep, EV_READ_WRITE_WITH_LT_PURE, exist->fd, 0);
+		}
 		evtpoll__detach(exist);
 	}
 }
@@ -385,27 +438,26 @@ static int evtpoll__subscribe_precheck(evtpoll_interest_entry_t* entry, int exis
 			WLog_ERR(TAG, "the entry is still pending...");
 			return -1;
 		}
-		else if ((uintptr_t)entry->io == (uintptr_t)data) {
+		else if ((uintptr_t)entry->data == (uintptr_t)data) {
 			WLog_WARN(TAG, "entry already exists and seems same, return previously.");
 			return 0;
 		}
-		WLog_DBG(TAG, "io:0x%08X vs data:0x%08X", entry->io, data);
-	}
-	if (data != NULL) {
-		const int io_type = ioqueue_overlapped_get_type(data);
-		WLog_INFO(TAG, "ioqueue ov type for subscribing: 0x%X", io_type);
-	}
-	else if (!entry->io) {
-		WLog_ERR(TAG, "no io data for event!");
-		return -1;
-	}
+		WLog_DBG(TAG, "data:0x%08X vs data:0x%08X", entry->data, data);
+	}
+	//if (data != NULL) {
+	//	const int io_type = ioqueue_overlapped_get_type(data);
+	//	WLog_INFO(TAG, "ioqueue ov type for subscribing: 0x%X", io_type);
+	//}
+	//else if (!entry->data) {
+	//	WLog_ERR(TAG, "no data data for event!");
+	//	return -1;
+	//}
 	return 1;
 }
 
 static int evtpoll__subscribe_read(evtpoll_t* ep, evtpoll_interest_t* inst, void* data)
 {
 	int ret;
-	int io_type = 0;
 	int has = 0;
 	int op = EPOLL_CTL_ADD;
 	int events = EPOLLIN;
@@ -423,26 +475,24 @@ static int evtpoll__subscribe_read(evtpoll_t* ep, evtpoll_interest_t* inst, void
 	}
 
 	ret = 0;
-	io_type = ioqueue_overlapped_get_type(data);
 
 	if (has) {
-		evtpoll__interest_entry_set(entry, EV_READ, io_type, 0, NULL, data);
+		evtpoll__interest_entry_set(entry, EV_READ, 0, 0, NULL, data);
 		WLog_INFO(TAG, "fd(%d): read entry existed, only update the ov and returned.", inst->fd);
+		return 1;
 	}
-	else {
 
-		if (inst->events & EV_WRITE) {
-			op = EPOLL_CTL_MOD;
-			events |= EPOLLOUT;
-		}
-		ret = evtpoll__ctl(ep, events, op, inst->fd, inst);
-		if (ret) {
-			WLog_ERR(TAG, "fd(%d): set read register failed !", inst->fd);
-		}
-		else {
-			inst->events |= EV_READ;
-			evtpoll__interest_entry_set(entry, EV_READ, io_type, 0, NULL, data);
-		}
+	if (inst->events & EV_WRITE) {
+		op = EPOLL_CTL_MOD;
+		events |= EPOLLOUT;
+	}
+	ret = evtpoll__ctl(ep, events, op, inst->fd, inst);
+	if (ret) {
+		WLog_ERR(TAG, "fd(%d): set read register failed !", inst->fd);
+	}
+	else {
+		inst->events |= EV_READ;
+		evtpoll__interest_entry_set(entry, EV_READ, 0, 0, NULL, data);
 	}
 
 	return ret;
@@ -478,7 +528,7 @@ static int evtpoll__unsubscribe_read(evtpoll_t* ep, evtpoll_interest_t* inst)
 static int evtpoll__subscribe_write(evtpoll_t* ep, evtpoll_interest_t* inst, void* data)
 {
 	int ret;
-	int io_type = -1;
+	//int io_type = -1;
 	int has = 0;
 	int op = EPOLL_CTL_ADD;
 	int events = EPOLLOUT;
@@ -496,26 +546,25 @@ static int evtpoll__subscribe_write(evtpoll_t* ep, evtpoll_interest_t* inst, voi
 	}
 
 	ret = 0;
-	io_type = ioqueue_overlapped_get_type(data);
+	//io_type = ioqueue_overlapped_get_type(data);
 
 	if (has) {
-		evtpoll__interest_entry_set(entry, EV_WRITE, io_type, 0, NULL, data);
+		evtpoll__interest_entry_set(entry, EV_WRITE, 0, 0, NULL, data);
 		WLog_INFO(TAG, "fd(%d): write entry existed, only update the ov and returned.", inst->fd);
+		return 1;
 	}
-	else {
 
-		if (inst->events & EV_READ) {
-			op = EPOLL_CTL_MOD;
-			events |= EPOLLIN;
-		}
-		ret = evtpoll__ctl(ep, events, op, inst->fd, inst);
-		if (ret) {
-			WLog_ERR(TAG, "fd(%d): set write register failed !", inst->fd);
-		}
-		else {
-			inst->events |= EV_WRITE;
-			evtpoll__interest_entry_set(entry, EV_WRITE, io_type, 0, NULL, data);
-		}
+	if (inst->events & EV_READ) {
+		op = EPOLL_CTL_MOD;
+		events |= EPOLLIN;
+	}
+	ret = evtpoll__ctl(ep, events, op, inst->fd, inst);
+	if (ret) {
+		WLog_ERR(TAG, "fd(%d): set write register failed !", inst->fd);
+	}
+	else {
+		inst->events |= EV_WRITE;
+		evtpoll__interest_entry_set(entry, EV_WRITE, 0, 0, NULL, data);
 	}
 
 	return ret;
@@ -526,6 +575,7 @@ static int evtpoll__unsubscribe_write(evtpoll_t* ep, evtpoll_interest_t* inst)
 	int ret;
 	int events = EPOLLOUT;
 	int op = EPOLL_CTL_DEL;
+
 	if (!(inst->events & EV_WRITE)) {
 		WLog_WARN(TAG, "write event is not existed");
 		return 0;
@@ -540,8 +590,7 @@ static int evtpoll__unsubscribe_write(evtpoll_t* ep, evtpoll_interest_t* inst)
 		WLog_ERR(TAG, "un read register failed !");
 	}
 	else {
-		evtpoll_interest_entry_t* entry =
-			ARRAY_IDX(inst->entries, EV_INTEREST_ENTRY_OUT_IDX, evtpoll_interest_entry_t*);
+		evtpoll_interest_entry_t* entry = ARRAY_IDX(inst->entries, EV_INTEREST_ENTRY_OUT_IDX, evtpoll_interest_entry_t*);
 		evtpoll__interest_entry_reset(entry);
 		inst->events &= ~(EV_WRITE);
 	}
@@ -597,7 +646,9 @@ int evtpoll_unsubscribe(evtpoll_t* ep, int event_mask, int fd, int only_reset)
 		WLog_WARN(TAG, "event type exclude read or write is not supported now.");
 		return  -1;
 	}
+
 	ret = 0;
+	
 	if (!ret && (event_mask & EV_READ)) {
 		if (only_reset) {
 			evtpoll_interest_entry_t* entry =
@@ -622,30 +673,27 @@ int evtpoll_unsubscribe(evtpoll_t* ep, int event_mask, int fd, int only_reset)
 	return ret;
 }
 
-static int evtpoll__deal_inner(evtpoll_interest_t* inst, int idx, void** p_io)
+static int evtpoll__deal_inner(evtpoll_interest_t* inst, int idx, void** data, int reset)
 {
-	int ret = -1;
 	evtpoll_interest_entry_t* entry = ARRAY_IDX(inst->entries, idx, evtpoll_interest_entry_t*);
 	assert(entry);
 	if (evtpoll__interest_entry_is_ready(entry)) {
 		WLog_DBG(TAG, "interest entry is ready.");
-		entry->pending = 1;
-		*p_io = entry->io;
-		evtpoll__interest_entry_reset(entry);
-		ret = 1;
-	}
-	else {
-		WLog_DBG(TAG, "interest entry is not ready.");
+		*data = entry->data;
+		if (reset) {
+			evtpoll__interest_entry_reset(entry);
+		}
+		return 0;
 	}
-	return ret;
+	WLog_WARN(TAG, "interest entry is not ready.");
+	return -1;
 }
 
-int evtpoll_deal(evtpoll_t* ep, struct epoll_event* event, void** p_io)
+int evtpoll_deal(evtpoll_t* ep, struct epoll_event* event, void** data, int cancel)
 {
 	int ret = 0;
-	evtpoll_interest_t* inst = NULL;
-	evtpoll_interest_t* exist = NULL;
-	evtpoll_interest_entry_t* entry = NULL;
+	evtpoll_interest_t* inst;
+	evtpoll_interest_t* exist;
 
 	if (!(event->events & (EPOLLIN | EPOLLOUT))) {
 		WLog_DBG(TAG, "no event mask.");
@@ -662,48 +710,51 @@ int evtpoll_deal(evtpoll_t* ep, struct epoll_event* event, void** p_io)
 #endif
 
 	if (event->events & EPOLLIN) { // read
-		ret = evtpoll__deal_inner(inst, EV_INTEREST_ENTRY_IN_IDX, p_io);
+		ret = evtpoll__deal_inner(inst, EV_INTEREST_ENTRY_IN_IDX, data, cancel);
+		if (!ret && cancel) {
+			evtpoll__unsubscribe_read(ep, exist);
+		}
 		event->events &= ~(EPOLLIN);
 		return ret;
 	}
 	if (event->events & EPOLLOUT) { // write
-		ret = evtpoll__deal_inner(inst, EV_INTEREST_ENTRY_OUT_IDX, p_io);
+		ret = evtpoll__deal_inner(inst, EV_INTEREST_ENTRY_OUT_IDX, data, cancel);
+		if (!ret && cancel) {
+			evtpoll__unsubscribe_write(ep, exist);
+		}
 		event->events &= ~(EPOLLOUT);
 		return ret;
 	}
 	return -1;
 }
 
-int evtpoll_loop(evtpoll_t* ep, int timeout)
+int evtpoll_wait(evtpoll_t* ep, struct epoll_event event_array[], int event_array_size, int timeout)
 {
 	int nfds;
-	int ret = -1;
-	int i;
-	struct epoll_event events[MAX_EPOLL_EVENT];
-	nfds = epoll_wait(ep->epfd, events, MAX_EPOLL_EVENT, timeout);
+
+	do 
+	{
+
+		nfds = epoll_wait(ep->epfd, event_array, event_array_size, timeout);
+
+	} while (nfds == -1 && errno == EINTR);
+
 	if (nfds == 0) {
 		assert(timeout != -1);
-		return 0; //timeout
+		return 0;
 	}
 	if (nfds == -1) {
 		WLog_ERR(TAG, "epoll wait error: %s(%d)", strerror(errno), errno);
 		return -1;
 	}
 
-	for (i = 0; i < nfds; ++i) {
-		ioqueue_overlapped_t* io_ctx = NULL;
-		WLog_INFO(TAG, "loop events[%d] OUT:%d, IN:%d", i, events[i].events & EPOLLOUT ? 1 : 0, events[i].events & EPOLLIN ? 1 : 0);
-		ret = evtpoll_deal(ep, &events[i], &io_ctx);
-		if (ret > 0) {
-			assert(io_ctx);
-			assert(events[i].events & EPOLLIN);
-			uint32_t io_type = ioqueue_overlapped_get_mask(io_ctx);
-			if (io_type == EV_BUS_ENDPOINT) {
-				WLog_DBG(TAG, "set endpoint event!");
-				SetEvent(((OVERLAPPED*)io_ctx)->hEvent);
-			}
-		}
-	}
+	return nfds;
+}
+
+int evtpoll_loop(evtpoll_t* ep, int timeout)
+{
+	int nfds;
+	int ret = -1;
 	return ret;
 }
 

+ 19 - 2
libtoolkit/evtpoll.h

@@ -23,7 +23,7 @@ level-triggered(LT): default
 #define EV_READ_WRITE_WITH_LT_FULL (EPOLLOUT | EPOLLIN | EPOLLERR | EPOLLHUP)
 #define EV_READ_WRITE_WITH_LT_PURE (EPOLLOUT | EPOLLIN)
 
-#define DEFAULT_INTEREST_OP_COUNT		5
+#define DEFAULT_INTEREST_OP_COUNT		3
 #define EV_INTEREST_ENTRY_IN_IDX             0
 #define EV_INTEREST_ENTRY_OUT_IDX          1
 #define EV_INTEREST_ENTRY_ERR_IDX           2
@@ -34,15 +34,32 @@ level-triggered(LT): default
 extern "C" {
 #endif
 
+	typedef struct event_epoll_data_s event_epoll_data_t;
 	typedef struct event_epoll_s evtpoll_t;
 
+	event_epoll_data_t* evtpoll_data_new();
+
+	void evtpoll_data_destroy(event_epoll_data_t* epoll_data);
+
+	int evtpoll_data_set(event_epoll_data_t* epoll_data, int type, int fd, void* data);
+
+	int evtpoll_data_get_fd(const event_epoll_data_t* const epoll_data);
+
+	int evtpoll_data_get_type(const event_epoll_data_t* const epoll_data);
+
+	void evtpoll_data_set_data(event_epoll_data_t* epoll_data, void* data);
+
+	void* evtpoll_data_get_data(const event_epoll_data_t* const epoll_data);
+
 	evtpoll_t* evtpoll_create();
 
 	void evtpoll_destroy(evtpoll_t* evt_poll);
 
 	int evtpoll_get_epoll_fd(const evtpoll_t* const evt_poll);
 
-	int evtpoll_deal(evtpoll_t* ep, struct epoll_event* event, void** p_io);
+	int evtpoll_deal(evtpoll_t* ep, struct epoll_event* event, void** data, int cancel);
+
+	int evtpoll_wait(evtpoll_t* ep, struct epoll_event event_array[], int event_array_size, int timeout);
 	
 	int evtpoll_attach(evtpoll_t* ep, int interest_fd);
 

+ 14 - 0
libtoolkit/iobuffer.h

@@ -30,6 +30,20 @@ extern "C" {
 #define IOBUF_T_WSTR 6 /* wchar_t */
 #define IOBUF_T_7BIT	   7
 
+
+#ifdef _WIN32
+#define IOBUF_T_PARAM IOBUF_T_I4
+#else
+#if defined(__x86_64__)
+#define IOBUF_T_PARAM IOBUF_T_I8
+#elif defined(__i386__)
+#define IOBUF_T_PARAM IOBUF_T_I4
+#else
+#define IOBUF_T_PARAM IOBUF_T_I4
+#endif
+#endif //_WIN32
+
+
 typedef struct iobuffer_t iobuffer_t;
 
 TOOLKIT_API iobuffer_t *iobuffer_create(int head_capacity, int capacity);

+ 9 - 18
libtoolkit/ioqueue-unix.c

@@ -430,7 +430,7 @@ static void dispatch_pipe_acceptor(int err,
 	}
 }
 
-static int pre_dispatch_network(BOOL* ret, DWORD* dwBytesTransfer, ioqueue_overlapped_t* io_ctx, int epfd)
+static int pre_dispatch_network(BOOL* ret, DWORD* dwBytesTransfer, ioqueue_overlapped_t* io_ctx)
 {
 	int err = 0;
 	ioqueue_base_overlapped_t* base_ov = (ioqueue_base_overlapped_t*)io_ctx;
@@ -928,10 +928,8 @@ TOOLKIT_API int ioqueue_poll(ioqueue_t* q, int timeout)
 	ioqueue_t *ioq = (ioqueue_t*)q;
 	int count = 0, t = 0;
 	int nfds = 0, i = 0;
-	static int flag = 0;
 	bus_daemon_t* deamon;
 	struct epoll_event events[MAX_EPOLL_EVENT];
-	struct epoll_event* pe = NULL;
 	deamon = (bus_daemon_t*)ioqueue_get_user_data(ioq);
 	assert(deamon != NULL);
 	/* network and msg, dispatch until no events */
@@ -939,38 +937,31 @@ TOOLKIT_API int ioqueue_poll(ioqueue_t* q, int timeout)
 	{
 		BOOL ret;
 		DWORD dwBytesTransfer = 0;
+		t = 0;
 		//有时会出现惊群的问题!!
-		int epfd = evtpoll_get_epoll_fd(ioq->ep);
-		nfds = epoll_wait(epfd, events, MAX_EPOLL_EVENT, t ? 0 : timeout);
-		if ((nfds < 0 && EINTR == errno) || nfds == 0) {
-			t = 0;
+		nfds = evtpoll_wait(ioq->ep, events, MAX_EPOLL_EVENT, t ? 0 : timeout);
+		if (nfds < 0) { 
+			break; 
 		}
 		for (i = 0; i < nfds; ++i) {
+			struct epoll_event* pe;
 			int n = 1;
 			pe = events + i;
 			WLog_INFO(TAG, "poll events[%d]::fd(0x%08X) OUT:%d, IN:%d", i, pe->data.fd
 				, pe->events & EPOLLOUT ? 1: 0 ,pe->events & EPOLLIN ? 1: 0);
 
-			if (pe->events & EPOLLOUT) {
-				flag = 1;
-			}
-
 			while (n >= 0) {
 				ioqueue_overlapped_t* io_ctx = NULL;
-				n = evtpoll_deal(ioq->ep, pe, &io_ctx);
-				if (n > 0) {
+				n = evtpoll_deal(ioq->ep, pe, &io_ctx, 1);
+				if (!n) {
 					assert(io_ctx);
-					pre_dispatch_network(&ret, &dwBytesTransfer, io_ctx, epfd);
+					pre_dispatch_network(&ret, &dwBytesTransfer, io_ctx);
 					dispatch_network(ret, dwBytesTransfer, io_ctx);
 					t++;
 					count++;
 				}
 			}
 		}
-		if (nfds < 0) {
-			WLog_ERR(TAG, "nfds < 0, error: %s(%d)", strerror(errno), errno);
-			break;
-		}
 	} while (t > 0);
 
 	/* win2k connect event */

+ 13 - 0
libtoolkit/memutil.h

@@ -85,6 +85,19 @@ static __inline __int64 int64_make(int low_part, int high_part)
 
 #define FLAG_BIT(f) f##_BIT
 
+/* for compatility at different platform [3/31/2020 Gifur] */
+#ifdef _WIN32
+typedef int param_size_t;
+#else
+#if defined(__x86_64__)
+typedef intptr_t param_size_t;
+#elif defined(__i386__)
+typedef int param_size_t;
+#else
+typedef intptr_t param_size_t;
+#endif
+#endif //_WIN32
+
 #define SAFE_INVOKE_0(lpfn) if(lpfn) lpfn()
 #define SAFE_INVOKE_1(lpfn, param1) if(lpfn) lpfn(param1)
 #define SAFE_INVOKE_2(lpfn, param1, param2) if(lpfn) lpfn(param1, param2)

+ 12 - 13
spbase/sp_iom.c

@@ -12,8 +12,8 @@
 
 #define POLL_INTERVAL	10
 
-#define IOM_T_EXIT			0
-#define IOM_T_GET_STATE		1
+#define IOM_T_EXIT			        0
+#define IOM_T_GET_STATE		    1
 #define IOM_T_SEND_INFO		2
 
 static int translate_error(int bus_error)
@@ -153,10 +153,10 @@ static void on_pkt(bus_endpt_t *endpt, int epid, int type, iobuffer_t **p_pkt, v
 	iom_pkt_handler_unlock(iom);
 }
 
-static void on_msg(bus_endpt_t *endpt, int msg, int nparam, int params[], int *result, void *user_data)
+static void on_msg(bus_endpt_t *endpt, int msg, int nparam, param_size_t params[], int *result, void *user_data)
 {
 	sp_iom_t *iom = (sp_iom_t *)user_data;
-
+	sp_dbg_debug("==> on msg %d, %d", msg, nparam);
 	if (msg == IOM_T_SEND_INFO) {
 		int pkt_type = params[0];
 		int this_svc_id = params[1];
@@ -172,7 +172,7 @@ static void on_msg(bus_endpt_t *endpt, int msg, int nparam, int params[], int *r
 		iobuffer_write_head(pkt, IOBUF_T_I4, &svc_id, 0);
 		iobuffer_write_head(pkt, IOBUF_T_I4, &this_svc_id, 0);
 		if (result) {
-			int state;
+			int state = 0;
 			ret = bus_endpt_get_state(endpt, epid, &state);
 			if (ret == 0)
 				ret = (state == BUS_STATE_ON) ? 0 : -1;
@@ -189,7 +189,7 @@ static void on_msg(bus_endpt_t *endpt, int msg, int nparam, int params[], int *r
 			bus_endpt_send_info(endpt, epid, pkt_type, pkt);
 			iobuffer_dec_ref(pkt);
 		}
-		//sp_dbg_debug("on_msg send_info end, %d, %d, %d, %d", this_svc_id, epid, svc_id, pkt_id);
+		sp_dbg_debug("on_msg send_info end, %d, %d, %d, %d", this_svc_id, epid, svc_id, pkt_id);
 	} else if (msg == IOM_T_GET_STATE) {
 		int epid = params[0];
 		int *state = (int*)params[1];
@@ -412,13 +412,13 @@ int sp_iom_send(sp_iom_t *iom, int this_svc_id, int epid, int svc_id, int pkt_ty
 	}
 
 	{
-		int params[] = {
+		param_size_t params[] = {
 			pkt_type, 
 			this_svc_id, 
 			epid, 
 			svc_id, 
 			pkt_id,
-			(int)pkt,
+			(param_size_t)pkt,
 		};
 		rc = bus_endpt_send_msg(iom->endpt, IOM_T_SEND_INFO, array_size(params), params);
 	}
@@ -448,13 +448,13 @@ int sp_iom_post(sp_iom_t *iom, int this_svc_id, int epid, int svc_id, int pkt_ty
 	}
 
 	{
-		int params[] = {
+		param_size_t params[] = {
 			pkt_type, 
 			this_svc_id, 
 			epid, 
 			svc_id, 
 			pkt_id,
-			(int)pkt,
+			(param_size_t)pkt,
 		};
 		rc = bus_endpt_post_msg(iom->endpt, IOM_T_SEND_INFO, array_size(params), params);
 	}
@@ -477,11 +477,10 @@ int sp_iom_get_state(sp_iom_t *iom, int epid, int *state)
 {
 	int rc = 0;
 	int rc1;
-	int params[] = {epid, (int)state, (int)&rc};
-
+	param_size_t params[] = {epid, (param_size_t)state, (param_size_t)&rc};
 	// use -1 for get state
 	rc1 = bus_endpt_send_msg(iom->endpt, IOM_T_GET_STATE, array_size(params), params);
-
+	sp_dbg_debug("get epid(%d) state: %d, rc1:%d, rc:%d", epid, *state, rc1, rc);
 	if (rc != 0)
 		rc = Error_Unexpect;
 

+ 1 - 0
spbase/sp_mod.c

@@ -1630,6 +1630,7 @@ static int load_module(sp_mod_mgr_t *mgr, sp_mod_t *mod, int trigger_entity_id)
 		for (i = 0; i < tries; ++i) {
 			int state = BUS_STATE_OFF;
 			rc = sp_svc_get_state(mgr->shell_svc, mod->cfg->idx, &state);
+			sp_dbg_debug("cfg::idx: %d, state: %d", mod->cfg->idx, state);
 			if (rc == 0) {
 				if (state != BUS_STATE_ON) {
 					DWORD dwRet;

+ 0 - 4
winpr/libwinpr/synch/test/TestSynchSemaphore.c

@@ -5,16 +5,12 @@
 int TestSynchSemaphore(int argc, char* argv[])
 {
 	HANDLE semaphore;
-
 	semaphore = CreateSemaphore(NULL, 0, 1, NULL);
-
 	if (!semaphore)
 	{
 		printf("CreateSemaphore failure\n");
 		return -1;
 	}
-
 	CloseHandle(semaphore);
-
 	return 0;
 }

+ 63 - 2
winpr/libwinpr/synch/wait.c

@@ -121,6 +121,14 @@ int _mach_safe_clock_gettime(int clk_id, struct timespec* t)
 
 #endif
 
+static void simple_sleep(int msecs)
+{
+	struct timespec t;
+	t.tv_sec = msecs / 1000;
+	t.tv_nsec = (msecs % 1000) * 1000000;
+	nanosleep(&t, NULL);
+}
+
 static long long ts_difftime(const struct timespec* o, const struct timespec* n)
 {
 	long long oldValue = o->tv_sec * 1000000000LL + o->tv_nsec;
@@ -246,11 +254,11 @@ DWORD WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds)
 
 	if (Type == HANDLE_TYPE_PROCESS)
 	{
+#if 0
 		WINPR_PROCESS* process;
 		process = (WINPR_PROCESS*)Object;
 
-		if (process->pid != waitpid(process->pid, &(process->status), 0))
-		{
+		if (process->pid != waitpid(process->pid, &(process->status), 0)) {
 			WLog_ERR(TAG, "waitpid failure [%d] %s", errno, strerror(errno));
 			SetLastError(ERROR_INTERNAL_ERROR);
 			return WAIT_FAILED;
@@ -258,6 +266,59 @@ DWORD WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds)
 
 		process->dwExitCode = (DWORD)process->status;
 		return WAIT_OBJECT_0;
+#else
+		WINPR_PROCESS* process;
+		int status, ret = 0;
+		process = (WINPR_PROCESS*)Object;
+
+		if (dwMilliseconds == INFINITE) {
+			if (process->pid != waitpid(process->pid, &status, 0)) {
+				WLog_ERR(TAG, "waitpid failure [%d] %s", errno, strerror(errno));
+				SetLastError(ERROR_INTERNAL_ERROR);
+				return WAIT_FAILED;
+			}
+		}
+		else if (dwMilliseconds == 0) {
+			ret = waitpid(process->pid, &status, WNOHANG);
+			if (ret == 0) {
+				return WAIT_TIMEOUT;
+			}
+		}
+		else {
+			struct timespec timeout;
+			struct timespec timenow;
+			clock_gettime(CLOCK_MONOTONIC, &timeout);
+			ts_add_ms(&timeout, dwMilliseconds);
+			do {
+				ret = waitpid(process->pid, &status, WNOHANG);
+				if (ret == process->pid) {
+					break;
+				}
+				else if (ret < 0) {
+					WLog_ERR(TAG, "waitpid failure [%d] %s", errno, strerror(errno));
+					SetLastError(ERROR_INTERNAL_ERROR);
+					return WAIT_FAILED;
+				}
+				clock_gettime(CLOCK_MONOTONIC, &timenow);
+				if (ts_difftime(&timeout, &timenow) >= 0) {
+					return WAIT_TIMEOUT;
+				}
+				simple_sleep(200);
+			} while (true);
+		}
+		process->status = status;
+		if (WIFEXITED(status)) {
+			//normal exit
+			process->dwExitCode = (DWORD)WEXITSTATUS(status);
+		}
+		else if (WIFSIGNALED(status)) {
+			process->dwExitCode = -WTERMSIG(status); //to comfirm
+		}
+		else {
+			process->dwExitCode = (DWORD)process->status;
+		}
+		return WAIT_OBJECT_0;
+#endif
 	}
 	else if (Type == HANDLE_TYPE_MUTEX)
 	{

+ 95 - 13
winpr/libwinpr/thread/test/TestThreadCreateProcess.c

@@ -10,12 +10,12 @@
 #define TESTENV_A "HELLO=WORLD"
 #define TESTENV_T _T(TESTENV_A)
 
-int TestThreadCreateProcess(int argc, char* argv[])
+
+int TestProcessWaitTimeout(LPTSTR lpCommandLine)
 {
 	BOOL status;
 	DWORD exitCode;
 	LPCTSTR lpApplicationName;
-	LPTSTR lpCommandLine;
 	LPSECURITY_ATTRIBUTES lpProcessAttributes;
 	LPSECURITY_ATTRIBUTES lpThreadAttributes;
 	BOOL bInheritHandles;
@@ -36,11 +36,69 @@ int TestThreadCreateProcess(int argc, char* argv[])
 
 	lpApplicationName = NULL;
 
-#ifdef _WIN32
-	lpCommandLine = _T("cmd /C set");
-#else
-	lpCommandLine = _T("printenv");
+	lpProcessAttributes = NULL;
+	lpThreadAttributes = NULL;
+	bInheritHandles = FALSE;
+	dwCreationFlags = 0;
+#ifdef _UNICODE
+	dwCreationFlags |= CREATE_UNICODE_ENVIRONMENT;
 #endif
+	lpEnvironment = lpszEnvironmentBlock;
+	lpCurrentDirectory = NULL;
+	ZeroMemory(&StartupInfo, sizeof(STARTUPINFO));
+	StartupInfo.cb = sizeof(STARTUPINFO);
+	ZeroMemory(&ProcessInformation, sizeof(PROCESS_INFORMATION));
+
+	status = CreateProcess(lpApplicationName, lpCommandLine, lpProcessAttributes,
+		lpThreadAttributes, bInheritHandles, dwCreationFlags, lpEnvironment,
+		lpCurrentDirectory, &StartupInfo, &ProcessInformation);
+
+	if (!status) {
+		printf("CreateProcess failed. error=%" PRIu32 "\n", GetLastError());
+		return 1;
+	}
+	printf("wait process(%s)\n", lpCommandLine);
+	if (WaitForSingleObject(ProcessInformation.hProcess, 5000) != WAIT_TIMEOUT) {
+		printf("Failed to wait process timeout\n");
+		return 1;
+	}
+
+	exitCode = 0;
+	status = GetExitCodeProcess(ProcessInformation.hProcess, &exitCode);
+
+	printf("GetExitCodeProcess status: %" PRId32 "\n", status);
+	printf("Process exited with code: 0x%08" PRIX32 "\n", exitCode);
+
+	CloseHandle(ProcessInformation.hProcess);
+	CloseHandle(ProcessInformation.hThread);
+	FreeEnvironmentStrings(lpszEnvironmentBlock);
+	return 1;
+}
+
+int TestCreateProcess(LPTSTR lpCommandLine)
+{
+	BOOL status;
+	DWORD exitCode;
+	LPCTSTR lpApplicationName;
+	LPSECURITY_ATTRIBUTES lpProcessAttributes;
+	LPSECURITY_ATTRIBUTES lpThreadAttributes;
+	BOOL bInheritHandles;
+	DWORD dwCreationFlags;
+	LPVOID lpEnvironment;
+	LPCTSTR lpCurrentDirectory;
+	STARTUPINFO StartupInfo;
+	PROCESS_INFORMATION ProcessInformation;
+	LPTCH lpszEnvironmentBlock;
+	HANDLE pipe_read = NULL;
+	HANDLE pipe_write = NULL;
+	char buf[1024];
+	DWORD read_bytes;
+	int ret = 0;
+	SECURITY_ATTRIBUTES saAttr;
+
+	lpszEnvironmentBlock = GetEnvironmentStrings();
+
+	lpApplicationName = NULL;
 
 	lpProcessAttributes = NULL;
 	lpThreadAttributes = NULL;
@@ -64,7 +122,7 @@ int TestThreadCreateProcess(int argc, char* argv[])
 		printf("CreateProcess failed. error=%" PRIu32 "\n", GetLastError());
 		return 1;
 	}
-
+	printf("wait process(%s)\n", lpCommandLine);
 	if (WaitForSingleObject(ProcessInformation.hProcess, 5000) != WAIT_OBJECT_0)
 	{
 		printf("Failed to wait for first process. error=%" PRIu32 "\n", GetLastError());
@@ -150,11 +208,35 @@ int TestThreadCreateProcess(int argc, char* argv[])
 	CloseHandle(ProcessInformation.hProcess);
 	CloseHandle(ProcessInformation.hThread);
 
-#ifndef _WIN32
-	ret = _spawnl(_P_WAIT, lpCommandLine, "1", "2", "3", "4", NULL);
-	if (ret < 0) {
-		printf("_spawnl failed.\n");
-	}
-#endif //_WIN32
+//#ifndef _WIN32
+//	ret = _spawnl(_P_WAIT, lpCommandLine, "1", "2", "3", "4", NULL);
+//	if (ret < 0) {
+//		printf("_spawnl failed.\n");
+//	}
+//#endif //_WIN32
 	return ret;
 }
+
+
+
+
+int TestThreadCreateProcess(int argc, char* argv[])
+{
+	CHAR lpCommandLine[256] = {'\0'};
+#ifdef _WIN32
+	strcpy(lpCommandLine, "cmd /C set");
+#else
+	strcpy(lpCommandLine, "printenv");
+#endif
+
+	if (0 != TestCreateProcess(lpCommandLine)) {
+		return -1;
+	}
+#ifndef _WIN32
+	strcpy(lpCommandLine, "sleep 10s");
+	if (0 != TestProcessWaitTimeout(lpCommandLine)) {
+		return -1;
+	}
+#endif //NOT _WIN32
+	return 0;
+}