Bläddra i källkod

Z991239-351 #comment 改动比较多,整体来说是对spshell退出做了清理

gifur 5 år sedan
förälder
incheckning
7db8127c03

+ 1 - 0
libtoolkit/memutil.h

@@ -170,6 +170,7 @@ TOOLKIT_API wchar_t *toolkit_wcsdup(const wchar_t *s);
 #define MALLOC_T(type) (type*)malloc(sizeof(type))
 #define ZALLOC_T(type) (type*)zalloc(sizeof(type))
 #define CALLOC_T(num, type) (type*)calloc(num, sizeof(type))
+#define REALLOC realloc
 #define FREE(x) do {free(x); x = NULL;} while(0)
 
 #if !defined(_DEBUG) || !defined(DEBUG)

+ 1 - 1
libtoolkit/process_monitor.h

@@ -3,7 +3,7 @@
  * apply status:
  * edit status:
  * build status:
- * description:
+ * description: TODO: use SIGCLD to replace it under unix platform.
  */
 
 #ifndef __PROCESS_MONITOR_H__

+ 108 - 0
libtoolkit/queue.h

@@ -0,0 +1,108 @@
+/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
+ *
+ * Permission to use, copy, modify, and/or distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#ifndef QUEUE_H_
+#define QUEUE_H_
+
+#include <stddef.h>
+
+typedef void *QUEUE[2];
+
+/* Private macros. */
+#define QUEUE_NEXT(q)       (*(QUEUE **) &((*(q))[0]))
+#define QUEUE_PREV(q)       (*(QUEUE **) &((*(q))[1]))
+#define QUEUE_PREV_NEXT(q)  (QUEUE_NEXT(QUEUE_PREV(q)))
+#define QUEUE_NEXT_PREV(q)  (QUEUE_PREV(QUEUE_NEXT(q)))
+
+/* Public macros. */
+#define QUEUE_DATA(ptr, type, field)                                          \
+  ((type *) ((char *) (ptr) - offsetof(type, field)))
+
+/* Important note: mutating the list while QUEUE_FOREACH is
+ * iterating over its elements results in undefined behavior.
+ */
+#define QUEUE_FOREACH(q, h)                                                   \
+  for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))
+
+#define QUEUE_EMPTY(q)                                                        \
+  ((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q))
+
+#define QUEUE_HEAD(q)                                                         \
+  (QUEUE_NEXT(q))
+
+#define QUEUE_INIT(q)                                                         \
+  do {                                                                        \
+    QUEUE_NEXT(q) = (q);                                                      \
+    QUEUE_PREV(q) = (q);                                                      \
+  }                                                                           \
+  while (0)
+
+#define QUEUE_ADD(h, n)                                                       \
+  do {                                                                        \
+    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n);                                       \
+    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h);                                       \
+    QUEUE_PREV(h) = QUEUE_PREV(n);                                            \
+    QUEUE_PREV_NEXT(h) = (h);                                                 \
+  }                                                                           \
+  while (0)
+
+#define QUEUE_SPLIT(h, q, n)                                                  \
+  do {                                                                        \
+    QUEUE_PREV(n) = QUEUE_PREV(h);                                            \
+    QUEUE_PREV_NEXT(n) = (n);                                                 \
+    QUEUE_NEXT(n) = (q);                                                      \
+    QUEUE_PREV(h) = QUEUE_PREV(q);                                            \
+    QUEUE_PREV_NEXT(h) = (h);                                                 \
+    QUEUE_PREV(q) = (n);                                                      \
+  }                                                                           \
+  while (0)
+
+#define QUEUE_MOVE(h, n)                                                      \
+  do {                                                                        \
+    if (QUEUE_EMPTY(h))                                                       \
+      QUEUE_INIT(n);                                                          \
+    else {                                                                    \
+      QUEUE* q = QUEUE_HEAD(h);                                               \
+      QUEUE_SPLIT(h, q, n);                                                   \
+    }                                                                         \
+  }                                                                           \
+  while (0)
+
+#define QUEUE_INSERT_HEAD(h, q)                                               \
+  do {                                                                        \
+    QUEUE_NEXT(q) = QUEUE_NEXT(h);                                            \
+    QUEUE_PREV(q) = (h);                                                      \
+    QUEUE_NEXT_PREV(q) = (q);                                                 \
+    QUEUE_NEXT(h) = (q);                                                      \
+  }                                                                           \
+  while (0)
+
+#define QUEUE_INSERT_TAIL(h, q)                                               \
+  do {                                                                        \
+    QUEUE_NEXT(q) = (h);                                                      \
+    QUEUE_PREV(q) = QUEUE_PREV(h);                                            \
+    QUEUE_PREV_NEXT(q) = (q);                                                 \
+    QUEUE_PREV(h) = (q);                                                      \
+  }                                                                           \
+  while (0)
+
+#define QUEUE_REMOVE(q)                                                       \
+  do {                                                                        \
+    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q);                                       \
+    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q);                                       \
+  }                                                                           \
+  while (0)
+
+#endif /* QUEUE_H_ */

+ 0 - 8
libtoolkit/sockutil.c

@@ -7,21 +7,13 @@
 
 TOOLKIT_API int winsock_init()
 {
-#ifdef _WIN32
 	WSADATA wsaData;
 	return WSAStartup(0x0202, &wsaData);
-#else
-	return 0;
-#endif
 }
 
 TOOLKIT_API int winsock_term()
 {
-#ifdef _WIN32
 	return WSACleanup();
-#else
-	return 0;
-#endif
 }
 
 TOOLKIT_API char* get_local_ip(char *buf, int n)

+ 36 - 27
libtoolkit/unix/bus.c

@@ -6,6 +6,7 @@
 #include "list.h"
 #include "bus_internal.h"
 #include "evtpoll.h"
+#include "core.h"
 
 #include <winpr/file.h>
 #include <winpr/pipe.h>
@@ -127,7 +128,11 @@ static SOCKET create_socket_handle(const char* ip, int port)
 	if (fd == INVALID_SOCKET) {
 		return fd;
 	}
