|
|
@@ -518,33 +518,31 @@ static int pre_dispatch_network(BOOL* ret, DWORD* dwBytesTransfer, ioqueue_overl
|
|
|
ioqueue_recvn_overlapped_t* overlapped = (ioqueue_recvn_overlapped_t*)io_ctx;
|
|
|
ioqueue_tcpsock_t* tcpsock = overlapped->base.handle_ctx;
|
|
|
n = 0;
|
|
|
- for (; n < overlapped->wsabuf.len;) {
|
|
|
- nread = read(tcpsock->u.sock, overlapped->wsabuf.buf + n, overlapped->wsabuf.len - n);
|
|
|
- if (nread < 0) {
|
|
|
- if (errno == EAGAIN) {
|
|
|
- /* that means we have read all data. So go back to the main loop. */
|
|
|
- nread = 0;
|
|
|
- }
|
|
|
- else if (errno == EINTR) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- else {
|
|
|
- WLog_ERR(TAG, "read error: %d", errno);
|
|
|
- }
|
|
|
- break;
|
|
|
+ for (; n < overlapped->wsabuf.len;)
|
|
|
+ {
|
|
|
+ do
|
|
|
+ {
|
|
|
+ nread = _recv(tcpsock->u.sock, overlapped->wsabuf.buf + n, overlapped->wsabuf.len - n, 0);
|
|
|
+ } while (nread < 0 && errno == EINTR);
|
|
|
+ if (nread > 0) {
|
|
|
+ n += nread;
|
|
|
+ continue;
|
|
|
}
|
|
|
- else if (nread == 0) {
|
|
|
+ if (nread == 0) {
|
|
|
+ /*remote socket break*/
|
|
|
+ WLog_WARN(TAG, "remote socket exit.");
|
|
|
+ } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
|
|
+ nread = 0;
|
|
|
+ break;
|
|
|
+ } else {
|
|
|
+ WLog_ERR(TAG, "read error: %s(%d)", strerror(errno), errno);
|
|
|
break;
|
|
|
}
|
|
|
- else {
|
|
|
- n += nread;
|
|
|
- }
|
|
|
- }
|
|
|
- if (nread < 0) {
|
|
|
- WLog_WARN(TAG, "read op occurs exception");
|
|
|
}
|
|
|
WLog_INFO(TAG, "<OV_RECVN>: total_recv: %d/%d, cur_recv: %d", n, overlapped->wsabuf.len, nread);
|
|
|
- if (ret) *ret = nread < 0 ? FALSE : TRUE;
|
|
|
+ if (ret) {
|
|
|
+ *ret = (n == 0 && n <= nread) ? FALSE : TRUE;
|
|
|
+ }
|
|
|
if (dwBytesTransfer) *dwBytesTransfer = n;
|
|
|
err = 1;
|
|
|
}
|
|
|
@@ -630,6 +628,7 @@ static void dispatch_network(BOOL ret, DWORD dwBytesTransfer, ioqueue_overlapped
|
|
|
overlapped->base.ov.InternalHigh = 0;
|
|
|
overlapped->base.ov.Offset = 0;
|
|
|
overlapped->base.ov.OffsetHigh = 0;
|
|
|
+ ioqueue_overlapped_set_mask(io_ctx, sizeof(ioqueue_sendn_overlapped_t));
|
|
|
WLog_WARN(TAG, "OV_SENDN: Must be due with this situation %d < %d", overlapped->sended_bytes, overlapped->total_bytes);
|
|
|
evtpoll_subscribe(ioqueue_tcpsock_get_owned_ioqueue(tcpsock)->ep, EV_WRITE, tcpsock->u.sock, NULL, io_ctx);
|
|
|
//if (rc != 0 && WSAGetLastError() != WSA_IO_PENDING) {
|
|
|
@@ -673,6 +672,7 @@ static void dispatch_network(BOOL ret, DWORD dwBytesTransfer, ioqueue_overlapped
|
|
|
overlapped->base.ov.Offset = 0;
|
|
|
overlapped->base.ov.OffsetHigh = 0;
|
|
|
overlapped->dwFlags = 0;
|
|
|
+ ioqueue_overlapped_set_mask(io_ctx, sizeof(ioqueue_recvuntil_overlapped_t));
|
|
|
rc = WSARecv(handle_ctx->u.sock, &overlapped->wsabuf, 1, &bytesRead, &overlapped->dwFlags,
|
|
|
&overlapped->base.ov, NULL);
|
|
|
if (rc != 0 && WSAGetLastError() != WSA_IO_PENDING) {
|
|
|
@@ -706,6 +706,7 @@ static void dispatch_network(BOOL ret, DWORD dwBytesTransfer, ioqueue_overlapped
|
|
|
overlapped->base.ov.Offset = 0;
|
|
|
overlapped->base.ov.OffsetHigh = 0;
|
|
|
overlapped->dwFlags = 0;
|
|
|
+ ioqueue_overlapped_set_mask(io_ctx, sizeof(ioqueue_recvn_overlapped_t));
|
|
|
WLog_WARN(TAG, "OV_RECVN: Must be due with this situation %d < %d", overlapped->recved_bytes, overlapped->total_bytes);
|
|
|
evtpoll_subscribe(ioqueue_tcpsock_get_owned_ioqueue(tcpsock)->ep, EV_READ, tcpsock->u.sock, io_ctx, NULL);
|
|
|
//if (rc != 0 && WSAGetLastError() != WSA_IO_PENDING) {
|
|
|
@@ -753,6 +754,7 @@ static void dispatch_network(BOOL ret, DWORD dwBytesTransfer, ioqueue_overlapped
|
|
|
overlapped->base.ov.Offset += dwBytesTransfer;
|
|
|
if (overlapped->base.ov.Offset < dwBytesTransfer)
|
|
|
overlapped->base.ov.OffsetHigh += 1;
|
|
|
+ ioqueue_overlapped_set_mask(io_ctx, sizeof(ioqueue_readfilen_overlapped_t));
|
|
|
ret = ReadFile(handle_ctx->u.file, overlapped->buf+overlapped->recved_bytes, left, NULL, &overlapped->base.ov);
|
|
|
if (!ret && GetLastError() != ERROR_IO_PENDING) {
|
|
|
dec_pending_io(handle_ctx);
|
|
|
@@ -787,6 +789,7 @@ static void dispatch_network(BOOL ret, DWORD dwBytesTransfer, ioqueue_overlapped
|
|
|
overlapped->base.ov.Offset += dwBytesTransfer;
|
|
|
if (overlapped->base.ov.Offset < dwBytesTransfer)
|
|
|
overlapped->base.ov.OffsetHigh += 1;
|
|
|
+ ioqueue_overlapped_set_mask(io_ctx, sizeof(ioqueue_writefilen_overlapped_t));
|
|
|
ret = WriteFile(handle_ctx->u.file, overlapped->buf+overlapped->sended_bytes, left, NULL, &overlapped->base.ov);
|
|
|
if (!ret && GetLastError() != ERROR_IO_PENDING) {
|
|
|
dec_pending_io(handle_ctx);
|
|
|
@@ -928,6 +931,7 @@ TOOLKIT_API int ioqueue_poll(ioqueue_t* q, int timeout)
|
|
|
static int flag = 0;
|
|
|
bus_daemon_t* deamon;
|
|
|
struct epoll_event events[MAX_EPOLL_EVENT];
|
|
|
+ struct epoll_event* pe = NULL;
|
|
|
deamon = (bus_daemon_t*)ioqueue_get_user_data(ioq);
|
|
|
assert(deamon != NULL);
|
|
|
/* network and msg, dispatch until no events */
|
|
|
@@ -938,22 +942,22 @@ TOOLKIT_API int ioqueue_poll(ioqueue_t* q, int timeout)
|
|
|
//有时会出现惊群的问题!!
|
|
|
int epfd = evtpoll_get_epoll_fd(ioq->ep);
|
|
|
nfds = epoll_wait(epfd, events, MAX_EPOLL_EVENT, t ? 0 : timeout);
|
|
|
- if ((nfds < 0 && EINTR != errno) || nfds == 0) {
|
|
|
+ if ((nfds < 0 && EINTR == errno) || nfds == 0) {
|
|
|
t = 0;
|
|
|
}
|
|
|
for (i = 0; i < nfds; ++i) {
|
|
|
int n = 1;
|
|
|
- WLog_INFO(TAG, "poll events[%d] OUT:%d, IN:%d", i
|
|
|
- ,events[i].events & EPOLLOUT ? 1: 0
|
|
|
- ,events[i].events & EPOLLIN ? 1: 0);
|
|
|
+ pe = events + i;
|
|
|
+ WLog_INFO(TAG, "poll events[%d]::fd(0x%08X) OUT:%d, IN:%d", i, pe->data.fd
|
|
|
+ , pe->events & EPOLLOUT ? 1: 0 ,pe->events & EPOLLIN ? 1: 0);
|
|
|
|
|
|
- if (events[i].events & EPOLLOUT) {
|
|
|
+ if (pe->events & EPOLLOUT) {
|
|
|
flag = 1;
|
|
|
}
|
|
|
|
|
|
while (n >= 0) {
|
|
|
ioqueue_overlapped_t* io_ctx = NULL;
|
|
|
- n = evtpoll_deal(ioq->ep, &events[i], &io_ctx);
|
|
|
+ n = evtpoll_deal(ioq->ep, pe, &io_ctx);
|
|
|
if (n > 0) {
|
|
|
assert(io_ctx);
|
|
|
pre_dispatch_network(&ret, &dwBytesTransfer, io_ctx, epfd);
|
|
|
@@ -1064,8 +1068,10 @@ TOOLKIT_API int ioqueue_acceptor_create(ioqueue_t *ioq,
|
|
|
service.sin_family = AF_INET;
|
|
|
service.sin_port = htons(port);
|
|
|
service.sin_addr.s_addr = ip ? inet_addr(ip) : htonl(INADDR_ANY);
|
|
|
- if (bind(acceptor->u.sock, (struct sockaddr*)&service, sizeof(struct sockaddr)) != 0)
|
|
|
+ if (bind(acceptor->u.sock, (struct sockaddr*) & service, sizeof(struct sockaddr)) != 0) {
|
|
|
+ WLog_ERR(TAG, "bind sockect %d failed: %s", acceptor->u.sock, strerror(errno));
|
|
|
goto on_error;
|
|
|
+ }
|
|
|
if (evtpoll_attach(ioq->ep, acceptor->u.sock)) {
|
|
|
goto on_error;
|
|
|
}
|
|
|
@@ -1128,6 +1134,7 @@ TOOLKIT_API int ioqueue_acceptor_async_accept(ioqueue_acceptor_t* acceptor,
|
|
|
return -1;
|
|
|
overlapped = (ioqueue_accept_overlapped_t*)ov;
|
|
|
memset(overlapped, 0, sizeof(ioqueue_accept_overlapped_t));
|
|
|
+ ioqueue_overlapped_set_mask(ov, sizeof(ioqueue_accept_overlapped_t));
|
|
|
overlapped->client = INVALID_SOCKET;
|
|
|
fastlock_enter(acceptor->ov_pending_list_lock);
|
|
|
list_add_tail(&overlapped->base.pending_entry, &acceptor->ov_pending_list);
|
|
|
@@ -1202,6 +1209,7 @@ TOOLKIT_API int ioqueue_acceptor_create_client(ioqueue_acceptor_t* acceptor, SOC
|
|
|
}
|
|
|
add_handler_list(tcpsock, ioq);
|
|
|
inc_ref(ioqueue_handle_context, tcpsock);
|
|
|
+ WLog_DBG(TAG, "create a tcp session, fd=%d", s);
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
@@ -1328,6 +1336,7 @@ TOOLKIT_API int ioqueue_tcpsock_async_connect(ioqueue_tcpsock_t *tcpsock,
|
|
|
ioq = tcpsock->owner;
|
|
|
overlapped = (ioqueue_connect_overlapped_t*)ov;
|
|
|
memset(overlapped, 0, sizeof(ioqueue_connect_overlapped_t));
|
|
|
+ ioqueue_overlapped_set_mask(ov, sizeof(ioqueue_connect_overlapped_t));
|
|
|
fastlock_enter(tcpsock->ov_pending_list_lock);
|
|
|
list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list);
|
|
|
fastlock_leave(tcpsock->ov_pending_list_lock);
|
|
|
@@ -1455,6 +1464,7 @@ TOOLKIT_API int ioqueue_tcpsock_async_sendsome(ioqueue_tcpsock_t *tcpsock,
|
|
|
|
|
|
overlapped = (ioqueue_sendsome_overlapped_t*)ov;
|
|
|
memset(overlapped, 0, sizeof(ioqueue_sendsome_overlapped_t));
|
|
|
+ ioqueue_overlapped_set_mask(ov, sizeof(ioqueue_sendsome_overlapped_t));
|
|
|
fastlock_enter(tcpsock->ov_pending_list_lock);
|
|
|
list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list);
|
|
|
fastlock_leave(tcpsock->ov_pending_list_lock);
|
|
|
@@ -1496,9 +1506,10 @@ TOOLKIT_API int ioqueue_tcpsock_async_sendn(ioqueue_tcpsock_t *tcpsock,
|
|
|
ioq = ioqueue_tcpsock_get_owned_ioqueue(tcpsock);
|
|
|
if (ioq->stop)
|
|
|
return -1;
|
|
|
-
|
|
|
+ WLog_INFO(TAG, "fd(%d): ioqueue_tcpsock_async_sendn fired: buf len: %d", tcpsock->u.sock, len);
|
|
|
overlapped = (ioqueue_sendn_overlapped_t*)ov;
|
|
|
memset(overlapped, 0, sizeof(ioqueue_sendn_overlapped_t));
|
|
|
+ ioqueue_overlapped_set_mask(ov, sizeof(ioqueue_sendn_overlapped_t));
|
|
|
fastlock_enter(tcpsock->ov_pending_list_lock);
|
|
|
list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list);
|
|
|
fastlock_leave(tcpsock->ov_pending_list_lock);
|
|
|
@@ -1512,7 +1523,6 @@ TOOLKIT_API int ioqueue_tcpsock_async_sendn(ioqueue_tcpsock_t *tcpsock,
|
|
|
overlapped->sended_bytes = 0;
|
|
|
overlapped->total_bytes = len;
|
|
|
inc_pending_io(tcpsock);
|
|
|
- WLog_INFO(TAG, "ioqueue_tcpsock_async_sendn fired! sock: %d", tcpsock->u.sock);
|
|
|
rc = evtpoll_subscribe(ioq->ep, EV_WRITE, tcpsock->u.sock, NULL, ov);
|
|
|
if (!rc) {
|
|
|
return 0;
|
|
|
@@ -1598,6 +1608,7 @@ TOOLKIT_API int ioqueue_tcpsock_async_recvsome(ioqueue_tcpsock_t *tcpsock,
|
|
|
|
|
|
overlapped = (ioqueue_recvsome_overlapped_t*)ov;
|
|
|
memset(overlapped, 0, sizeof(ioqueue_recvsome_overlapped_t));
|
|
|
+ ioqueue_overlapped_set_mask(ov, sizeof(ioqueue_recvsome_overlapped_t));
|
|
|
fastlock_enter(tcpsock->ov_pending_list_lock);
|
|
|
list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list);
|
|
|
fastlock_leave(tcpsock->ov_pending_list_lock);
|
|
|
@@ -1643,6 +1654,7 @@ TOOLKIT_API int ioqueue_tcpsock_async_recvn(ioqueue_tcpsock_t *tcpsock,
|
|
|
|
|
|
overlapped = (ioqueue_recvn_overlapped_t*)ov;
|
|
|
memset(overlapped, 0, sizeof(ioqueue_recvn_overlapped_t));
|
|
|
+ ioqueue_overlapped_set_mask(ov, sizeof(ioqueue_recvn_overlapped_t));
|
|
|
fastlock_enter(tcpsock->ov_pending_list_lock);
|
|
|
list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list);
|
|
|
fastlock_leave(tcpsock->ov_pending_list_lock);
|
|
|
@@ -1694,6 +1706,7 @@ TOOLKIT_API int ioqueue_tcpsock_async_recvuntil(ioqueue_tcpsock_t *tcpsock,
|
|
|
|
|
|
overlapped = (ioqueue_recvuntil_overlapped_t*)ov;
|
|
|
memset(overlapped, 0, sizeof(ioqueue_recvuntil_overlapped_t));
|
|
|
+ ioqueue_overlapped_set_mask(ov, sizeof(ioqueue_recvuntil_overlapped_t));
|
|
|
fastlock_enter(tcpsock->ov_pending_list_lock);
|
|
|
list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list);
|
|
|
fastlock_leave(tcpsock->ov_pending_list_lock);
|
|
|
@@ -1892,6 +1905,7 @@ TOOLKIT_API int ioqueue_udpsock_async_sendto(ioqueue_udpsock_t* udpsock,
|
|
|
|
|
|
overlapped = (ioqueue_sendto_overlapped_t*)ov;
|
|
|
memset(overlapped, 0, sizeof(ioqueue_sendto_overlapped_t));
|
|
|
+ ioqueue_overlapped_set_mask(ov, sizeof(ioqueue_sendto_overlapped_t));
|
|
|
fastlock_enter(udpsock->ov_pending_list_lock);
|
|
|
list_add_tail(&overlapped->base.pending_entry, &udpsock->ov_pending_list);
|
|
|
fastlock_leave(udpsock->ov_pending_list_lock);
|
|
|
@@ -1946,6 +1960,7 @@ TOOLKIT_API int ioqueue_udpsock_async_recvfrom(ioqueue_udpsock_t* udpsock,
|
|
|
|
|
|
overlapped = (ioqueue_recvfrom_overlapped_t*)ov;
|
|
|
memset(overlapped, 0, sizeof(ioqueue_recvfrom_overlapped_t));
|
|
|
+ ioqueue_overlapped_set_mask(ov, sizeof(ioqueue_recvfrom_overlapped_t));
|
|
|
fastlock_enter(udpsock->ov_pending_list_lock);
|
|
|
list_add_tail(&overlapped->base.pending_entry, &udpsock->ov_pending_list);
|
|
|
fastlock_leave(udpsock->ov_pending_list_lock);
|
|
|
@@ -2135,6 +2150,7 @@ TOOLKIT_API int ioqueue_file_async_readsome_at(ioqueue_file_t* file,
|
|
|
|
|
|
overlapped = (ioqueue_readfilesome_overlapped_t*)ov;
|
|
|
memset(overlapped, 0, sizeof(ioqueue_readfilesome_overlapped_t));
|
|
|
+ ioqueue_overlapped_set_mask(ov, sizeof(ioqueue_readfilesome_overlapped_t));
|
|
|
fastlock_enter(file->ov_pending_list_lock);
|
|
|
list_add_tail(&overlapped->base.pending_entry, &file->ov_pending_list);
|
|
|
fastlock_leave(file->ov_pending_list_lock);
|
|
|
@@ -2182,6 +2198,7 @@ TOOLKIT_API int ioqueue_file_async_readn_at(ioqueue_file_t* file,
|
|
|
|
|
|
overlapped = (ioqueue_readfilen_overlapped_t*)ov;
|
|
|
memset(overlapped, 0, sizeof(ioqueue_readfilen_overlapped_t));
|
|
|
+ ioqueue_overlapped_set_mask(ov, sizeof(ioqueue_readfilen_overlapped_t));
|
|
|
fastlock_enter(file->ov_pending_list_lock);
|
|
|
list_add_tail(&overlapped->base.pending_entry, &file->ov_pending_list);
|
|
|
fastlock_leave(file->ov_pending_list_lock);
|
|
|
@@ -2342,6 +2359,7 @@ TOOLKIT_API int ioqueue_file_async_writesome_at(ioqueue_file_t* file,
|
|
|
|
|
|
overlapped = (ioqueue_writefilesome_overlapped_t*)ov;
|
|
|
memset(overlapped, 0, sizeof(ioqueue_writefilesome_overlapped_t));
|
|
|
+ ioqueue_overlapped_set_mask(ov, sizeof(ioqueue_writefilesome_overlapped_t));
|
|
|
fastlock_enter(file->ov_pending_list_lock);
|
|
|
list_add_tail(&overlapped->base.pending_entry, &file->ov_pending_list);
|
|
|
fastlock_leave(file->ov_pending_list_lock);
|
|
|
@@ -2389,6 +2407,7 @@ TOOLKIT_API int ioqueue_file_async_writen_at(ioqueue_file_t* file,
|
|
|
|
|
|
overlapped = (ioqueue_writefilen_overlapped_t*)ov;
|
|
|
memset(overlapped, 0, sizeof(ioqueue_writefilen_overlapped_t));
|
|
|
+ ioqueue_overlapped_set_mask(ov, sizeof(ioqueue_writefilen_overlapped_t));
|
|
|
fastlock_enter(file->ov_pending_list_lock);
|
|
|
list_add_tail(&overlapped->base.pending_entry, &file->ov_pending_list);
|
|
|
fastlock_leave(file->ov_pending_list_lock);
|
|
|
@@ -2593,6 +2612,7 @@ TOOLKIT_API int ioqueue_pipe_acceptor_async_accept(ioqueue_pipe_acceptor_t *acce
|
|
|
|
|
|
overlapped = (ioqueue_connectpipe_overlapped_t*)ov;
|
|
|
memset(overlapped, 0, sizeof(ioqueue_connectpipe_overlapped_t));
|
|
|
+ ioqueue_overlapped_set_mask(ov, sizeof(ioqueue_connectpipe_overlapped_t));
|
|
|
overlapped->client = CreateNamedPipeA(acceptor->u.pipe_name,
|
|
|
PIPE_ACCESS_DUPLEX|FILE_FLAG_OVERLAPPED, PIPE_TYPE_BYTE,
|
|
|
PIPE_UNLIMITED_INSTANCES, 3072, 3072, NMPWAIT_WAIT_FOREVER, NULL);
|
|
|
@@ -2724,8 +2744,13 @@ TOOLKIT_API void ioqueue_overlapped_set_mask(ioqueue_overlapped_t* io, int mask_
|
|
|
base_ov->ov.Internal = mask_value;
|
|
|
}
|
|
|
|
|
|
-TOOLKIT_API int ioqueue_overlapped_get_type(const ioqueue_overlapped_t* const io)
|
|
|
+/*at first, this function is only used evtpoll for dispatch, after then, bus_endpt seems involved*/
|
|
|
+TOOLKIT_API uint32_t ioqueue_overlapped_get_type(const ioqueue_overlapped_t* const io)
|
|
|
{
|
|
|
+ if (ioqueue_overlapped_get_mask(io) == sizeof(OVERLAPPED)) {
|
|
|
+ return EV_BUS_ENDPOINT;
|
|
|
+ }
|
|
|
+ /*idx of acceptor*/
|
|
|
int sub_type = 0;
|
|
|
const ioqueue_base_overlapped_t* base_ov = (ioqueue_base_overlapped_t*)io;
|
|
|
const ioqueue_handle_context* handle_ctx = base_ov->handle_ctx;
|