-
+	if (make_fd_cloexec(fd, 1)) {
+		WLog_ERR(TAG, "set cloexec for socket fd failed: %d", errno);
+		closesocket(fd);
+		return INVALID_SOCKET;
+	}
 	{
 		BOOL f = TRUE;
 		u_long l = TRUE;
@@ -190,7 +195,7 @@ static int tcp_send_buf(bus_endpt_t* endpt, const char* buf, int n)
 				tv.tv_usec = 0;
 				retval = _select(endpt->sock_handle + 1, NULL, &wfds, NULL, &tv);
 				if (retval == -1) {
-					WLog_ERR(TAG, "select error, errno: %s", strerror(errno));
+					WLog_ERR(TAG, "select error, errno: %d", errno);
 				}
 				else if (retval && FD_ISSET(endpt->sock_handle, &wfds) > 0) {
 					WLog_INFO(TAG, "can write");
@@ -206,7 +211,7 @@ static int tcp_send_buf(bus_endpt_t* endpt, const char* buf, int n)
 				}
 			}
 			else {
-				WLog_ERR(TAG, "_send failed: %s", strerror(errno));
+				WLog_ERR(TAG, "_send failed: %d", errno);
 			}
 		}
 		else {
@@ -327,7 +332,7 @@ static int tcp_recv_buf(bus_endpt_t* endpt, char* buf, DWORD n)
 				tv.tv_usec = 0;
 				retval = _select(endpt->sock_handle + 1, &rfds, NULL, NULL, &tv);
 				if (retval == -1) {
-					WLog_ERR(TAG, "select failed: %s", strerror(errno));
+					WLog_ERR(TAG, "select failed: %d", errno);
 				}
 				else if (retval && FD_ISSET(endpt->sock_handle, &rfds) > 0) {
 					WLog_INFO(TAG, "can read");
@@ -406,8 +411,9 @@ static int start_read_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt)
 	*p_pkt = NULL;
 	WLog_DBG(TAG, "==>endpt(%d): start_read_pkt", endpt->epid);
 	ResetEvent(endpt->rx_evt);
-	WLog_DBG(TAG, "ResetEvent(endpt->rx_evt) ???");
+	WLog_DBG(TAG, "ResetEvent(endpt->rx_evt).");
 	endpt->rx_pending_pkt_uc_len = 0;
+	endpt->rx_pending_pkt_len = 0;
 
 	if (endpt->type == TYPE_PIPE) {
 		ret = ReadFile(endpt->pipe_handle,
@@ -417,12 +423,9 @@ static int start_read_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt)
 			&endpt->rx_overlapped);
 	}
 	else if (endpt->type == TYPE_TCP) {
-		do {
-			WLog_DBG(TAG, "_recv ???");
-			ret = _recv(endpt->sock_handle, (char*)&endpt->rx_pending_pkt_len, 4, 0);
-
-		} while (ret < 0 && errno == EINTR);
-		WLog_DBG(TAG, "_recv return: %d, %d", ret, errno);
+		WLog_DBG(TAG, "to _recv.");
+		ret = _recv(endpt->sock_handle, (char*)&endpt->rx_pending_pkt_len, 4, 0);
+		WLog_DBG(TAG, "_recv return: %d, (%d)", ret, errno);
 	}
 	else {
 		WLog_ERR(TAG, "<== endpt(%d): start_read_pkt unkonwn type", endpt->epid);
@@ -432,7 +435,7 @@ static int start_read_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt)
 		dwBytesTransferred = ret;
 		endpt->rx_pending_pkt_uc_len = ret;
 		if (dwBytesTransferred == 0) {
-			WLog_ERR(TAG, "<== endpt(%d): start_read_pkt dwBytesTransferred is error.", endpt->epid);
+			WLog_ERR(TAG, "<== endpt(%d): start_read_pkt peer socket close.", endpt->epid);
 			return -1;
 		}
 		if (dwBytesTransferred < 4) {
@@ -483,19 +486,17 @@ static int read_left_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt)
 		ret = GetOverlappedResult(endpt->pipe_handle, &endpt->rx_overlapped, &dwBytesTransferred, TRUE);
 	}
 	else if (endpt->type == TYPE_TCP) {
-		do 
-		{
-			ret = _recv(endpt->sock_handle,
-				(char*)&endpt->rx_pending_pkt_len + endpt->rx_pending_pkt_uc_len,
-				4 - endpt->rx_pending_pkt_uc_len, 0);
-		} while (ret == -1 && errno == EINTR);
+		ret = _recv(endpt->sock_handle,
+			(char*)&endpt->rx_pending_pkt_len + endpt->rx_pending_pkt_uc_len,
+			4 - endpt->rx_pending_pkt_uc_len, 0);
 	}
 	if (ret < 0) {
-		WLog_ERR(TAG, "<== fd(%d): read left pkt failed: ret %d, err: %s", endpt->sock_handle, ret, strerror(errno));
+		WLog_ERR(TAG, "<== fd(%d): read left pkt failed: ret %d, err: %d", endpt->sock_handle, ret, (errno));
 		return -1;
 	}
 	else if(ret == 0) {
 		WLog_WARN(TAG, "<== fd(%d): peer socket close.", endpt->sock_handle);
+		return -1;
 	}
 
 	if (dwBytesTransferred < 4) {
@@ -594,7 +595,8 @@ TOOLKIT_API int bus_endpt_create(const char* url, int epid, const bus_endpt_call
 	endpt->rx_buf_queue = iobuffer_queue_create();
 	endpt->msg_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
 	if (endpt->msg_fd == -1) {
-		WLog_ERR(TAG, "create event fd failed: %s(%d)", strerror(errno), errno);
+		WLog_ERR(TAG, "create event fd failed: %d", errno);
+		endpt->msg_fd = 0;
 		goto on_error;
 	}
 
@@ -720,13 +722,17 @@ on_error:
 	if (ans_buf)
 		iobuffer_destroy(ans_buf);
 	if (endpt->type == TYPE_TCP) {
+		evtpoll_detach(endpt->ep, endpt->sock_handle);
 		closesocket(endpt->sock_handle);
 	}
 	else if (endpt->type == TYPE_PIPE) {
 		CloseHandle(endpt->pipe_handle);
 	}
-	if (endpt->msg_fd)
+
+	if (endpt->msg_fd) {
+		evtpoll_detach(endpt->ep, endpt->msg_fd);
 		close(endpt->msg_fd);
+	}
 	if (endpt->tx_evt)
 		CloseHandle(endpt->tx_evt);
 	if (endpt->rx_evt)
@@ -826,7 +832,7 @@ static int bus_endpt_poll_internal(bus_endpt_t* endpt, int* result, int timeout)
 							} while (ret < 0 && errno == EINTR);
 
 							if (ret < 0) {
-								WLog_ERR(TAG, "read msg fd failed: %s", strerror(errno));
+								WLog_ERR(TAG, "read msg fd failed: %d", (errno));
 								abort();
 							}
 							*result = BUS_RESULT_MSG;
@@ -1211,9 +1217,11 @@ TOOLKIT_API int bus_endpt_post_msg(bus_endpt_t* endpt, int msg, int nparam, para
 	list_add_tail(&e->entry, &endpt->msg_list);
 	spinlock_leave(&endpt->msg_lock);
 	wdata = 1;
-	rc = write(endpt->msg_fd, &wdata, sizeof wdata);
+	do {
+		rc = write(endpt->msg_fd, &wdata, sizeof wdata);
+	} while (rc < 0 && rc == EINTR);
 	if (rc == -1) {
-		WLog_ERR(TAG, "write to eventfd failed: %s(%d)", strerror(errno), errno);
+		WLog_ERR(TAG, "write to eventfd failed: %d", errno);
 		return -1;
 	}
 	return 0;
@@ -1242,9 +1250,11 @@ TOOLKIT_API int bus_endpt_send_msg(bus_endpt_t* endpt, int msg, int nparam, para
 	list_add_tail(&e.entry, &endpt->msg_list);
 	spinlock_leave(&endpt->msg_lock);
 	wdata = 1;
-	rc = write(endpt->msg_fd, &wdata, sizeof wdata);
+	do {
+		rc = write(endpt->msg_fd, &wdata, sizeof wdata);
+	} while (rc < 0 && rc == EINTR);
 	if (rc == -1) {
-		WLog_ERR(TAG, "write to eventfd failed: %s(%d)", strerror(errno), errno);
+		WLog_ERR(TAG, "write to eventfd failed: %d", errno);
 		CloseHandle(e.evt);
 		if (nparam) {
 			free(e.params);
@@ -1344,7 +1354,6 @@ TOOLKIT_API int bus_endpt_poll(bus_endpt_t* endpt, int timeout)
 			rc = -1;
 		}
 	}
-
 	return rc;
 }
 

+ 28 - 15
libtoolkit/unix/bus_daemon.c

@@ -88,7 +88,7 @@ static unsigned int __stdcall thread_proc(void *param)
 	while (!daemon->lstop) {
 		ioqueue_poll(daemon->ioq, 10);
 	}
-
+	WLog_DBG(TAG, "%s exit ! thread id: %d", __FUNCTION__, GetCurrentThreadId());
 	return 0;
 }
 
@@ -129,17 +129,20 @@ static void add_unregistered_list(endpt_session_t *session)
 
 static void remove_session_list(endpt_session_t *session)
 {
-	WLog_DBG(TAG, "remove session(%d) from register list", session->epid);
 	bus_daemon_t *daemon = session->daemon;
-	daemon_lock(daemon);
-	list_del(&session->entry);
-	if (session->epid != BUS_INVALID_EPID) {
-		endpt_session_t *pos, *n;
-		list_for_each_entry_safe(pos, n, &daemon->registered_session_list, endpt_session_t, entry) {
-			queue_sys_pkt(pos, session->epid, BUS_STATE_OFF);
+	/*lstop condition just for clear*/
+	if (daemon->lstop == 0) {
+		WLog_DBG(TAG, "remove session(%d) from register list", session->epid);
+		daemon_lock(daemon);
+		list_del(&session->entry);
+		if (session->epid != BUS_INVALID_EPID) {
+			endpt_session_t* pos, * n;
+			list_for_each_entry_safe(pos, n, &daemon->registered_session_list, endpt_session_t, entry) {
+				queue_sys_pkt(pos, session->epid, BUS_STATE_OFF);
+			}
 		}
+		daemon_unlock(daemon);
 	}
-	daemon_unlock(daemon);
 }
 
 static void move_to_registered_session(endpt_session_t *session)
@@ -200,8 +203,8 @@ on_error:
 
 static void destroy_session(endpt_session_t *session)
 {
-	WLog_DBG(TAG, "enter {%s}", __FUNCTION__);
-	while (iobuffer_queue_count(session->tx_iobuf_queue) > 0) {
+	WLog_DBG(TAG, "==> {%s}", __FUNCTION__);
+	while (session->daemon->lstop == 0 && iobuffer_queue_count(session->tx_iobuf_queue) > 0) {
 		iobuffer_t *iobuf = iobuffer_queue_deque(session->tx_iobuf_queue);
 		endpt_session_t *ts = iobuffer_get_user_data(iobuf);
 		if (ts) {
@@ -211,7 +214,6 @@ static void destroy_session(endpt_session_t *session)
 		iobuffer_destroy(iobuf);
 	}
 	iobuffer_queue_destroy(session->tx_iobuf_queue);
-
 	DeleteCriticalSection(&session->lock);
 
 	if (session->type == TYPE_PIPE) {
@@ -223,6 +225,7 @@ static void destroy_session(endpt_session_t *session)
 	}
 
 	free(session);
+	WLog_DBG(TAG, "<== {%s}", __FUNCTION__);
 }
 
 IMPLEMENT_REF_COUNT_MT_STATIC(endpt_session, endpt_session_t, ref_cnt, destroy_session)
@@ -875,6 +878,7 @@ static daemon_accetpor_t* create_daemon_acceptor(bus_daemon_t *daemon, char *url
 
 static void destroy_daemon_acceptor(daemon_accetpor_t *dacceptor)
 {
+	WLog_DBG(TAG, "destroy daemon acceptor.");
 	int i;
 
 	for (i = 0; i < dacceptor->arr_ov->nelts; ++i)
@@ -980,7 +984,8 @@ TOOLKIT_API int bus_daemon_stop(bus_daemon_t *daemon)
 	int i;
 
 	// exit all worker thread
-	InterlockedExchange(&daemon->lstop, 1);
+	daemon->lstop = 1;
+	WLog_DBG(TAG, "set daemon stop flag: %d", daemon->lstop);
 	for (i = 0; i < daemon->arr_thread->nelts; ++i) {
 		HANDLE t = ARRAY_IDX(daemon->arr_thread, i, HANDLE);
 		WaitForSingleObject(t, INFINITE);
@@ -1006,7 +1011,14 @@ TOOLKIT_API int bus_daemon_stop(bus_daemon_t *daemon)
 			if (pos->type == TYPE_PIPE) {
 				ioqueue_file_close(&pos->pipe);
 			} else if (pos->type == TYPE_TCP) {
+				/*clear here*/
+				//endpt_session_inc_ref(pos);
 				ioqueue_tcpsock_close(&pos->tcp);
+				//{
+				//	iobuffer_queue_destroy(pos->tx_iobuf_queue);
+				//	DeleteCriticalSection(&pos->lock);
+				//	free(pos);
+				//}
 			}
 		}
 		list_for_each_entry(pos, &daemon->unregistered_session_list, endpt_session_t, entry) {
@@ -1018,11 +1030,12 @@ TOOLKIT_API int bus_daemon_stop(bus_daemon_t *daemon)
 		}
 	}
 
-	// poll until all pending io are aborted
+	WLog_DBG(TAG, "poll until all pending io are aborted: handle %d, msg: %d",
+		ioqueue_handler_empty(daemon->ioq), ioqueue_msg_empty(daemon->ioq));
 	while (!ioqueue_can_exit(daemon->ioq)) {
 		ioqueue_poll(daemon->ioq, 10);
 	}
-
+	WLog_DBG(TAG, "to destroy ioqueue.");
 	ioqueue_destroy(daemon->ioq);
 
 	return 0;

+ 20 - 1
libtoolkit/unix/core.c

@@ -2,6 +2,8 @@
 #include <errno.h>
 #include <fcntl.h>  /* O_CLOEXEC */
 #include <sys/ioctl.h>
+#include <sys/syscall.h>
+#include <sys/eventfd.h>
 
 int make_fd_cloexec(int fd, int set)
 {
@@ -44,4 +46,21 @@ int make_fd_nonblock(int fd, int set)
 		return -1;
 
 	return 0;
-}
+}
+
+
+int async_fd_create()
+{
+	int fd;
+
+	fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
+	if (fd == -1) {
+		return -1;
+	}
+	return fd;
+}
+
+void async_fd_close(int fd)
+{
+	close(fd);
+}

+ 3 - 0
libtoolkit/unix/core.h

@@ -1,6 +1,7 @@
 #ifndef TOOLKIT_UNIX_H
 #define TOOLKIT_UNIX_H
 
+#include "config.h"
 #include <pthread.h>
 
 typedef pthread_t toolkit_thread_t;
@@ -9,4 +10,6 @@ typedef pthread_mutex_t toolkit_mutex_t;
 int make_fd_cloexec(int fd, int set);
 int make_fd_nonblock(int fd, int set);
 
+int async_fd_create();
+void async_fd_close(int fd);
 #endif //TOOLKIT_UNIX_H

+ 137 - 54
libtoolkit/unix/evtpoll.c

@@ -7,6 +7,7 @@
 #include "refcnt.h"
 #include "evtpoll.h"
 #include "core.h"
+#include "queue.h"
 #include <winpr/wlog.h>
 #define TAG TOOLKIT_TAG("evtpoll")
 
@@ -21,10 +22,15 @@ struct event_epoll_data_s {
 };
 
 struct event_epoll_s {
-	int epfd;
+	int fd;
 	/*insterest*/
 	spinlock_t interest_list_lock;
 	struct list_head interest_list;
+
+	/*new mechanism*/
+	event_epoll_data_t** interests;
+	unsigned int cnt_of_intersests;
+	unsigned int nfds;
 };
 
 struct evtpoll_interest_entry_s {
@@ -84,6 +90,8 @@ int evtpoll_data_set(event_epoll_data_t* epoll_data, int type, int fd, void* dat
 	epoll_data->fd = fd;
 	epoll_data->type = type;
 	epoll_data->owner = data;
+
+	return 0;
 }
 
 int evtpoll_data_get_fd(const event_epoll_data_t* const epoll_data)
@@ -111,6 +119,62 @@ void* evtpoll_data_get_data(const event_epoll_data_t* const epoll_data)
 }
 
 
+static unsigned int next_power_of_two(unsigned int val) {
+	val -= 1;
+	val |= val >> 1;
+	val |= val >> 2;
+	val |= val >> 4;
+	val |= val >> 8;
+	val |= val >> 16;
+	val += 1;
+	return val;
+}
+
+static void resize_interests_capacity(evtpoll_t*ep, int len)
+{
+	event_epoll_data_t** interests;
+	void* fake_interest_list;
+	void* fake_interest_count;
+	unsigned int cnt_of_intersests;
+	unsigned int i;
+
+	if (len <= ep->cnt_of_intersests)
+		return;
+
+	/* Preserve fake interest list and count at the end of the interests */
+	if (ep->interests != NULL) {
+		fake_interest_list = ep->interests[ep->cnt_of_intersests];
+		fake_interest_count = ep->interests[ep->cnt_of_intersests + 1];
+	}
+	else {
+		fake_interest_list = NULL;
+		fake_interest_count = NULL;
+	}
+
+	cnt_of_intersests = next_power_of_two(len + 2) - 2;
+	interests = REALLOC(ep->interests, (cnt_of_intersests + 2) * sizeof(ep->interests[0]));
+	if (interests == NULL) {
+		WLog_ERR(TAG, "no enough memory!!");
+		abort();
+	}
+	for (i = ep->cnt_of_intersests; i < cnt_of_intersests; i++)
+		interests[i] = NULL;
+	interests[cnt_of_intersests] = fake_interest_list;
+	interests[cnt_of_intersests + 1] = fake_interest_count;
+
+	ep->interests = interests;
+	ep->cnt_of_intersests = cnt_of_intersests;
+}
+
+void evtpoll__add(evtpoll_t* ep, event_epoll_data_t* evtpoll_data)
+{
+	resize_interests_capacity(ep, evtpoll_data->fd + 1);
+	if (ep->interests[evtpoll_data->fd] == NULL) {
+		ep->interests[evtpoll_data->fd] = evtpoll_data;
+	}
+}
+
+
 evtpoll_t* evtpoll_create()
 {
 	int fd;
@@ -129,34 +193,32 @@ evtpoll_t* evtpoll_create()
 		}
 	}
 	if (fd == -1) {
-		WLog_ERR(TAG, "epoll fd create failed: %s", strerror(errno));
+		WLog_ERR(TAG, "epoll fd create failed: %d", (errno));
 		goto on_error;
 	}
-	ep->epfd = fd;
+	ep->fd = fd;
 
 	spinlock_init(&ep->interest_list_lock);
 	INIT_LIST_HEAD(&ep->interest_list);
+
+	ep->cnt_of_intersests = 0;
+	ep->interests = NULL;
+	ep->nfds = 0;
+
 	return ep;
 on_error:
 	free(ep);
 	return NULL;
 }
 
-void evtpoll_destroy(evtpoll_t* evt_poll)
-{
-	assert(evtpoll__interest_empty(evt_poll));
-	close(evt_poll->epfd);
-	free(evt_poll);
-}
-
 int evtpoll_get_epoll_fd(const evtpoll_t* const evt_poll)
 {
-	return evt_poll->epfd;
+	return evt_poll->fd;
 }
 
 int evtpoll_get_raw_fd(evtpoll_t* ep)
 {
-	return ep->epfd;
+	return ep->fd;
 }
 
 void evtpoll__interest_entry_set(
@@ -239,6 +301,7 @@ void evtpoll___del_interest_list(evtpoll_interest_t* inst)
 	spinlock_leave(&ep->interest_list_lock);
 }
 
+/*remove from inst list*/
 void evtpoll__interest_free(evtpoll_interest_t* inst)
 {
 	evtpoll___del_interest_list(inst);
@@ -259,10 +322,10 @@ static int evtpoll__ctl(evtpoll_t* ep, int event_mask, int ctrl_mod, int fd, evt
 	}
 	ret = epoll_ctl(evtpoll_get_epoll_fd(ep), ctrl_mod, fd, &ee);
 	if (ret == -1) {
-		WLog_ERR(TAG, "epoll ctl failed: %s", strerror(errno));
+		WLog_ERR(TAG, "epoll ctl failed: %d", (errno));
 	}
 	else {
-		WLog_INFO(TAG, "epoll_ctl(%d): OUT:%d, IN:%d, OTHERS:0x%08X, ADD:%d, MOD:%d, data:0x%X ret:%d",
+		WLog_INFO(TAG, "epoll_ctl(%d): OUT:%d, IN:%d, OTHERS:0x%08X, ADD:%d, MOD:%d, inst:0x%X ret:%d",
 			fd,
 			(event_mask & EPOLLOUT) > 0 ? 1 : 0, (event_mask & EPOLLIN) > 0 ? 1 : 0,
 			(event_mask & ~(EPOLLOUT | EPOLLIN)),
@@ -271,22 +334,23 @@ static int evtpoll__ctl(evtpoll_t* ep, int event_mask, int ctrl_mod, int fd, evt
 	return ret;
 }
 
-static int evtpoll__ctl_remove(evtpoll_t* ep, int event_mask, int fd, void* data)
-{
-	int ret = -1;
-	struct epoll_event ee;
-	ee.events = event_mask;
-	if (data)
-		ee.data.ptr = data;
-	else
-		ee.data.fd = fd;
-	ret = epoll_ctl(evtpoll_get_epoll_fd(ep), EPOLL_CTL_DEL, fd, &ee);
-	if (ret == 0 || (ret == -1 && errno == ENOENT)) {
-		return 0;
-	}
-	WLog_ERR(TAG, "epoll remove ctrl failed: %s", strerror(errno));
-	return -1;
-}
+/*no use for now.*/
+//static int evtpoll__ctl_remove(evtpoll_t* ep, int event_mask, int fd, void* data)
+//{
+//	int ret = -1;
+//	struct epoll_event ee;
+//	ee.events = event_mask;
+//	if (data)
+//		ee.data.ptr = data;
+//	else
+//		ee.data.fd = fd;
+//	ret = epoll_ctl(ep->fd, EPOLL_CTL_DEL, fd, &ee);
+//	if (ret == 0 || (ret == -1 && errno == ENOENT)) {
+//		return 0;
+//	}
+//	WLog_ERR(TAG, "epoll remove ctrl failed: %d", (errno));
+//	return -1;
+//}
 
 /*!
  * @brief
@@ -316,6 +380,7 @@ static evtpoll_interest_t* evtpoll__interest_create(int fd)
 	return inst;
 }
 
+/*free inst and its entry*/
 static void evtpoll__interest_destroy(evtpoll_interest_t* inst)
 {
 	int i;
@@ -353,6 +418,9 @@ static void evtpoll__interest_clear(evtpoll_t* ep)
 	list_for_each_safe(pos, tmp, &ep->interest_list) {
 		evtpoll_interest_t* node = list_entry(pos, evtpoll_interest_t, node);
 		list_del(pos);
+		if (node->events != 0) {
+			evtpoll_unsubscribe(node->owner, EV_READ_WRITE_WITH_LT_PURE, node->fd, 0, NULL, NULL);
+		}
 		evtpoll__interest_destroy(node);
 	}
 	spinlock_leave(&ep->interest_list_lock);
@@ -382,22 +450,24 @@ static int evtpoll__register_interest(evtpoll_t* ep, evtpoll_interest_t* inst)
 	return ret;
 }
 
-static int evtpoll__unregister_interest(evtpoll_t* ep, int fd)
+static void evtpoll__unregister_interest(evtpoll_interest_t* inst)
 {
-	int ret = 0;
-	evtpoll_interest_t* exist;
-	exist = evtpoll__find_interest(ep, fd);
-	if (exist) {
-		evtpoll__interest_free(exist);
-#if 1//DEBUG
-		exist = evtpoll__find_interest(ep, fd);
-		assert(!exist);
-#endif
-		evtpoll__interest_destroy(exist);
-	}
-	return ret;
+	evtpoll__interest_free(inst);
+	evtpoll__interest_destroy(inst);
 }
 
+/*remove inst from list and destroy it*/
+//static int evtpoll__unregister_interest(evtpoll_t* ep, int fd)
+//{
+//	int ret = 0;
+//	evtpoll_interest_t* exist;
+//	exist = evtpoll__find_interest(ep, fd);
+//	if (exist) {
+//		evtpoll__unregister_interest(exist);
+//	}
+//	return ret;
+//}
+
 int evtpoll_attach(evtpoll_t* ep, int interest_fd)
 {
 	int ret = 0;
@@ -424,9 +494,13 @@ int evtpoll_attach(evtpoll_t* ep, int interest_fd)
 	return 0;
 }
 
+/*unsubscribe R|W event and unregist it */
 static void evtpoll__detach(evtpoll_interest_t* inst)
 {
-	evtpoll__unregister_interest(inst->owner, inst);
+	if (inst->events != 0) {
+		evtpoll_unsubscribe(inst->owner, EV_READ_WRITE_WITH_LT_PURE, inst->fd, 0, NULL, NULL);
+	}
+	evtpoll__unregister_interest(inst);
 }
 
 void evtpoll_detach(evtpoll_t* ep, int interest_fd)
@@ -434,9 +508,6 @@ void evtpoll_detach(evtpoll_t* ep, int interest_fd)
 	evtpoll_interest_t* exist;
 	exist = evtpoll__find_interest(ep, interest_fd);
 	if (exist) {
-		if (exist->events != 0) {
-			evtpoll_unsubscribe(ep, EV_READ_WRITE_WITH_LT_PURE, exist->fd, 0);
-		}
 		evtpoll__detach(exist);
 	}
 }
@@ -643,7 +714,7 @@ int evtpoll_subscribe(evtpoll_t* ep, int event_mask, int fd, void* rdata, void*
 	return ret;
 }
 
-int evtpoll_unsubscribe(evtpoll_t* ep, int event_mask, int fd, int only_reset)
+int evtpoll_unsubscribe(evtpoll_t* ep, int event_mask, int fd, int only_reset, void** rdata, void** wdata)
 {
 	int ret;
 	evtpoll_interest_t* exist;
@@ -660,9 +731,11 @@ int evtpoll_unsubscribe(evtpoll_t* ep, int event_mask, int fd, int only_reset)
 	ret = 0;
 	
 	if (!ret && (event_mask & EV_READ)) {
+		evtpoll_interest_entry_t* entry =
+			ARRAY_IDX(exist->entries, EV_INTEREST_ENTRY_IN_IDX, evtpoll_interest_entry_t*);
+		if (rdata)
+			*rdata = entry->data;
 		if (only_reset) {
-			evtpoll_interest_entry_t* entry =
-				ARRAY_IDX(exist->entries, EV_INTEREST_ENTRY_IN_IDX, evtpoll_interest_entry_t*);
 			evtpoll__interest_entry_reset(entry);
 		}
 		else {
@@ -671,9 +744,11 @@ int evtpoll_unsubscribe(evtpoll_t* ep, int event_mask, int fd, int only_reset)
 	}
 
 	if (!ret && (event_mask & EV_WRITE)) {
+		evtpoll_interest_entry_t* entry =
+			ARRAY_IDX(exist->entries, EV_INTEREST_ENTRY_OUT_IDX, evtpoll_interest_entry_t*);
+		if (wdata)
+			*wdata = entry->data;
 		if (only_reset) {
-			evtpoll_interest_entry_t* entry =
-				ARRAY_IDX(exist->entries, EV_INTEREST_ENTRY_OUT_IDX, evtpoll_interest_entry_t*);
 			evtpoll__interest_entry_reset(entry);
 		}
 		else {
@@ -745,7 +820,7 @@ int evtpoll_wait(evtpoll_t* ep, struct epoll_event event_array[], int event_arra
 	do 
 	{
 
-		nfds = epoll_wait(ep->epfd, event_array, event_array_size, timeout);
+		nfds = epoll_wait(ep->fd, event_array, event_array_size, timeout);
 
 	} while (nfds == -1 && errno == EINTR);
 
@@ -754,7 +829,7 @@ int evtpoll_wait(evtpoll_t* ep, struct epoll_event event_array[], int event_arra
 		return 0;
 	}
 	if (nfds == -1) {
-		WLog_ERR(TAG, "epoll wait error: %s(%d)", strerror(errno), errno);
+		WLog_ERR(TAG, "epoll wait error: %d", errno);
 		return -1;
 	}
 
@@ -768,4 +843,12 @@ int evtpoll_loop(evtpoll_t* ep, int timeout)
 	return ret;
 }
 
+void evtpoll_destroy(evtpoll_t* evt_poll)
+{
+	evtpoll__interest_clear(evt_poll);
+	assert(evtpoll__interest_empty(evt_poll));
+	close(evt_poll->fd);
+	free(evt_poll);
+}
+
 #endif //NOT _WIN32

+ 8 - 1
libtoolkit/unix/evtpoll.h

@@ -34,6 +34,13 @@ level-triggered(LT): default
 extern "C" {
 #endif
 
+	struct event_epoll_data_s;
+	struct event_epoll_s;
+
+	typedef void(*evtpoll_data_cb)(struct event_epoll_s* ep,
+		struct event_epoll_data_s* d,
+		unsigned int events);
+
 	typedef struct event_epoll_data_s event_epoll_data_t;
 	typedef struct event_epoll_s evtpoll_t;
 
@@ -67,7 +74,7 @@ extern "C" {
 
 	int evtpoll_subscribe(evtpoll_t* ep, int event_mask, int fd, void* rdata, void* wdata);
 
-	int evtpoll_unsubscribe(evtpoll_t* ep, int event_mask, int fd, int only_reset);
+	int evtpoll_unsubscribe(evtpoll_t* ep, int event_mask, int fd, int only_reset, void** rdata, void** wdata);
 
 	int evtpoll_loop(evtpoll_t* ep, int timeout);
 

+ 177 - 29
libtoolkit/unix/ioqueue.c

@@ -8,6 +8,7 @@
 #include "bus.h"
 #include "array.h"
 #include "evtpoll.h"
+#include "core.h"
 
 #include <winpr/file.h>
 #include <winpr/handle.h>
@@ -16,6 +17,8 @@
 #include <winpr/string.h>
 #include <winpr/wlog.h>
 
+#include <sys/eventfd.h>
+
 #define TAG TOOLKIT_TAG("ioqueue")
 
 #ifndef SO_UPDATE_CONNECT_CONTEXT
@@ -39,7 +42,7 @@ struct ioqueue_t {
 	HANDLE iocp; //for build successfully temporary!!!!!!!!!!!!!!! TODO: delete it!!
 
 	evtpoll_t* ep;
-	
+	int msg_fd;
 	void *user_data;
 	/* timer */
 	spinlock_t tm_queue_lock;
@@ -56,6 +59,7 @@ struct ioqueue_t {
 };
 
 typedef struct ioqueue_msg {
+	OVERLAPPED ov; /*it's bound to append this structure just for recognize its type.*/
 	int msg_type;
 	param_size_t param1;
 	param_size_t param2;
@@ -220,11 +224,13 @@ static int is_os_gte_xp() /* is os version greater and equal than xp */
 
 static __inline LONG inc_msg_cnt(ioqueue_t *ioq) 
 { 
+	//return ++ioq->msg_cnt;
 	return InterlockedIncrement(&ioq->msg_cnt); 
 }
 
 static __inline LONG dec_msg_cnt(ioqueue_t *ioq) 
 { 
+	//return --ioq->msg_cnt;
 	return InterlockedDecrement(&ioq->msg_cnt); 
 }
 
@@ -247,6 +253,7 @@ static __inline void del_handler_list(ioqueue_handle_context *handle_ctx, ioqueu
 
 static void ioqueue_handle_context_free(ioqueue_handle_context *handle_ctx)
 {
+	WLog_DBG(TAG, "free ioqueue handle context: %d", handle_ctx->type);
 	if (handle_ctx->type == HANDLE_TYPE_UDPSOCK 
 		|| handle_ctx->type == HANDLE_TYPE_TCPSOCK 
 		|| handle_ctx->type == HANDLE_TYPE_ACCEPTOR) {
@@ -270,19 +277,25 @@ static void ioqueue_handle_context_free(ioqueue_handle_context *handle_ctx)
 	}
 	del_handler_list(handle_ctx, handle_ctx->owner);
 }
-
+//ioqueue_handle_context_dec_ref()
+//ioqueue_handle_context_inc_ref
 IMPLEMENT_REF_COUNT_MT(ioqueue_handle_context, ioqueue_handle_context, pending_ios, ioqueue_handle_context_free)
 
 static __inline LONG inc_pending_io(ioqueue_handle_context *handle_ctx)
 {
-	return inc_ref(ioqueue_handle_context, handle_ctx);
+	LONG l = inc_ref(ioqueue_handle_context, handle_ctx);
+	WLog_DBG(TAG, "increase pending io: %d, cnt:%ld", handle_ctx->type, l);
+	return l;
 }
 
 static __inline LONG dec_pending_io(ioqueue_handle_context *handle_ctx)
 {
-	return dec_ref(ioqueue_handle_context, handle_ctx);
+	LONG l = dec_ref(ioqueue_handle_context, handle_ctx);
+	WLog_DBG(TAG, "decrease pending io: %d, cnt:%ld", handle_ctx->type, l);
+	return 1;
 }
 
+/*
 static SOCKET new_socket()
 {
 	SOCKET sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, 
@@ -292,6 +305,7 @@ static SOCKET new_socket()
 	}
 	return sock;
 }
+*/
 
 static void delete_socket(SOCKET sock)
 {
@@ -312,9 +326,17 @@ TOOLKIT_API ioqueue_t *ioqueue_create()
 	if (!ioq->ep) {
 		goto on_error_0;
 	}
-
+	ioq->msg_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
+	if (ioq->msg_fd == -1) {
+		WLog_ERR(TAG, "create event fd failed: %d", errno);
+		ioq->msg_fd = 0;
+		goto on_error_1;
+	} 
+	if (evtpoll_attach(ioq->ep, ioq->msg_fd) != 0) {
+		goto on_error_2;
+	}
 	if (timer_heap_create(&ioq->tm_queue) != 0) {
-		goto on_error_3;
+		goto on_error_2;
 	}
 	spinlock_init(&ioq->tm_queue_lock);
 	spinlock_init(&ioq->connect_list_lock);
@@ -324,7 +346,9 @@ TOOLKIT_API ioqueue_t *ioqueue_create()
 
 	return ioq;
 
-on_error_3:
+on_error_2:
+	close(ioq->msg_fd);
+on_error_1:
 	evtpoll_destroy(ioq->ep);
 on_error_0:
 	free(ioq);
@@ -337,6 +361,8 @@ TOOLKIT_API void ioqueue_destroy(ioqueue_t *ioq)
 	assert(ioqueue_handler_empty(ioq));
 	assert(ioqueue_msg_empty(ioq));
 	timer_queue_destroy(ioq->tm_queue);
+	evtpoll_detach(ioq->ep, ioq->msg_fd);
+	close(ioq->msg_fd);
 	evtpoll_destroy(ioq->ep);
 	free(ioq);
 }
@@ -446,14 +472,18 @@ static int pre_dispatch_network(BOOL* ret, DWORD* dwBytesTransfer, ioqueue_overl
 		assert(((ioqueue_accept_overlapped_t*)io_ctx)->client == INVALID_SOCKET);
 		addrlen = sizeof(addr);
 		bzero(&addr, addrlen);
-		conn_socket = accept(handle_ctx->u.sock, (struct sockaddr*) & addr, &addrlen);
+		conn_socket = _accept(handle_ctx->u.sock, (struct sockaddr*) & addr, &addrlen);
 		if (conn_socket == -1) {
 			WLog_ERR(TAG, "accept connect socket failed: %d", errno);
 			return -1;
 		}
 		WLog_INFO(TAG, "new connected socket fd(%d) arrived at listen socket: %d from %s:%d",
 			conn_socket, handle_ctx->u.sock, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
-
+		if (make_fd_cloexec(conn_socket, 1)) {
+			WLog_ERR(TAG, "enable cloexec failed: %d", errno);
+			closesocket(conn_socket);
+			return -1;
+		}
 		((ioqueue_accept_overlapped_t*)io_ctx)->client = conn_socket;
 		if (ret) *ret = TRUE;
 		if (dwBytesTransfer) *dwBytesTransfer = 0;
@@ -492,7 +522,7 @@ static int pre_dispatch_network(BOOL* ret, DWORD* dwBytesTransfer, ioqueue_overl
 						nwrite = 0;
 					}
 					else {
-						WLog_ERR(TAG, "write error: %s(%d)", strerror(errno), errno);
+						WLog_ERR(TAG, "write error: %d", errno);
 					}
 					break;
 				}
@@ -523,10 +553,7 @@ static int pre_dispatch_network(BOOL* ret, DWORD* dwBytesTransfer, ioqueue_overl
 			n = 0;
 			for (; n < overlapped->wsabuf.len;)
 			{
-				do  
-				{
-					nread = _recv(tcpsock->u.sock, overlapped->wsabuf.buf + n, overlapped->wsabuf.len - n, 0);
-				} while (nread < 0 && errno == EINTR);
+				nread = _recv(tcpsock->u.sock, overlapped->wsabuf.buf + n, overlapped->wsabuf.len - n, 0);
 				if (nread > 0) {
 					n += nread;
 					continue;
@@ -539,7 +566,7 @@ static int pre_dispatch_network(BOOL* ret, DWORD* dwBytesTransfer, ioqueue_overl
 					nread = 0;
 					break;
 				} else {
-					WLog_ERR(TAG, "read error: %s(%d)", strerror(errno), errno);
+					WLog_ERR(TAG, "read error: %d", errno);
 					break;
 				}
 			}
@@ -624,6 +651,8 @@ static void dispatch_network(BOOL ret, DWORD dwBytesTransfer, ioqueue_overlapped
 						ioqueue_sendn_overlapped_t *overlapped = (ioqueue_sendn_overlapped_t*)io_ctx;
 						overlapped->sended_bytes += dwBytesTransfer;
 						if (err == 0 && overlapped->sended_bytes < overlapped->total_bytes) {
+							int rc;
+							DWORD bytesWritten;
 							ioqueue_tcpsock_t* tcpsock = overlapped->base.handle_ctx;
 							overlapped->wsabuf.buf += dwBytesTransfer;
 							overlapped->wsabuf.len -= dwBytesTransfer;
@@ -634,7 +663,14 @@ static void dispatch_network(BOOL ret, DWORD dwBytesTransfer, ioqueue_overlapped
 							overlapped->base.ov.OffsetHigh = 0;
 							ioqueue_overlapped_set_mask(io_ctx, sizeof(ioqueue_sendn_overlapped_t));
 							WLog_WARN(TAG, "OV_SENDN: Must be due with this situation %d < %d", overlapped->sended_bytes, overlapped->total_bytes);
-							evtpoll_subscribe(ioqueue_tcpsock_get_owned_ioqueue(tcpsock)->ep, EV_WRITE, tcpsock->u.sock, NULL, io_ctx);
+							//rc = _send(tcpsock->u.sock, overlapped->wsabuf.buf, overlapped->wsabuf.len, 0);
+							if (0 != evtpoll_subscribe(ioqueue_tcpsock_get_owned_ioqueue(tcpsock)->ep, EV_WRITE, tcpsock->u.sock, NULL, io_ctx)) {
+									dec_pending_io(handle_ctx);
+									overlapped->on_send_callback(handle_ctx, io_ctx, overlapped->original_buf,
+										overlapped->sended_bytes, base_ov->user_data, -1);
+							}
+							//rc = WSASend(handle_ctx->u.sock, &overlapped->wsabuf, 1, &bytesWritten,
+							//	0, &overlapped->base.ov, NULL);
 							//if (rc != 0 && WSAGetLastError() != WSA_IO_PENDING) {
 							//	dec_pending_io(handle_ctx);
 							//	overlapped->on_send_callback(handle_ctx, io_ctx, overlapped->original_buf, 
@@ -642,7 +678,7 @@ static void dispatch_network(BOOL ret, DWORD dwBytesTransfer, ioqueue_overlapped
 							//}
 						} else {
 							ioqueue_tcpsock_t* tcpsock = overlapped->base.handle_ctx;
-							evtpoll_unsubscribe(ioqueue_tcpsock_get_owned_ioqueue(tcpsock)->ep, EV_WRITE, tcpsock->u.sock, 0);
+							evtpoll_unsubscribe(ioqueue_tcpsock_get_owned_ioqueue(tcpsock)->ep, EV_WRITE, tcpsock->u.sock, 0, NULL, NULL);
 							overlapped->on_send_callback(handle_ctx, io_ctx, overlapped->original_buf, 
 								overlapped->sended_bytes, base_ov->user_data, err);
 						}
@@ -831,36 +867,75 @@ static void dispatch_msg(ioqueue_t *ioq, int msg_type, param_size_t param1, para
 
 TOOLKIT_API int ioqueue_post_message(ioqueue_t *ioq, int msg_type, param_size_t param1, param_size_t param2)
 {
+	
 	ioqueue_msg *msg;
+	uint64_t wdata = 0;
+	int rc;
+	WLog_DBG(TAG, "==> ioqueue post message: %d", msg_type);
 	assert(ioq);
 	msg = MALLOC_T(ioqueue_msg);
+	if (msg == NULL)
+		return -1;
+	ioqueue_overlapped_set_mask((ioqueue_overlapped_t*)(uintptr_t)(msg), sizeof(OVERLAPPED));
 	msg->msg_type = msg_type;
 	msg->param1 = param1;
 	msg->param2 = param2;
 	msg->evt = NULL;
+	WLog_DBG(TAG, "subscribe msg: 0x%08X", msg);
+	if (0 != evtpoll_subscribe(ioq->ep, EV_READ, ioq->msg_fd, msg, NULL)) {
+		WLog_ERR(TAG, "ioqueue subscribe msg eventfd failed.");
+		free(msg);
+		return -1;
+	}
 	inc_msg_cnt(ioq);
-	if (!PostQueuedCompletionStatus(ioq->iocp, 0, (ULONG_PTR)msg, NULL)) {
+	wdata = 1;
+	do 
+	{
+		rc = write(ioq->msg_fd, &wdata, sizeof wdata);
+	} while (rc < 0 && rc == EINTR);
+	if (rc == -1) {
+		WLog_ERR(TAG, "write to eventfd failed: %d", errno);
 		dec_msg_cnt(ioq);
 		free(msg);
 		return -1;
 	}
+	WLog_DBG(TAG, "<== ioqueue post message: %d", msg_type);
 	return 0;
 }
 
 TOOLKIT_API int ioqueue_send_message(ioqueue_t *ioq, int msg_type, param_size_t param1, param_size_t param2)
 {
-	ioqueue_msg msg = {msg_type, param1, param2};
+	ioqueue_msg msg;
+	int rc;
+	uint64_t wdata = 0;
+	WLog_DBG(TAG, "==> ioqueue send message: %d", msg_type);
+	memset(&msg, 0, sizeof(ioqueue_msg));
+	msg.msg_type = msg_type;
+	msg.param1 = param1;
+	msg.param2 = param2;
+	ioqueue_overlapped_set_mask((ioqueue_overlapped_t*)(uintptr_t)(&msg), sizeof(OVERLAPPED));
 	assert(ioq);
+	if (0 != evtpoll_subscribe(ioq->ep, EV_READ, ioq->msg_fd, &msg, NULL)) {
+		WLog_ERR(TAG, "ioqueue subscribe msg eventfd failed.");
+		return -1;
+	}
 	msg.evt = CreateEventA(NULL, TRUE, FALSE, NULL);
 	inc_msg_cnt(ioq);
-	if (!PostQueuedCompletionStatus(ioq->iocp, 0, (ULONG_PTR)&msg, NULL)) {
-		CloseHandle(msg.evt);
+	wdata = 1;
+	do {
+		rc = write(ioq->msg_fd, &wdata, sizeof wdata);
+	} while (rc < 0 && rc == EINTR);
+	if (rc == -1) {
+		WLog_ERR(TAG, "write to eventfd failed: (%d)", errno);
 		dec_msg_cnt(ioq);
+		CloseHandle(msg.evt);
 		return -1;
 	}
+	WLog_DBG(TAG, "wait send message result: 0x%08X", &msg);
 	WaitForSingleObject(msg.evt, INFINITE);
 	CloseHandle(msg.evt);
 	dec_msg_cnt(ioq);
+	WLog_DBG(TAG, "<== ioqueue send message: %d", msg_type);
 	return 0;
 }
 
@@ -955,12 +1030,35 @@ TOOLKIT_API int ioqueue_poll(ioqueue_t* q, int timeout)
 				, pe->events & EPOLLOUT ? 1: 0 ,pe->events & EPOLLIN ? 1: 0);
 
 			while (n >= 0) {
-				ioqueue_overlapped_t* io_ctx = NULL;
-				n = evtpoll_deal(ioq->ep, pe, &io_ctx, 1);
+				LPOVERLAPPED iocp_pov = 0;
+				n = evtpoll_deal(ioq->ep, pe, &iocp_pov, 1);
 				if (!n) {
+					ioqueue_overlapped_t* io_ctx = (ioqueue_overlapped_t*)iocp_pov;
 					assert(io_ctx);
-					pre_dispatch_network(&ret, &dwBytesTransfer, io_ctx);
-					dispatch_network(ret, dwBytesTransfer, io_ctx);
+					if (ioqueue_overlapped_get_type(io_ctx) != EV_BUS_ENDPOINT) {
+						pre_dispatch_network(&ret, &dwBytesTransfer, io_ctx);
+						dispatch_network(ret, dwBytesTransfer, io_ctx);
+					}
+					else {
+						WLog_DBG(TAG, "msg arrived.");
+						uint64_t rdata;
+						ioqueue_msg* msg = (ioqueue_msg*)iocp_pov;
+						int msg_type = msg->msg_type;
+						int param1 = msg->param1;
+						int param2 = msg->param2;
+						HANDLE evt = msg->evt;
+						do {
+							ret = read(ioq->msg_fd, &rdata, sizeof rdata);
+						} while (ret < 0 && errno == EINTR);
+						if (!evt)
+							free(msg);
+						if (ret < 0) {
+							WLog_ERR(TAG, "read msg fd failed: %d", errno);
+							abort();
+						}
+						dispatch_msg(ioq, msg_type, param1, param2, evt);
+						dec_msg_cnt(ioq);
+					}
 					t++;
 					count++;
 				}
@@ -1056,15 +1154,19 @@ TOOLKIT_API int ioqueue_acceptor_create(ioqueue_t *ioq,
 	/*Warning: only the front third params are effective !!*/
 	acceptor->u.sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
 	if (acceptor->u.sock == INVALID_SOCKET) {
-		WLog_ERR(TAG, "accept socket create failed: %s", strerror(errno));
+		WLog_ERR(TAG, "accept socket create failed: %d", errno);
 		goto on_error;
 	}
 	nonblock_sock(acceptor->u.sock);
+	if (make_fd_cloexec(acceptor->u.sock, 1) != 0) {
+		WLog_ERR(TAG, "set accept socked fd cloexec failed: %d", errno);
+		goto on_error;
+	}
 	service.sin_family = AF_INET;
 	service.sin_port = htons(port);
 	service.sin_addr.s_addr = ip ? inet_addr(ip) : htonl(INADDR_ANY);
 	if (bind(acceptor->u.sock, (struct sockaddr*) & service, sizeof(struct sockaddr)) != 0) {
-		WLog_ERR(TAG, "bind sockect %d failed: %s", acceptor->u.sock, strerror(errno));
+		WLog_ERR(TAG, "bind sockect %d failed: %d", acceptor->u.sock, (errno));
 		goto on_error;
 	}
 	if (evtpoll_attach(ioq->ep, acceptor->u.sock)) {
@@ -1095,16 +1197,42 @@ TOOLKIT_API int ioqueue_acceptor_listen(ioqueue_acceptor_t* acceptor, int backlo
 TOOLKIT_API void ioqueue_acceptor_destroy(ioqueue_acceptor_t* acceptor)
 {
 	assert(acceptor);
+	WLog_DBG(TAG, "destroy ioqueue acceptor.");
 	dec_ref(ioqueue_handle_context, acceptor);
 }
 
 TOOLKIT_API void ioqueue_acceptor_close(ioqueue_acceptor_t* acceptor)
 {
+	WLog_DBG(TAG, "close ioqueue acceptor.");
 	SOCKET s;
 	assert(acceptor);
 	s = acceptor->u.sock;
 	if (s != INVALID_SOCKET) {
 		acceptor->u.sock = INVALID_SOCKET;
+		/*at Windows, IOCP would returned anything that make dec pending happen when close acceptor socket*/
+		/*we should let the poll thread to do this job!!!!!!*/
+		{
+			int rc;
+			void* io = NULL;
+			ioqueue_t* ioq = acceptor->owner;
+			assert(ioq);
+			rc = evtpoll_unsubscribe(ioq->ep, EV_ACCEPT, s, 0, &io, NULL);
+			if (rc == 0 && io != NULL) {
+				WLog_DBG(TAG, "cleaer pending state.");
+				ioqueue_overlapped_t* io_ctx = (ioqueue_overlapped_t*)io;
+				dispatch_network(FALSE, 0, io_ctx);
+
+				//ioqueue_accept_overlapped_t* overlapped = (ioqueue_accept_overlapped_t*)io;
+				//fastlock_enter(acceptor->ov_pending_list_lock);
+				//list_del(&overlapped->base.pending_entry);
+				//fastlock_leave(acceptor->ov_pending_list_lock);
+				//dec_pending_io(acceptor);
+			}
+			else {
+				WLog_WARN(TAG, "unsubcribe failed: %d", rc);
+			}
+			evtpoll_detach(ioq->ep, s);
+		}
 		closesocket(s);
 	}
 }
@@ -1172,7 +1300,7 @@ TOOLKIT_API int ioqueue_acceptor_accept(ioqueue_acceptor_t* acceptor, SOCKET *s,
 		if (FD_ISSET(acceptor->u.sock, &ex_set))
 			return -1;
 		if (FD_ISSET(acceptor->u.sock, &set)) {
-			SOCKET fd = accept(acceptor->u.sock, addr, addrlen);
+			SOCKET fd = _accept(acceptor->u.sock, addr, addrlen);
 			if (fd != INVALID_SOCKET) {
 				*s = fd;
 				return 0;
@@ -1759,8 +1887,28 @@ TOOLKIT_API void ioqueue_tcpsock_close(ioqueue_tcpsock_t *tcpsock)
 	SOCKET s;
 	assert(tcpsock);
 	s = tcpsock->u.sock;
-	if (s != INVALID_SOCKET) {
+	if (s != INVALID_SOCKET) { /*this condition maybe so important!!*/
 		tcpsock->u.sock = INVALID_SOCKET;
+		/*we should let the poll thread to do this job!!!!!!*/
+		{
+			int rc;
+			void* rdata = NULL;
+			void* wdata = NULL;
+			ioqueue_t* ioq = tcpsock->owner;
+			assert(ioq);
+			rc = evtpoll_unsubscribe(ioq->ep, EV_READ_WRITE_WITH_LT_PURE, s, 0, &rdata, &wdata);
+			if (rc == 0 && (rdata || wdata)) {
+				WLog_DBG(TAG, "clear tcpsocket state. r(0x%08X), w(0x%08X)", rdata, wdata);
+				if (rdata) {
+					ioqueue_overlapped_t* io_ctx = (ioqueue_overlapped_t*)rdata;
+					dispatch_network(FALSE, 0, io_ctx);
+				}
+				if (wdata) {
+					ioqueue_overlapped_t* io_ctx = (ioqueue_overlapped_t*)wdata;
+					dispatch_network(FALSE, 0, io_ctx);
+				}
+			}
+		}
 		closesocket(s);
 	}
 }

+ 5 - 0
libtoolkit/win/ioqueue.c

@@ -668,11 +668,15 @@ static void dispatch_msg(ioqueue_t *ioq, int msg_type, int param1, int param2, H
 		SetEvent(evt);
 }
 
+/*MSG_REMOVE_REGISTAR*/
 TOOLKIT_API int ioqueue_post_message(ioqueue_t *ioq, int msg_type, int param1, int param2)
 {
 	ioqueue_msg *msg;
 	assert(ioq);
 	msg = MALLOC_T(ioqueue_msg);
+	if (msg == NULL) {
+		return -1;
+	}
 	msg->msg_type = msg_type;
 	msg->param1 = param1;
 	msg->param2 = param2;
@@ -686,6 +690,7 @@ TOOLKIT_API int ioqueue_post_message(ioqueue_t *ioq, int msg_type, int param1, i
 	return 0;
 }
 
+/*It seems no use anywhere*/
 TOOLKIT_API int ioqueue_send_message(ioqueue_t *ioq, int msg_type, int param1, int param2)
 {
 	ioqueue_msg msg = {msg_type, param1, param2};

+ 34 - 15
spbase/SpBase.cpp

@@ -33,7 +33,9 @@
 
 
 #ifndef _WIN32
+
 static SpModule* g_module = NULL;
+
 #endif //NOT _WIN32
 
 SpModule *GetSpModule()
@@ -246,25 +248,23 @@ static HMODULE LoadModuleLibrary(sp_mod_t *mod)
 	return LoadLibraryA(tmp);
 }
 
+/*It seems never been used anywhere.*/
 SPBASE_API HINSTANCE SpLoadLibrary(const char *file)
 {
 	sp_env_t *env = sp_get_env();
 	if (env) {
 		char tmp[MAX_PATH];
-#ifdef _WIN32
-		sprintf(tmp, "%s\\%s", env->dir->dep_path, file);
-#else
-		sprintf(tmp, "%s/%s", env->dir->dep_path, file);
-#endif //_WIN32
+		sprintf(tmp, "%s" SPLIT_SLASH_STR "%s", env->dir->dep_path, file);
 		return LoadLibraryA(tmp);
 	} else {
 		return NULL;
 	}
 }
 
-#ifdef _WIN32
+/*only sphost would invoke it.*/
 extern "C" SPBASE_API int __stdcall SpExit(const char* mod_name)
 {
+#ifdef _WIN32
 	Dbg("Do SpExit!");
 	SetthreadGroup(GetCurrentThreadId(), mod_name);
 	CleanModuleThread(mod_name);
@@ -280,18 +280,27 @@ extern "C" SPBASE_API int __stdcall SpExit(const char* mod_name)
 
 	FreeLibrary(getEntityResource()->m_Module);
 	DestoryModuleInfo(mod_name);
+#else
+	if (sp_shm_is_newalloc())
+		return 0; /*spshell*/
+
+	if (g_module) {
+		delete g_module;
+		g_module = nullptr;
+	}
+
+#endif //_WIN32
 	return 0;
 }
-#endif //_WIN32
+//static void SpExit(void) __attribute__((destructor));
 
 extern "C" SPBASE_API int __stdcall SpRun(const char *mod_name, int epid, int range, int group)
 {
 	ErrorCodeEnum Error = Error_Unexpect;
 
-	if (!mod_name)
-		return Error_Bug;
-	if (epid == SP_INVALID_MOD_ID)
-		return Error_Bug;
+	if (!mod_name) { return Error_Bug; }
+		
+	if (epid == SP_INVALID_MOD_ID) { return Error_Bug;  }
 
 #ifdef _WIN32
 	if (findModuleByName(mod_name))		//¼ì²âʵÌåÊÇ·ñÒÑ´´½¨
@@ -379,14 +388,14 @@ extern "C" SPBASE_API int __stdcall SpRun(const char *mod_name, int epid, int ra
 	}
 	Error = g_module->Init(env->url);
 	if (Error) {
-		delete g_module;
-		g_module = NULL;
 		goto on_error;
 	}
 #endif //_WIN32
+	
 	sp_dbg_debug("before set thread priority");
 	SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
 	sp_dbg_debug("after set thread priority");
+
 #ifdef _WIN32
 	if (!SetSpModule(mod_name, curModule))
 		goto on_error;
@@ -402,13 +411,23 @@ extern "C" SPBASE_API int __stdcall SpRun(const char *mod_name, int epid, int ra
 
 	Error = g_module->Run();
 	g_module->Term();
-	FreeLibrary(hModule);
+	
 #endif //_WIN32
 
 	sp_shm_term();
-
 on_error:
 
+#ifndef _WIN32
+	if (hModule) {
+		FreeLibrary(hModule);
+		hModule = NULL;
+	}
+	if (g_module) {
+		delete g_module;
+		g_module = nullptr;
+	}
+#endif //NOT _WIN32
+
 	return Error;
 }
 

+ 18 - 3
spbase/SpModule.cpp

@@ -16,7 +16,10 @@
 
 #include <winpr/thread.h>
 
-SpModule::SpModule( sp_mod_t *mod, sp_cfg_shell_module_t *cfg_mod ) : m_arrEntity(NULL), m_iom(NULL), m_mod(mod), 
+SpModule::SpModule( sp_mod_t *mod, sp_cfg_shell_module_t *cfg_mod ) 
+	: m_arrEntity(NULL)
+	, m_iom(NULL)
+	, m_mod(mod), 
 m_anonymous_log(NULL), m_stub(NULL), m_cfg_mod(cfg_mod)
 {
 	m_dwEntityTls = TlsAlloc();
@@ -61,8 +64,8 @@ void SpModule::LogMessage( const LogTypeEnum LogType, const SeverityLevelEnum Le
 
 ErrorCodeEnum SpModule::Init( const char *url )
 {
-	sp_env_t *env = sp_get_env();
-	void *shm_addr = env->shm_addr;
+	//sp_env_t *env = sp_get_env();
+	//void *shm_addr = env->shm_addr;
 	int rc;
 
 	m_arrEntity = array_make(0, sizeof(SpEntity*));
@@ -94,6 +97,18 @@ VOID SpModule::Term()
 
 	RemoveEntityBase(NULL); // remove all
 
+	if (m_anonymous_log) {
+		Dbg("destroy log client.");
+		sp_log_client_destroy(m_anonymous_log);
+		m_anonymous_log = NULL;
+	}
+
+	if (m_iom) {
+		Dbg("destroy iom.");
+		sp_iom_destroy(m_iom);
+		m_iom = NULL;
+	}
+
 	array_free(m_arrEntity);
 }
 

+ 2 - 0
spbase/sp_env.h

@@ -26,6 +26,8 @@ typedef struct sp_env_t {
 } sp_env_t;
 
 SPBASE_API int sp_env_create(void *hint_addr, int range, sp_env_t **p_env);
+SPBASE_API void sp_env_destroy(sp_env_t* env);
+
 int sp_env_new_id(sp_env_t *env);
 
 SPBASE_API sp_env_t *sp_get_env();

+ 43 - 8
spbase/sp_iom.c

@@ -10,6 +10,21 @@
 #include "memutil.h"
 #include <time.h>
 
+#ifndef _WIN32
+#include <unistd.h>
+#include <sys/syscall.h>
+
+static int GetParentProcessID()
+{
+#if 0
+	return getppid();
+#else
+	return (int)syscall(SYS_getppid);
+#endif
+}
+
+#endif //NOT _WIN32
+
 #define POLL_INTERVAL	10
 
 #define IOM_T_EXIT			        0
@@ -373,25 +388,45 @@ static int sp_iom_poll(sp_iom_t *iom, int *timeout)
 int sp_iom_run(sp_iom_t *iom)
 {
 	int timeout = POLL_INTERVAL;
-	while (
 #ifdef _WIN32
-		InterlockedExchangeAdd((LONG*)&iom->stop, 0) == 0
+	while (InterlockedExchangeAdd((LONG*)&iom->stop, 0) == 0 || timer_queue_get_count(iom->tm_queue) > 0) {
+		int rc = sp_iom_poll(iom, &timeout);
+		if (rc >= 0) {
+			if (timeout > POLL_INTERVAL || timeout < 0)
+				timeout = POLL_INTERVAL;
+		}
+		else {
+			sp_dbg_debug("iom poll failed!");
+			ExitProcess(-1);
+			return rc;
+		}
+	}
+
 #else
-		/*the adapte func 'InterlockedExchangeAdd' implemented under winpr went wrong, maybe 64bit has responsibility*/
-		iom->stop == 0
-#endif //_WIN32
-		|| timer_queue_get_count(iom->tm_queue) > 0) 
-{
+	int parent_id = GetParentProcessID();
+	/*the adapte func 'InterlockedExchangeAdd' implemented under winpr went wrong, maybe 64bit has responsibility*/
+	while (iom->stop == 0 || timer_queue_get_count(iom->tm_queue) > 0) {
 		int rc = sp_iom_poll(iom, &timeout);
 		if (rc >= 0) {
 			if (timeout > POLL_INTERVAL || timeout < 0)
 				timeout = POLL_INTERVAL;
-		} else {
+		}
+		else {
 			sp_dbg_debug("iom poll failed!");
 			ExitProcess(-1);
 			return rc;
 		}
+		if (rc == 0) {
+			const int cur_ppid = GetParentProcessID();
+			if (cur_ppid != parent_id && cur_ppid == 1 /*init's pid*/) {
+				sp_dbg_warn("module process ? spshell has gone ?");
+				parent_id == cur_ppid; /*just test, or would print lots of log here.*/
+			}
+		}
 	}
+
+#endif //_WIN32
+
 	sp_dbg_debug("iom run exit ok!");
 	return 0;
 }

+ 23 - 6
spbase/sp_mod.c

@@ -1761,7 +1761,7 @@ static int unload_module(sp_mod_mgr_t *mgr, sp_mod_t *mod, int trigger_entity_id
 
 	if (rc == 0) {
 		int removed = 0;
-
+		/*TODO: the clear job depend on different result seems no different.*/
 		for (;;) {
 			HANDLE hs[] = {mod->evt_app_exit, mod->evt_wait_handle};
 			DWORD dwRet = WaitForMultipleObjects(array_size(hs), &hs[0], FALSE, PROCESS_TIMEOUT);
@@ -1771,7 +1771,7 @@ static int unload_module(sp_mod_mgr_t *mgr, sp_mod_t *mod, int trigger_entity_id
 				ResetEvent(mod->evt_wait_handle);
 				if (mod->wait_result != 0) {
 					HANDLE hprocess = mod->process.handle;
-					if (hprocess && process_monitor_remove(mgr->process_monitor, hprocess) == 0) {
+					if (hprocess && process_monitor_remove(mgr->process_monitor, &mod->process) == 0) {
 						removed = 1;
 #ifdef _WIN32
 						killModByPipe(findGroupProcessInfo(mod->cfg->group, mod->cfg->name), mod->cfg->name);
@@ -1784,7 +1784,7 @@ static int unload_module(sp_mod_mgr_t *mgr, sp_mod_t *mod, int trigger_entity_id
 					}
 				} else {
 					HANDLE hprocess = mod->process.handle;
-					if (hprocess && process_monitor_remove(mgr->process_monitor, hprocess) == 0) {
+					if (hprocess && process_monitor_remove(mgr->process_monitor, &mod->process) == 0) {
 						DWORD tmp;
 						removed = 1;
 						tmp = WaitForSingleObject(hprocess, PROCESS_EXIT_TIMEOUT);
@@ -1803,7 +1803,7 @@ static int unload_module(sp_mod_mgr_t *mgr, sp_mod_t *mod, int trigger_entity_id
 				}
 			} else {
 				HANDLE hprocess = mod->process.handle;
-				if (hprocess && process_monitor_remove(mgr->process_monitor, hprocess) == 0) {
+				if (hprocess && process_monitor_remove(mgr->process_monitor, &mod->process) == 0) {
 					removed = 1;
 #ifdef _WIN32
 					killModByPipe(findGroupProcessInfo(mod->cfg->group, mod->cfg->name), mod->cfg->name);
@@ -1853,6 +1853,7 @@ static int SpTerminateProcess(sp_mod_t* mod)
 }
 #endif //_WIN32
 
+/*remove from monitor and kill it directory!!*/
 static int terminate_module(sp_mod_mgr_t *mgr, sp_mod_t *mod, int trigger_entity_id)
 {
 	int rc = 0;
@@ -1865,7 +1866,7 @@ static int terminate_module(sp_mod_mgr_t *mgr, sp_mod_t *mod, int trigger_entity
 #endif //_WIN32
 	if (mod->state) {
 		if (mod->process.handle) {
-			if (process_monitor_remove(mgr->process_monitor, mod->process.handle) == 0) {
+			if (process_monitor_remove(mgr->process_monitor, &mod->process) == 0) {
 #ifdef _WIN32
 				SpTerminateProcess(mod);
 #else
@@ -1984,6 +1985,7 @@ static int start_entity(sp_mod_mgr_t *mgr, sp_entity_t *ent, const char *cmdline
 #else
 		sp_mod_mgr_lock(mgr);
 #endif //_WIN32
+
 		if (rc == 0) {
 			ent->state = EntityState_Idle;
 			ent->state_start_time = y2k_time_now();
@@ -2370,6 +2372,21 @@ int sp_mod_mgr_lost_entity(sp_mod_mgr_t *mgr, int entity_id, int trigger_entity_
 	return Error_Succeed;
 }
 
+int sp_mod_mgr_terminate_all_entity(sp_mod_mgr_t* mgr, int trigger_entity_id)
+{
+	int i;
+	int rc = 0;
+	assert(mgr);
+	for (i = 1; i < mgr->arr_ent->nelts; ++i) {
+		int res;
+		sp_entity_t* ent = ARRAY_IDX(mgr->arr_ent, i, sp_entity_t*);
+		res = sp_mod_mgr_terminate_entity2(mgr, ent, trigger_entity_id);
+		if (res != 0)
+			rc = res;
+	}
+	return rc;
+}
+
 int sp_mod_mgr_start_entity2(sp_mod_mgr_t *mgr, sp_entity_t *ent, const char *cmdline, int trigger_entity_id)
 {
 	int rc = 0;
@@ -2394,6 +2411,7 @@ int sp_mod_mgr_start_entity2(sp_mod_mgr_t *mgr, sp_entity_t *ent, const char *cm
 	return rc;
 }
 
+/*send {MOD_CMD_STOP} cmd and then {MOD_CMD_TERM} cmd*/
 int sp_mod_mgr_stop_entity2(sp_mod_mgr_t *mgr, sp_entity_t *ent, int trigger_entity_id, int cause_code)
 {
 	int rc;
@@ -2438,7 +2456,6 @@ int sp_mod_mgr_terminate_entity2(sp_mod_mgr_t *mgr, sp_entity_t *ent, int trigge
 	} else {
 		rc = Error_InvalidState;
 	}
-
 	return rc;
 }
 

+ 3 - 0
spbase/sp_mod.h

@@ -177,6 +177,9 @@ SPBASE_API int sp_mod_mgr_continue_entity(sp_mod_mgr_t *mgr, int entity_id, int
 SPBASE_API int sp_mod_mgr_test_entity(sp_mod_mgr_t *mgr, int entity_id, int trigger_entity_id, int test_type);
 SPBASE_API int sp_mod_mgr_lost_entity(sp_mod_mgr_t *mgr, int entity_id, int trigger_entity_id);
 
+SPBASE_API int sp_mod_mgr_terminate_all_entity(sp_mod_mgr_t* mgr, int trigger_entity_id);
+
+
 int sp_mod_mgr_terminate_entity2(sp_mod_mgr_t *mgr, sp_entity_t *entity, int trigger_entity_id);
 int sp_mod_mgr_start_entity2(sp_mod_mgr_t *mgr, sp_entity_t *entity, const char *cmdline, int trigger_entity_id);
 int sp_mod_mgr_stop_entity2(sp_mod_mgr_t *mgr, sp_entity_t *entity, int trigger_entity_id, int cause_code);

+ 2 - 11
spbase/sp_pst.c

@@ -582,12 +582,7 @@ void sp_pst_recover(const char *base_dir)
 	char szObject[MAX_PATH];
 	WIN32_FIND_DATAA fd;
 	int rc = 0;
-
-	#ifdef _WIN32
-	sprintf(szObject, "%s\\*", base_dir);
-    #else
-	sprintf(szObject, "%s/*", base_dir);
-	#endif
+	sprintf(szObject, "%s" SPLIT_SLASH_STR "*", base_dir);
 	hFind = FindFirstFileA(szObject, &fd);
 	if (hFind != INVALID_HANDLE_VALUE) {
 		do {
@@ -600,11 +595,7 @@ void sp_pst_recover(const char *base_dir)
 					strcpy(szEntity, szObject);
 					szEntity[strlen(szEntity)-1] = 0;
 					strcat(szEntity, fd.cFileName);
-					#ifdef _WIN32
-					strcat(szEntity, "\\*");
-                    #else
-					strcat(szEntity, "/*");
-					#endif
+					strcat(szEntity, SPLIT_SLASH_STR "*");
 					hChildFind = FindFirstFileA(szEntity, &fdChild);
 					if (hChildFind != INVALID_HANDLE_VALUE) {
 						do {

+ 2 - 1
spbase/sp_rpc.c

@@ -123,7 +123,8 @@ int sp_rpc_server_start(sp_rpc_server_t *server)
 
 int sp_rpc_server_stop(sp_rpc_server_t *server)
 {
-	if (!server->stop)
+	// BugFix [4/5/2020 11:55 Gifur]
+	if (/*!*/server->stop)
 		return Error_Bug;
 
 	server->stop = 1;

+ 14 - 2
spbase/sp_shm.c

@@ -10,6 +10,7 @@
 
 #define GET_ADDRESS(x) ((void*)((1+(x)) << 28))
 
+static int g_newcreator = -1;
 
 int sp_shm_get_range(int mask)
 {
@@ -44,8 +45,11 @@ void *sp_shm_init(int range, int newcreator)
 	for (i = 0; i < 7; ++i) {
 		if (range & (1 << (i+1))) {
 			address = GET_ADDRESS(i);
-			if (shm_mem_init2(MEM_KEY, address, newcreator) == 0)
+			if (shm_mem_init2(MEM_KEY, address, newcreator) == 0) {
+				if (address != NULL)
+					g_newcreator = newcreator;
 				return address;
+			}
 		}
 	}
 	return NULL;
@@ -54,6 +58,14 @@ void *sp_shm_init(int range, int newcreator)
 void sp_shm_term()
 {
 	/*we don't know why, this has been annotated at windows platform*/
-	shm_mem_destroy();
+	if(sp_shm_is_newalloc())
+		shm_mem_destroy();
+}
+
+int sp_shm_is_newalloc()
+{
+	if (g_newcreator == -1)
+		return 0;
+	return g_newcreator;
 }
 

+ 1 - 0
spbase/sp_shm.h

@@ -11,6 +11,7 @@ extern "C" {
 SPBASE_API int sp_shm_get_range(int mask);
 SPBASE_API void *sp_shm_init(int range, int newcreator);
 SPBASE_API void sp_shm_term();
+SPBASE_API int sp_shm_is_newalloc();
 
 #ifdef __cplusplus
 } // extern "C" {

+ 9 - 11
sphost/sphost.c

@@ -132,12 +132,13 @@ DWORD modRun(LPVOID param)
 	return 0;
 }
 
-DWORD modExit(LPVOID param)
-{
-	mod_exitInfo *curModInfo = (mod_exitInfo*)param;
-	SpExit(curModInfo->mod_name);
-	return 0;
-}
+/*It seems no use anywhere*/
+//DWORD modExit(LPVOID param)
+//{
+//	mod_exitInfo *curModInfo = (mod_exitInfo*)param;
+//	SpExit(curModInfo->mod_name);
+//	return 0;
+//}
 
 DWORD runThreadAndMonitor(LPVOID param)
 {
@@ -229,7 +230,7 @@ int main(int argc, char *argv[])
 #else
 	if (NULL == setlocale(LC_ALL, "zh_CN.UTF-8")) {
 		//zh_CN.UTF-8    zh_CN.GBK
-		sp_dbg_error("setlocale failed: %s", strerror(errno));
+		printf("setlocale failed: %s", strerror(errno));
 	}
 #endif //_WIN32
 
@@ -268,10 +269,7 @@ int main(int argc, char *argv[])
 			curModInfo->range = atoi(dstParam[3]);
 			strcpy(curModInfo->modMutexName, dstParam[4]);
 			curModInfo->group = atoi((dstParam[5]));
-			// 
-			// 			if (!strcmp("mod_Initializer", curModInfo->mod_name))
-			// 				MessageBox(0, curModInfo->mod_name, 0, 0);
-						//MessageBox(0, curModInfo->mod_name, 0, 0);
+			/*TODO: can use Dbg before SpRun invoke bcz dbg init inner.*/
 			Dbg("query Entity %s start, epid:%d, range:%d, group:%d", curModInfo->mod_name, curModInfo->epid, curModInfo->range, curModInfo->group);
 			CloseHandle(CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)&runThreadAndMonitor, curModInfo, 0, &curModInfo->threadId));
 

+ 102 - 7
spshell/app.cpp

@@ -89,7 +89,7 @@ static unsigned int __stdcall __startlist_proc(void *param)
 
 	if (!bAllSuc) {
 		sp_dbg_error("======================================================");
-		sp_dbg_error("!!!!!! 部分实体启动失败,请检查dbg\\spshell日志排除故障 !!!!!!");
+		sp_dbg_error("!!!!!! a few of entities started up failed! please get more detail from dbg\\spshell !!!!!!");
 		sp_dbg_error("======================================================");
 		Sleep(10000);
 		app_t *app = get_app_instance();
@@ -258,7 +258,6 @@ int app_init()
 #endif //_WIN32
 
 	urls[0] = env->url;
-#if 1
 	rc = bus_daemon_create(array_size(urls), urls, 2, &g_app.bus_daemon);
 	if (rc != 0) {
 		sp_dbg_error("create bus deamon failed!");
@@ -270,7 +269,6 @@ int app_init()
 		return rc;
 	}
 	sp_dbg_info("start bus daemon ok!");
-#endif
 
 	rc = sp_iom_create(env->url, SP_SHELL_MOD_ID, &g_app.iom);
 	if (rc != 0) {
@@ -286,7 +284,7 @@ int app_init()
 	}
 	sp_dbg_info("create svc ok!");
 
-	rc = sp_svc_start(g_app.svc);	//threadpool 任务池
+	rc = sp_svc_start(g_app.svc);	//threadpool
 	if (rc != 0) {
 		sp_dbg_error("start svc failed!");
 		return rc;
@@ -330,6 +328,7 @@ int app_init()
 
 	callback.on_info = &on_info;
 	callback.on_req = &on_req;
+	callback.on_destroy = NULL;
 	callback.user_data = NULL;
 	rc = sp_rpc_server_create(g_app.svc, &callback, &g_app.rpc_server);
 	if (rc != 0) {
@@ -353,7 +352,7 @@ int app_init()
 		sp_dbg_error("start silverlight policy server failed!");
 		return rc;
 	}
-	sp_dbg_info("create silverlight policy server ok!");
+	sp_dbg_info("create and start silverlight policy server ok!");
 #endif //_WIN32
 
 	//launch limited startup module processes
@@ -390,16 +389,112 @@ int app_init()
 
 int app_term()
 {
-#ifdef _WIN32
+	int rc;
+
+#ifdef _DEBUG
 	if (g_app.pConsole != NULL) {
 		g_app.pConsole->StopListen();
+		//delete g_app.pConsole;
 		g_app.pConsole = NULL;
 	}
+#endif //_DEBUG
+
+	/*kill all children process ?*/
+	rc = sp_mod_mgr_terminate_all_entity(sp_get_env()->mod_mgr, 0/*SP_SHELL_SVC_ID*/);
+	if (0 != rc) {
+		sp_dbg_error("terminate all entity failed! %d", rc);
+
+		sp_shm_term();
+		winsock_term();
+		return 0;
+	}
+	sp_dbg_info("terminate all entity ok!");
+
+#ifdef _WIN32
+	sp_dbg_info("stop silverlight policy server.");
+	sp_sps_stop(g_app.sps);
+	
+	sp_dbg_info("destroy silverlight policy server.");
+	sp_sps_destroy(g_app.sps);
+	g_app.sps = NULL;
+
 #endif //_WIN32
 
+	sp_dbg_info("stop rpc server.");
+	rc = sp_rpc_server_stop(g_app.rpc_server);
+	if (rc != 0) {
+		sp_dbg_error("stop rpc server failed! %d", rc);
+	}
+	else {
+		sp_dbg_info("stop rpc server ok!");
+	}
+	sp_dbg_info("destroy rpc server.");
+	sp_rpc_server_destroy(g_app.rpc_server);
+	g_app.rpc_server = NULL;
+
+	sp_dbg_info("destroy bcm daemon.");
+	sp_bcm_daemon_destroy(g_app.bcm_daemon);
+	g_app.bcm_daemon = NULL;
+
+	sp_dbg_info("destroy var daemon.");
+	sp_var_daemon_destroy(g_app.var_daemon);
+	g_app.var_daemon = NULL;
+
+	sp_dbg_info("destroy log daemon.");
 	log_destroy(g_app.log);
 
-	sp_dbg_term();
+	sp_dbg_info("term mod mgr.");
+	sp_mod_mgr_term(sp_get_env()->mod_mgr);
+
+	sp_dbg_info("stop svc.");
+	rc = sp_svc_stop(g_app.svc);
+	if (rc != 0) {
+		sp_dbg_error("stop svc failed!");
+	}
+	else {
+		sp_dbg_info("stop svc ok!");
+	}
+
+	sp_svc_destroy(g_app.svc); /*destroy iom and bus endpt*/
+	g_app.svc = NULL;
+	g_app.iom = NULL;
+
+	/*svc would destroy it.*/
+	//sp_dbg_info("destroy iom.");
+	//sp_iom_destroy(g_app.iom);
+
+	sp_dbg_info("stop bus daemon.");
+	rc = bus_daemon_stop(g_app.bus_daemon); /*destroy ioqueue*/
+	if (rc != 0) {
+		sp_dbg_error("stop bus daemon failed!");
+	}
+	else {
+		sp_dbg_info("stop bus daemon ok!");
+	}
+
+	sp_dbg_info("destroy bus daemon.");
+	rc = bus_daemon_destroy(g_app.bus_daemon);
+	if (rc != 0) {
+		sp_dbg_error("destroy bus daemon failed!");
+	}
+	else {
+		sp_dbg_info("destroy bus daemon ok!");
+	}
+	g_app.bus_daemon = NULL;
+
+#ifdef _WIN32
+	if (g_app.bsc_gui != NULL) {
+		sp_dbg_info("destroy gui.");
+		sp_gui_destroy(g_app.bsc_gui);
+		g_app.bsc_gui = NULL;
+	}
+#endif
+
+	sp_dbg_info("destroy env.");
+	sp_env_destroy(sp_get_env());
+
+	sp_dbg_info("destroy share memory.");
+	sp_shm_term();
 
 	winsock_term();
 

+ 4 - 1
spshell/spshell.cpp

@@ -767,6 +767,7 @@ int main(int argc, char **argv)
 	{
 		sp_dbg_fatal("detect duplicate spshell/sphost process, abort cur boot !!!");
 		Sleep(10000);
+		sp_dbg_term();
 		return -200;
 	}	
 
@@ -779,6 +780,7 @@ int main(int argc, char **argv)
 #endif //_WIN32
 		
 		Sleep(10000);
+		sp_dbg_term();
 		return -201;
 	} else {
 		sp_dbg_info("current process has been run with administrator or root privilege.");
@@ -864,7 +866,7 @@ int main(int argc, char **argv)
 	else
 	{
 		sp_dbg_error("======================================================");
-		sp_dbg_error("!!!!!! 启动失败,请检查dbg\\spshell日志排除故障 !!!!!!");
+		sp_dbg_error("!!!!!! Startup failed, get more detail information from dbg\\spshell !!!!!!");
 		sp_dbg_error("======================================================");
 
 		Sleep(10000);
@@ -880,6 +882,7 @@ int main(int argc, char **argv)
 
 #endif //_WIN32
 
+	sp_dbg_term();
 	return rc;
 }
 

+ 1 - 1
winpr/libwinpr/thread/process.c

@@ -468,7 +468,7 @@ BOOL TerminateProcess(HANDLE hProcess, UINT uExitCode)
 	if (kill(process->pid, SIGTERM))
 		return FALSE;
 
-	WLog_WARN(TAG, "%s: uExitCode not used!", __FUNCTION__);
+	WLog_WARN(TAG, "%s: uExitCode not actived for now!", __FUNCTION__);
 	return TRUE;
 }
 

+ 81 - 4
winpr/libwinpr/winsock/winsock.c

@@ -20,7 +20,7 @@
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 #endif
-
+#include <assert.h>
 #include <winpr/crt.h>
 #include <winpr/synch.h>
 
@@ -296,6 +296,7 @@ INT winpr_inet_pton(INT Family, PCSTR pszAddrString, PVOID pAddrBuf)
 #include <netinet/in.h>
 #include <netinet/tcp.h>
 #include <net/if.h>
+#include <pthread.h>
 
 #include "../log.h"
 #define TAG WINPR_TAG("winsock")
@@ -304,16 +305,75 @@ INT winpr_inet_pton(INT Family, PCSTR pszAddrString, PVOID pAddrBuf)
 #define MSG_NOSIGNAL 0
 #endif
 
+static char* socked_list = NULL;
+volatile unsigned int socked_max_cnt = 0;
+
+static pthread_once_t wsa_is_initialized = PTHREAD_ONCE_INIT;
+static pthread_once_t wsa_is_uninitialized = PTHREAD_ONCE_INIT;
+
+static int WSAInitialize()
+{
+	WLog_DBG(TAG, "==> function: %s", __FUNCTION__);
+#ifdef F_MAXFD // on some BSD derivates
+	socked_max_cnt = fcntl(0, F_MAXFD);
+#else
+	socked_max_cnt = sysconf(_SC_OPEN_MAX);
+#endif
+	if (socked_max_cnt > 0) {
+		assert(socked_max_cnt < INT_MAX);
+		socked_list = malloc(sizeof(char) * socked_max_cnt);
+	}
+	if (socked_list != NULL)
+		memset(socked_list, '\0', sizeof(char) * socked_max_cnt);
+	else
+		socked_max_cnt = 0;
+	WLog_DBG(TAG, "<== function: %s %d", __FUNCTION__, socked_max_cnt);
+}
+
+static int WSAUnitialize()
+{
+	WLog_DBG(TAG, "==> function: %s", __FUNCTION__);
+	if (socked_max_cnt > 0) {
+		assert(socked_list != NULL);
+		int fd;
+		for (fd = 3; fd < socked_max_cnt; fd++) {
+			if (socked_list[fd] != '\0') {
+				if (0 == close(fd)) {
+					WLog_DBG(TAG, "close socket fd(%d)", fd);
+					socked_list[fd] = '\0';
+				}
+			}
+		}
+		free(socked_list);
+		socked_list = NULL;
+		socked_max_cnt = 0;
+	}
+	WLog_DBG(TAG, "<== function: %s", __FUNCTION__);
+}
+
 int WSAStartup(WORD wVersionRequired, LPWSADATA lpWSAData)
 {
 	ZeroMemory(lpWSAData, sizeof(WSADATA));
 	lpWSAData->wVersion = wVersionRequired;
 	lpWSAData->wHighVersion = MAKEWORD(2, 2);
+
+	if (0 != pthread_once(&wsa_is_initialized, WSAInitialize)) {
+		WLog_ERR(TAG, "pthread_once failed for WSAInitialize: %d", errno);
+	}
+	else {
+		WLog_DBG(TAG, "pthread_once for WSAInitialize succ.");
+	}
 	return 0; /* success */
 }
 
 int WSACleanup(void)
 {
+	if (0 != pthread_once(&wsa_is_uninitialized, WSAUnitialize)) {
+		WLog_ERR(TAG, "pthread_once failed for WSAUnitialize: %d", errno);
+	}
+	else {
+		WLog_DBG(TAG, "pthread_once for WSAUnitialize succ.");
+	}
 	return 0; /* success */
 }
 
@@ -1074,6 +1134,9 @@ SOCKET _accept(SOCKET s, struct sockaddr* addr, int* addrlen)
 	socklen_t s_addrlen = (socklen_t)*addrlen;
 	status = accept(fd, addr, &s_addrlen);
 	*addrlen = (socklen_t)s_addrlen;
+	if (status > 0 && socked_list != NULL) {
+		socked_list[status] = '-';
+	}
 	return status;
 }
 
@@ -1094,6 +1157,9 @@ int closesocket(SOCKET s)
 	int status;
 	int fd = (int)s;
 	status = close(fd);
+	if (fd > 0 && status == 0 && socked_list != NULL) {
+		socked_list[fd] = '\0';
+	}
 	return status;
 }
 
@@ -1105,7 +1171,9 @@ int _connect(SOCKET s, const struct sockaddr* name, int namelen)
 
 	if (status < 0)
 		return SOCKET_ERROR;
-
+	if (socked_list != NULL) {
+		socked_list[status] = '-';
+	}
 	return status;
 }
 
@@ -1206,7 +1274,10 @@ int _recv(SOCKET s, char* buf, int len, int flags)
 {
 	int status;
 	int fd = (int)s;
-	status = (int)recv(fd, (void*)buf, (size_t)len, flags);
+	do 
+	{
+		status = (int)recv(fd, (void*)buf, (size_t)len, flags);
+	} while (status < 0 && errno == EINTR);
 	return status;
 }
 
@@ -1238,7 +1309,10 @@ int _send(SOCKET s, const char* buf, int len, int flags)
 	int status;
 	int fd = (int)s;
 	flags |= MSG_NOSIGNAL;
-	status = (int)send(fd, (void*)buf, (size_t)len, flags);
+	do 
+	{
+		status = (int)send(fd, (void*)buf, (size_t)len, flags);
+	} while ((status < 0) && (errno == EINTR));
 	return status;
 }
 
@@ -1296,6 +1370,9 @@ SOCKET _socket(int af, int type, int protocol)
 		fprintf(stderr, "socket failed: %s\n", strerror(errno));
 		return INVALID_SOCKET;
 	}
+	if (socked_list != NULL) {
+		socked_list[fd] = '-';
+	}
 	s = (SOCKET)fd;
 	return s;
 }