#include "precompile.h"
#include "toolkit.h"
#include "bus.h"
#include "sockutil.h"
#include "url.h"
#include "spinlock.h"
#include "list.h"
#include "bus_internal.h"
#include "evtpoll.h"
#include "core.h"
#include "dbgutil.h"

/*
 * NOTE(review): the names of the six system headers that were included here
 * were lost (only bare "#include" tokens survived). The list below is
 * reconstructed from the symbols this file uses -- confirm against the
 * original revision.
 */
#include <arpa/inet.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <unistd.h>

#include "memutil.h"

#define TAG TOOLKIT_TAG("bus")

/* Result codes reported by bus_endpt_poll_internal() through *result. */
#define BUS_RESULT_DATA     1   // ==BUS_TYPE_PACKET, callback: callback.on_pkt, no use
#define BUS_RESULT_INFO     2   // ==BUS_TYPE_INFO, callback: callback.on_inf
#define BUS_RESULT_EVENT    3   // ==BUS_TYPE_EVENT, callback: callback.on_evt , no use
#define BUS_RESULT_SYSTEM   4   // ==BUS_TYPE_SYSTEM, callback: callback.on_sys
#define BUS_RESULT_MSG      5   // send package msg, callback: callback.on_msg
#define BUS_RESULT_UNKNOWN  6

/*
 * In-process message queued on bus_endpt_t::msg_list.
 *
 * Messages queued by bus_endpt_post_msg() are heap allocated and have
 * evt == NULL; the consumer releases them with free_msg().
 * Messages queued by bus_endpt_send_msg() live on the sender's stack and have
 * a non-NULL evt; the consumer stores the outcome in evt_result and signals
 * evt, after which the sender releases everything itself.
 */
typedef struct msg_t {
    struct list_head entry;   /* link in bus_endpt_t::msg_list */
    int type;                 /* message id passed to callback.on_msg */
    int nparam;               /* number of entries in params */
    param_size_t* params;     /* copy of the caller's parameters, or NULL */
    HANDLE evt;               /* completion event (synchronous messages only) */
    int evt_result;           /* result handed back to the synchronous sender */
}msg_t;

struct bus_endpt_t {
    int type;                 /* TYPE_TCP or TYPE_PIPE */
    int epid;                 /* local endpoint id registered with the bus */
    union {
        HANDLE pipe_handle;
        SOCKET sock_handle;
    };
    char* url;
    /*define here or iom_t area, this is definitely a problem for now.*/
    evtpoll_t* ep;
    int msg_fd;               /* eventfd used to wake the poller for queued messages */
    event_epoll_data_t* msg_sem;
    bus_endpt_callback callback;
    struct list_head msg_list;
    spinlock_t msg_lock;      /* protects msg_list */
    HANDLE tx_evt; //manually
    HANDLE rx_evt; //manually
    OVERLAPPED rx_overlapped;
    int rx_pending;              /* 1 while a packet header read is outstanding */
    int rx_pending_pkt_len;      /* length prefix of the packet being received */
    int rx_pending_pkt_uc_len;   /* number of length-prefix bytes received so far */
    iobuffer_queue_t* rx_buf_queue;
    volatile int quit_flag;
};

/* Releases a heap-allocated message together with its parameter copy. */
static void free_msg(msg_t* msg)
{
    free(msg->params);
    free(msg);
}

/*
 * Returns 1 when `data` is the address of the endpoint's transport handle
 * (the user data registered for the transport in bus_endpt_create()), else 0.
 */
static __inline int bus_endpoint__data_is_handle(const bus_endpt_t* endpt, void* data)
{
    if ((endpt->type == TYPE_TCP && data == &endpt->sock_handle)
        || (endpt->type == TYPE_PIPE && data == &endpt->pipe_handle)) {
        return 1;
    }
    return 0;
}

/* Maps a wire packet type (BUS_TYPE_*) to the matching BUS_RESULT_* code. */
static int to_result(int pkt_type)
{
    switch (pkt_type) {
    case BUS_TYPE_EVENT:
        return BUS_RESULT_EVENT;
    case BUS_TYPE_SYSTEM:
        return BUS_RESULT_SYSTEM;
    case BUS_TYPE_PACKET:
        return BUS_RESULT_DATA;
    case BUS_TYPE_INFO:
        return BUS_RESULT_INFO;
    default:
        break;
    }
    return BUS_RESULT_UNKNOWN;
}

/*
 * Connects a non-blocking, close-on-exec UNIX domain stream socket to
 * "/tmp/<name>". The client side is bound to a generated
 * "/tmp/sphost.1.XXXXXXXX" path first.
 *
 * Returns the socket wrapped in a HANDLE, or INVALID_HANDLE_VALUE on failure.
 *
 * NOTE(review): on success the generated client path is left on disk; whether
 * the server relies on it is not visible from this file.
 */
static HANDLE create_pipe_handle(const char* name)
{
    char tmp[MAX_PATH];
    SOCKET domain_socket;
    int len;
    int n;
    struct sockaddr_un saddr, peer;
    HANDLE pipe = INVALID_HANDLE_VALUE;

    /* Bounded formatting: `name` comes from the URL and may be arbitrarily long. */
    n = snprintf(tmp, sizeof(tmp), "/tmp/%s", name);
    if (n < 0 || (size_t)n >= sizeof(tmp) || strlen(tmp) >= sizeof(saddr.sun_path)) {
        WLog_ERR(TAG, "pipe path name is longer than sun_path's pacaticy");
        return INVALID_HANDLE_VALUE;
    }

    /*create a UNIX domain stream socket*/
    domain_socket = _socket(AF_UNIX, SOCK_STREAM, 0);
    if (domain_socket == INVALID_SOCKET) {
        WLog_ERR(TAG, "create domain socket failed: %d", errno);
        return INVALID_HANDLE_VALUE;
    }
    nonblock_sock(domain_socket);
    if (make_fd_cloexec(domain_socket, 1) != 0) {
        WLog_ERR(TAG, "set bus domain socked fd cloexec failed: %d", errno);
        closesocket(domain_socket);
        return INVALID_HANDLE_VALUE;
    }

    memset(&saddr, 0, sizeof(struct sockaddr_un));
    saddr.sun_family = AF_UNIX;
    snprintf(saddr.sun_path, sizeof(saddr.sun_path), "/tmp/sphost.1.%08x",
        GetTickCount() * GetCurrentProcessId() % 0xffff);
    len = (int)(offsetof(struct sockaddr_un, sun_path) + strlen(saddr.sun_path));
    unlink(saddr.sun_path);
    if (_bind(domain_socket, (struct sockaddr*)&saddr, len) < 0) {
        WLog_ERR(TAG, "bind domain socket failed: %d", errno);
        closesocket(domain_socket);
        return INVALID_HANDLE_VALUE;
    }
    if (chmod(saddr.sun_path, S_IRWXU) < 0) {
        WLog_ERR(TAG, "chmod domain sun_path failed: %d", errno);
        closesocket(domain_socket);
        unlink(saddr.sun_path);
        return INVALID_HANDLE_VALUE;
    }

    memset(&peer, 0, sizeof(peer));
    peer.sun_family = AF_UNIX;
    strcpy(peer.sun_path, tmp); /* length verified against sun_path above */
    len = (int)(offsetof(struct sockaddr_un, sun_path) + strlen(tmp));
    if (_connect(domain_socket, (struct sockaddr*)&peer, len) < 0) {
        WLog_ERR(TAG, "domain socket connet to server failed: %s(%d)", strerror(errno), errno);
        closesocket(domain_socket);
        unlink(saddr.sun_path);
        return INVALID_HANDLE_VALUE;
    }

    pipe = (HANDLE)domain_socket;
    return pipe;
}

/*
 * Creates a non-blocking, close-on-exec TCP socket and connects it to
 * ip:port. When the connect is still in progress the function waits (without
 * timeout) until the socket becomes writable and then checks SO_ERROR to find
 * out whether the connection was really established.
 *
 * Returns the connected socket, or INVALID_SOCKET on failure.
 */
static SOCKET create_socket_handle(const char* ip, int port)
{
    SOCKET fd;
    struct sockaddr_in addr;

    /*Warning: only front three param is effective*/
    fd = WSASocketA(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (fd == INVALID_SOCKET) {
        return fd;
    }
    if (make_fd_cloexec(fd, 1)) {
        WLog_ERR(TAG, "set cloexec for socket fd failed: %d", errno);
        closesocket(fd);
        return INVALID_SOCKET;
    }
    {
        BOOL f = TRUE;
        u_long l = TRUE;
        setsockopt(fd, SOL_SOCKET, SO_DONTLINGER, (char*)&f, sizeof(f));
        /*non block sock*/
        _ioctlsocket(fd, FIONBIO, &l);
    }

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = inet_addr(ip);

    WLog_INFO(TAG, "fd(%d) start to connect...", fd);
    if (_connect(fd, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
        if (errno == EINPROGRESS) {
            fd_set wr_set, ex_set;

            WLog_WARN(TAG, "in connect progress...");
            FD_ZERO(&wr_set);
            FD_ZERO(&ex_set);
            FD_SET(fd, &wr_set);
            FD_SET(fd, &ex_set);
            if (_select(fd + 1, NULL, &wr_set, &ex_set, NULL) > 0 && FD_ISSET(fd, &wr_set)) {
                /* A failed asynchronous connect also reports the socket as
                 * writable; only SO_ERROR tells success from failure. */
                int so_error = 0;
                socklen_t so_len = sizeof(so_error);

                if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void*)&so_error, &so_len) != 0) {
                    so_error = errno;
                }
                if (so_error == 0) {
                    return fd;
                }
                WLog_ERR(TAG, "_connect failed : %d", so_error);
            }
        } else {
            WLog_ERR(TAG, "_connect failed : %d", errno);
        }
        closesocket(fd);
        return INVALID_SOCKET;
    }
    WLog_WARN(TAG, "this should be appear hardly!!!");
    return fd;
}

/*
 * Writes exactly n bytes from buf to the endpoint's socket.
 * When the socket would block, waits up to 5 seconds for it to become
 * writable and retries once.
 *
 * Returns 0 when everything was sent, -1 on error or timeout.
 */
static int tcp_send_buf(bus_endpt_t* endpt, const char* buf, int n)
{
    DWORD left;
    DWORD offset = 0;

    if (n < 0) {
        return -1;
    }
    left = n;

    WLog_DBG(TAG, "==> fd(%d): tcp send buf len: %d", endpt->sock_handle, n);
    while (left > 0) {
        BOOL ret = FALSE;
        DWORD dwBytesTransfer = 0;
        char* chunk = (char*)buf + offset;
        int sent;

        ResetEvent(endpt->tx_evt);
        sent = _send(endpt->sock_handle, chunk, left, 0);
        if (sent == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                fd_set wfds;
                struct timeval tv;
                int retval;

                FD_ZERO(&wfds);
                FD_SET(endpt->sock_handle, &wfds);
                tv.tv_sec = 5;
                tv.tv_usec = 0;
                retval = _select(endpt->sock_handle + 1, NULL, &wfds, NULL, &tv);
                if (retval == -1) {
                    WLog_ERR(TAG, "select error, errno: %d", errno);
                } else if (retval && FD_ISSET(endpt->sock_handle, &wfds)) {
                    WLog_INFO(TAG, "can write");
                    sent = _send(endpt->sock_handle, chunk, left, 0);
                    if (sent >= 0) {
                        ret = TRUE;
                        dwBytesTransfer = sent;
                    }
                } else {
                    /* Nothing transferred: handled as a failure below. */
                    WLog_WARN(TAG, "write timeout");
                }
            } else {
                WLog_ERR(TAG, "_send failed: %d", errno);
            }
        } else {
            ret = TRUE;
            dwBytesTransfer = sent;
        }

        if (ret && dwBytesTransfer) {
            offset += dwBytesTransfer;
            left -= dwBytesTransfer;
        } else {
            WLog_DBG(TAG, "<== error fd(%d): tcp send buf left: %d", endpt->sock_handle, left);
            return -1;
        }
    }
    WLog_DBG(TAG, "<== fd(%d): tcp send buf len: %d", endpt->sock_handle, n);
    return 0;
}

/*
 * Overlapped WriteFile() based sender for pipe handles.
 * Not reached from send_buf() in this file, which routes pipe endpoints
 * through tcp_send_buf().
 *
 * Returns 0 when all n bytes were written, -1 otherwise.
 */
static int pipe_send_buf(bus_endpt_t* endpt, const char* buf, int n)
{
    DWORD left = n;
    DWORD offset = 0;

    while (left > 0) {
        BOOL ret;
        DWORD dwBytesTransfer;
        OVERLAPPED overlapped;
        memset(&overlapped, 0, sizeof(overlapped));
        overlapped.hEvent = endpt->tx_evt;
        ResetEvent(endpt->tx_evt);
        ret = WriteFile(endpt->pipe_handle, buf + offset, left, &dwBytesTransfer, &overlapped);
        if (!ret && GetLastError() == ERROR_IO_PENDING) {
            ret = GetOverlappedResult(endpt->pipe_handle, &overlapped, &dwBytesTransfer, TRUE);
        }
        if (ret && dwBytesTransfer) {
            offset += dwBytesTransfer;
            left -= dwBytesTransfer;
        } else {
            return -1;
        }
    }
    return 0;
}

/*
 * Sends n bytes over the endpoint's transport. Pipe endpoints are UNIX domain
 * sockets here (see create_pipe_handle()), so both transports use the socket
 * sender.
 *
 * Returns 0 on success, -1 on failure or unknown endpoint type.
 */
static int send_buf(bus_endpt_t* endpt, const char* buf, int n)
{
    if (endpt->type == TYPE_PIPE || endpt->type == TYPE_TCP) {
        return tcp_send_buf(endpt, buf, n);
    }
    return -1;
}

/*
 * Prepends the 4-byte packet length to pkt and sends the whole buffer.
 * Returns the result of send_buf().
 */
static int send_pkt_raw(bus_endpt_t* endpt, iobuffer_t* pkt)
{
    int pkt_len = iobuffer_get_length(pkt);

    iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_len, 0);
    return send_buf(endpt, iobuffer_data(pkt, 0), iobuffer_get_length(pkt));
}

/*
 * Overlapped ReadFile() based receiver for pipe handles.
 * Not reached from recv_buf() in this file, which routes pipe endpoints
 * through tcp_recv_buf().
 *
 * Returns 0 when all n bytes were read, -1 otherwise.
 */
static int pipe_recv_buf(bus_endpt_t* endpt, char* buf, DWORD n)
{
    DWORD left = n;
    DWORD offset = 0;

    while (left > 0) {
        BOOL ret;
        DWORD dwBytesTransfer;
        OVERLAPPED overlapped;
        memset(&overlapped, 0, sizeof(overlapped));
        overlapped.hEvent = endpt->rx_evt;
        ResetEvent(overlapped.hEvent);
        ret = ReadFile(endpt->pipe_handle, buf + offset, left, &dwBytesTransfer, &overlapped);
        if (!ret && GetLastError() == ERROR_IO_PENDING) {
            ret = GetOverlappedResult(endpt->pipe_handle, &overlapped, &dwBytesTransfer, TRUE);
        }
        if (ret && dwBytesTransfer) {
            offset += dwBytesTransfer;
            left -= dwBytesTransfer;
        } else {
            return -1;
        }
    }
    return 0;
}

/*
 * Reads exactly n bytes from the endpoint's socket into buf.
 * When the socket would block, waits in 5 second slices until data arrives;
 * a timed-out slice is logged and the read is retried.
 *
 * Returns 0 when n bytes were read, -1 on error or when the peer closed the
 * connection before n bytes arrived.
 */
static int tcp_recv_buf(bus_endpt_t* endpt, char* buf, DWORD n)
{
    DWORD left = n;
    DWORD offset = 0;

    WLog_DBG(TAG, "==> fd(%d): tcp recv buf len: %d", endpt->sock_handle, n);
    while (left > 0) {
        BOOL ret = FALSE;
        int readn;
        DWORD dwBytesTransfer = 0;

        readn = _recv(endpt->sock_handle, buf + offset, left, 0);
        if (readn == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                fd_set rfds;
                struct timeval tv;
                int retval;

                FD_ZERO(&rfds);
                FD_SET(endpt->sock_handle, &rfds);
                tv.tv_sec = 5;
                tv.tv_usec = 0;
                retval = _select(endpt->sock_handle + 1, &rfds, NULL, NULL, &tv);
                if (retval == -1) {
                    WLog_ERR(TAG, "select failed: %d", errno);
                } else if (retval && FD_ISSET(endpt->sock_handle, &rfds)) {
                    readn = _recv(endpt->sock_handle, buf + offset, left, 0);
                    WLog_INFO(TAG, "sock fd have something to read now: %d", readn);
                    if (readn > 0) {
                        ret = TRUE;
                        dwBytesTransfer = readn;
                    } else if (readn == 0) {
                        /* Orderly shutdown: retrying would spin forever. */
                        WLog_WARN(TAG, "<== fd(%d): peer socket close.", endpt->sock_handle);
                    }
                } else {
                    /* Timeout: keep waiting for the remaining bytes. */
                    WLog_WARN(TAG, "read timeout: %d, left: %d", retval, left);
                    ret = TRUE;
                }
            }
        } else if (readn == 0) {
            /* Orderly shutdown: retrying would spin forever. */
            WLog_WARN(TAG, "<== fd(%d): peer socket close.", endpt->sock_handle);
        } else {
            ret = TRUE;
            dwBytesTransfer = readn;
        }

        if (ret && dwBytesTransfer) {
            offset += dwBytesTransfer;
            left -= dwBytesTransfer;
        } else if (!ret) {
            WLog_ERR(TAG, "<== error fd(%d): tcp recv buf len: %d", endpt->sock_handle, left);
            return -1;
        }
    }
    WLog_DBG(TAG, "<== fd(%d): tcp recv buf left: %d", endpt->sock_handle, left);
    return 0;
}

/*
 * Blocking read of exactly n bytes into buf. Pipe endpoints are UNIX domain
 * sockets here, so both transports use the socket receiver.
 *
 * Returns 0 on success, -1 on failure or unknown endpoint type.
 */
static int recv_buf(bus_endpt_t* endpt, char* buf, DWORD n)
{
    if (endpt->type == TYPE_PIPE || endpt->type == TYPE_TCP) {
        return tcp_recv_buf(endpt, buf, n);
    }
    return -1;
}

/*
 * Receives one length-prefixed packet with blocking reads.
 * On success *pkt holds a newly created buffer that the caller destroys.
 * On failure of the body read *pkt is set to NULL.
 *
 * Returns 0 on success, a negative value on failure.
 */
static int recv_pkt_raw(bus_endpt_t* endpt, iobuffer_t** pkt)
{
    int pkt_len;
    int rc = -1;

    rc = recv_buf(endpt, (char*)&pkt_len, 4);
    if (rc != 0)
        return rc;

    if (pkt_len < 0) {
        /* A negative length prefix cannot describe a valid packet. */
        WLog_ERR(TAG, "recv_pkt_raw: invalid pkt len: %d", pkt_len);
        return -1;
    }

    *pkt = iobuffer_create(-1, pkt_len);
    iobuffer_push_count(*pkt, pkt_len);
    if (pkt_len > 0) {
        rc = recv_buf(endpt, iobuffer_data(*pkt, 0), pkt_len);
    }
    if (rc < 0) {
        iobuffer_destroy(*pkt);
        *pkt = NULL;
    }
    return rc;
}

/*
 * Tries to start receiving a packet without blocking on the header.
 *
 * - If header bytes are available, the rest of the packet is read with
 *   blocking reads and returned in *p_pkt.
 * - If the socket would block, rx_pending is set and *p_pkt stays NULL; the
 *   read is finished later by read_left_pkt().
 *
 * Returns 0 in both cases above, a negative value on error or when the peer
 * closed the connection.
 */
static int start_read_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt)
{
    int nread;
    int ret;
    int rc = 0;
    iobuffer_t* pkt = NULL;
    int last_errno = 0;

    *p_pkt = NULL;
    WLog_DBG(TAG, "==>endpt(%d): start_read_pkt", endpt->epid);
    ResetEvent(endpt->rx_evt);
    WLog_DBG(TAG, "ResetEvent(endpt->rx_evt).");
    endpt->rx_pending_pkt_uc_len = 0;
    endpt->rx_pending_pkt_len = 0;

    if (endpt->type == TYPE_PIPE) {
        WLog_DBG(TAG, "to _recv pipe.");
        ret = _recv(endpt->pipe_handle, (char*)&endpt->rx_pending_pkt_len, 4, 0);
        last_errno = errno; /* saved before the log call can disturb errno */
        WLog_DBG(TAG, "_recv pipe return: %d,,,,,,,, (%d)", ret, last_errno);
    } else if (endpt->type == TYPE_TCP) {
        WLog_DBG(TAG, "to _recv.");
        ret = _recv(endpt->sock_handle, (char*)&endpt->rx_pending_pkt_len, 4, 0);
        last_errno = errno;
        WLog_DBG(TAG, "_recv return: %d, (%d)", ret, last_errno);
    } else {
        WLog_ERR(TAG, "<== endpt(%d): start_read_pkt unkonwn type", endpt->epid);
        return -1;
    }

    if (ret >= 0) {
        nread = ret;
        endpt->rx_pending_pkt_uc_len = ret;
        if (nread == 0) {
            WLog_ERR(TAG, "<== endpt(%d): start_read_pkt peer socket close.", endpt->epid);
            return -1;
        }
        if (nread < 4) {
            WLog_DBG(TAG, "endpt(%d): receive buffer less than dream len", endpt->epid);
            rc = recv_buf(endpt, (char*)&endpt->rx_pending_pkt_len + nread, 4 - nread);
            if (rc < 0) {
                WLog_ERR(TAG, "<== endpt(%d): start_read_pkt recv_buf error.", endpt->epid);
                return rc;
            }
        }
        if (endpt->rx_pending_pkt_len < 0) {
            /* A negative length prefix cannot describe a valid packet. */
            WLog_ERR(TAG, "<== endpt(%d): start_read_pkt invalid pkt len: %d", endpt->epid, endpt->rx_pending_pkt_len);
            return -1;
        }
        WLog_DBG(TAG, "iobuffer_create ???");
        pkt = iobuffer_create(0, endpt->rx_pending_pkt_len);
        endpt->rx_pending_pkt_uc_len = 0;
        if (endpt->rx_pending_pkt_len > 0) {
            WLog_DBG(TAG, "endpt->rx_pending_pkt_len > 0 ???");
            rc = recv_buf(endpt, iobuffer_data(pkt, 0), endpt->rx_pending_pkt_len);
            if (rc < 0) {
                WLog_DBG(TAG, "iobuffer_destroy ???");
                iobuffer_destroy(pkt);
                WLog_ERR(TAG, "<== endpt(%d): start_read_pkt recv_buf error.", endpt->epid);
                return rc;
            }
            WLog_DBG(TAG, "iobuffer_push_count ???");
            iobuffer_push_count(pkt, endpt->rx_pending_pkt_len);
        }
        WLog_DBG(TAG, "*p_pkt = pkt ???");
        *p_pkt = pkt;
        /* Success is not an error: logged at debug level. */
        WLog_DBG(TAG, "<== endpt(%d): start_read_pkt recv_buf succ.", endpt->epid);
        return 0;
    } else if (last_errno == EAGAIN || last_errno == EWOULDBLOCK) {
        WLog_DBG(TAG, "endpt(%d): set rx pending flag.", endpt->epid);
        endpt->rx_pending = 1;
        return 0;
    }

    WLog_ERR(TAG, "<== endpt(%d): start_read_pkt recv_buf failed.", endpt->epid);
    return -1;
}

/*
 * Completes a read that start_read_pkt() left pending: reads the rest of the
 * 4-byte length prefix, then the packet body. On success the packet is
 * returned in *p_pkt and rx_pending is cleared.
 *
 * Returns 0 on success, a negative value on error or when the peer closed
 * the connection.
 */
static int read_left_pkt(bus_endpt_t* endpt, iobuffer_t** p_pkt)
{
    int ret = -1;
    int rc;
    int nread;
    iobuffer_t* pkt = NULL;

    WLog_DBG(TAG, "==> fd(%d): read left pkt", endpt->sock_handle);
    if (endpt->type == TYPE_PIPE) {
        ret = _recv(endpt->pipe_handle,
            (char*)&endpt->rx_pending_pkt_len + endpt->rx_pending_pkt_uc_len,
            4 - endpt->rx_pending_pkt_uc_len, 0);
    } else if (endpt->type == TYPE_TCP) {
        ret = _recv(endpt->sock_handle,
            (char*)&endpt->rx_pending_pkt_len + endpt->rx_pending_pkt_uc_len,
            4 - endpt->rx_pending_pkt_uc_len, 0);
    } else {
        TOOLKIT_ASSERT(0);
    }

    if (ret < 0) {
        WLog_ERR(TAG, "<== fd(%d): read left pkt failed: ret %d, err: %d", endpt->sock_handle, ret, (errno));
        return -1;
    } else if (ret == 0) {
        WLog_WARN(TAG, "<== fd(%d): peer socket close.", endpt->sock_handle);
        return -1;
    }

    nread = endpt->rx_pending_pkt_uc_len + ret;
    endpt->rx_pending_pkt_uc_len = nread;
    if (nread < 4) {
        rc = recv_buf(endpt, (char*)&endpt->rx_pending_pkt_len + nread, 4 - nread);
        if (rc < 0)
            return rc;
    }
    if (endpt->rx_pending_pkt_len < 0) {
        /* A negative length prefix cannot describe a valid packet. */
        WLog_ERR(TAG, "<== fd(%d): read left pkt invalid pkt len: %d", endpt->sock_handle, endpt->rx_pending_pkt_len);
        return -1;
    }

    /*the first 4 bytes indicates the length of content and then read the buffer content.*/
    pkt = iobuffer_create(-1, endpt->rx_pending_pkt_len);
    WLog_DBG(TAG, "after read pkt len, start to read pkt's content: %d", endpt->rx_pending_pkt_len);
    rc = recv_buf(endpt, iobuffer_data(pkt, 0), endpt->rx_pending_pkt_len);
    if (rc < 0) {
        iobuffer_destroy(pkt);
        return rc;
    }
    iobuffer_push_count(pkt, endpt->rx_pending_pkt_len);
    *p_pkt = pkt;
    WLog_DBG(TAG, "endpt(%d): reset rx_pending and return!", endpt->epid);
    endpt->rx_pending = 0;
    return 0;
}

/*
 * Peeks the packet type and, for PACKET/INFO/EVENT/SYSTEM packets, appends
 * pkt to the endpoint's receive queue (the queue then holds the packet).
 *
 * Returns 1 when queued, -1 for any other type (the caller keeps ownership).
 */
static int append_rx_pkt(bus_endpt_t* endpt, iobuffer_t* pkt)
{
    int type;
    int read_state;

    read_state = iobuffer_get_read_state(pkt);
    iobuffer_read(pkt, IOBUF_T_I4, &type, 0);
    iobuffer_restore_read_state(pkt, read_state);

    if (type == BUS_TYPE_PACKET || type == BUS_TYPE_INFO
        || type == BUS_TYPE_EVENT || type == BUS_TYPE_SYSTEM) {
        iobuffer_queue_enqueue(endpt->rx_buf_queue, pkt);
        WLog_DBG(TAG, "<== append_rx_pkt finished");
        return 1;
    }
    WLog_DBG(TAG, "<== append_rx_pkt failed!");
    return -1;
}

/*
 * Creates an endpoint, connects it to the bus named by url ("tcp://host:port"
 * or "pipe://name"), registers epid with the bus and subscribes the transport
 * and the message eventfd for read events.
 *
 * url       bus address; copied.
 * epid      endpoint id to register.
 * callback  callback table; copied into the endpoint.
 * p_endpt   receives the new endpoint on success.
 *
 * Returns 0 on success, -1 on failure (nothing is leaked, *p_endpt untouched).
 */
TOOLKIT_API int bus_endpt_create(const char* url, int epid, const bus_endpt_callback* callback, bus_endpt_t** p_endpt)
{
    bus_endpt_t* endpt = NULL;
    char* tmp_url;
    url_fields uf;
    int rc;
    int v;
    int uf_live = 0;       /* uf holds parsed fields that still need freeing */
    int handle_open = 0;   /* the transport handle was created successfully */
    iobuffer_t* buf = NULL;
    iobuffer_t* ans_buf = NULL;

    if (!url || !callback || !p_endpt)
        return -1;
    tmp_url = _strdup(url);
    if (!tmp_url)
        return -1;
    if (url_parse(tmp_url, &uf) < 0) {
        free(tmp_url);
        WLog_ERR(TAG, "url parse failed!");
        return -1;
    }
    uf_live = 1;

    endpt = ZALLOC_T(bus_endpt_t);
    if (!endpt) {
        url_free_fields(&uf);
        free(tmp_url);
        return -1;
    }
    endpt->sock_handle = -1;
    endpt->msg_fd = -1;
    endpt->ep = NULL;
    endpt->url = tmp_url;

    if (_stricmp(uf.scheme, "tcp") == 0) {
        endpt->type = TYPE_TCP;
        endpt->sock_handle = create_socket_handle(uf.host, uf.port);
        if (endpt->sock_handle == INVALID_SOCKET)
            goto on_error;
        handle_open = 1;
        WLog_INFO(TAG, "bus endpt socket fd: %d", endpt->sock_handle);
    } else if (_stricmp(uf.scheme, "pipe") == 0) {
        endpt->type = TYPE_PIPE;
        endpt->pipe_handle = create_pipe_handle(uf.host);
        if (endpt->pipe_handle == INVALID_HANDLE_VALUE)
            goto on_error;
        handle_open = 1;
        WLog_INFO(TAG, "bus endpt pipe fd: %d", endpt->pipe_handle);
    } else {
        goto on_error;
    }

    endpt->ep = evtpoll_create();
    if (!endpt->ep) {
        WLog_ERR(TAG, "evtpoll create failed!");
        goto on_error;
    }

    endpt->epid = epid;
    endpt->tx_evt = CreateEventA(NULL, TRUE, FALSE, NULL);
    endpt->rx_evt = CreateEventA(NULL, TRUE, FALSE, NULL);
    endpt->rx_buf_queue = iobuffer_queue_create();
    if (!endpt->tx_evt || !endpt->rx_evt || !endpt->rx_buf_queue)
        goto on_error;

    endpt->msg_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
    if (endpt->msg_fd == -1) {
        WLog_ERR(TAG, "create event fd failed: %d", errno);
        goto on_error;
    }

    INIT_LIST_HEAD(&endpt->msg_list);
    spinlock_init(&endpt->msg_lock);
    memcpy(&endpt->callback, callback, sizeof(bus_endpt_callback));

    /* Registration handshake: send {REGISTER, epid}, expect {type, 0}. */
    buf = iobuffer_create(-1, -1);
    v = BUS_TYPE_ENDPT_REGISTER;
    iobuffer_write(buf, IOBUF_T_I4, &v, 0);
    v = endpt->epid;
    iobuffer_write(buf, IOBUF_T_I4, &v, 0);
    WLog_DBG(TAG, "%s::start to send_pkt_raw...", __FUNCTION__);
    rc = send_pkt_raw(endpt, buf);
    WLog_INFO(TAG, "%s::send_pkt_raw return %d", __FUNCTION__, rc);
    if (rc != 0)
        goto on_error;
    rc = recv_pkt_raw(endpt, &ans_buf);
    WLog_INFO(TAG, "%s::recv_pkt_raw return %d", __FUNCTION__, rc);
    if (rc != 0) {
        goto on_error;
    }
    iobuffer_read(ans_buf, IOBUF_T_I4, &v, 0);
    iobuffer_read(ans_buf, IOBUF_T_I4, &rc, 0);
    if (rc != 0)
        goto on_error;

    /* Release the handshake resources and forget them, so that a later jump
     * to the error path cannot release them a second time. */
    url_free_fields(&uf);
    uf_live = 0;
    iobuffer_destroy(buf);
    buf = NULL;
    if (ans_buf) {
        iobuffer_destroy(ans_buf);
        ans_buf = NULL;
    }

    if (-1 == evtpoll_attach(endpt->ep, endpt->msg_fd)) {
        goto on_error;
    }
    if (evtpoll_subscribe(endpt->ep, EV_READ, endpt->msg_fd, &endpt->msg_fd, NULL)) {
        WLog_ERR(TAG, "epoll subscribe bus endpt eventfd failed.");
        goto on_error_msg;
    }
    /* subscribe read event [3/27/2020 Gifur] */
    if (evtpoll_attach(endpt->ep, endpt->sock_handle)) {
        WLog_ERR(TAG, "epoll attch bus endpt failed.");
        goto on_error_msg;
    }
    if (evtpoll_subscribe(endpt->ep, EV_READ, endpt->sock_handle, &endpt->sock_handle, NULL)) {
        WLog_ERR(TAG, "epoll subscribe bus endpt failed.");
        goto on_error_handle;
    }

    *p_endpt = endpt;
    return 0;

on_error_handle:
    evtpoll_detach(endpt->ep, endpt->sock_handle);
on_error_msg:
    evtpoll_detach(endpt->ep, endpt->msg_fd);
on_error:
    if (handle_open) {
        if (endpt->type == TYPE_TCP) {
            closesocket(endpt->sock_handle);
        } else if (endpt->type == TYPE_PIPE) {
            close(endpt->pipe_handle);
        }
    }
    if (endpt->msg_fd >= 0)
        close(endpt->msg_fd);
    if (endpt->tx_evt)
        CloseHandle(endpt->tx_evt);
    if (endpt->rx_evt)
        CloseHandle(endpt->rx_evt);
    if (endpt->rx_buf_queue)
        iobuffer_queue_destroy(endpt->rx_buf_queue);
    /* The poller must be destroyed while endpt is still valid. */
    if (endpt->ep != NULL)
        evtpoll_destroy(endpt->ep);
    free(endpt->url);
    free(endpt);
    if (uf_live)
        url_free_fields(&uf);
    if (buf)
        iobuffer_destroy(buf);
    if (ans_buf)
        iobuffer_destroy(ans_buf);
    return -1;
}

/*
 * Unregisters the endpoint from the bus (best effort), drains the message
 * queue, closes the transport and the eventfd, and frees the endpoint.
 *
 * Pending asynchronous messages are freed. Pending synchronous messages
 * belong to a sender blocked in bus_endpt_send_msg(); they are completed with
 * result -1 and signalled so that the sender can release them itself.
 */
TOOLKIT_API void bus_endpt_destroy(bus_endpt_t* endpt)
{
    int rc = -1;
    iobuffer_t* buf = NULL;
    iobuffer_t* ans_buf = NULL;
    int v;

    TOOLKIT_ASSERT(endpt);

    buf = iobuffer_create(-1, -1);
    v = BUS_TYPE_ENDPT_UNREGISTER;
    iobuffer_write(buf, IOBUF_T_I4, &v, 0);
    v = endpt->epid;
    iobuffer_write(buf, IOBUF_T_I4, &v, 0);
    rc = send_pkt_raw(endpt, buf);
    if (rc != 0)
        goto on_error;
    rc = recv_pkt_raw(endpt, &ans_buf);
    if (rc != 0)
        goto on_error;
    iobuffer_read(ans_buf, IOBUF_T_I4, &v, 0);
    iobuffer_read(ans_buf, IOBUF_T_I4, &rc, 0);

on_error:
    spinlock_enter(&endpt->msg_lock, -1);
    if (!list_empty(&endpt->msg_list)) {
        msg_t* msg, * t;
        list_for_each_entry_safe(msg, t, &endpt->msg_list, msg_t, entry) {
            list_del(&msg->entry);
            if (msg->type == 2/*IOM_T_SEND_INFO*/ && msg->params && msg->nparam > 5) {
                iobuffer_t* pkt = (iobuffer_t*)msg->params[5];
                if (pkt)
                    iobuffer_dec_ref(pkt);
            }
            if (msg->evt) {
                /* Stack-allocated message owned by a waiting sender: report
                 * failure and wake the sender; do not touch msg afterwards. */
                msg->evt_result = -1;
                SetEvent(msg->evt);
            } else {
                free_msg(msg);
            }
        }
    }
    spinlock_leave(&endpt->msg_lock);

    if (buf)
        iobuffer_destroy(buf);
    if (ans_buf)
        iobuffer_destroy(ans_buf);

    if (endpt->type == TYPE_TCP) {
        evtpoll_detach(endpt->ep, endpt->sock_handle);
        closesocket(endpt->sock_handle);
    } else if (endpt->type == TYPE_PIPE) {
        evtpoll_detach(endpt->ep, endpt->pipe_handle);
        closesocket(endpt->pipe_handle);
    }
    if (endpt->msg_fd >= 0) {
        evtpoll_detach(endpt->ep, endpt->msg_fd);
        close(endpt->msg_fd);
    }
    if (endpt->tx_evt)
        CloseHandle(endpt->tx_evt);
    if (endpt->rx_evt)
        CloseHandle(endpt->rx_evt);
    if (endpt->rx_buf_queue)
        iobuffer_queue_destroy(endpt->rx_buf_queue);
    TOOLKIT_ASSERT(endpt->ep);
    evtpoll_destroy(endpt->ep);
    free(endpt->url);
    free(endpt);
}

/*
 * Waits for the next thing to dispatch and reports its kind in *result
 * (one of BUS_RESULT_*). Nothing is dequeued here.
 *
 * Returns:
 *    1 : something is ready, *result is valid
 *    0 : time out
 *   <0 : error
 */
static int bus_endpt_poll_internal(bus_endpt_t* endpt, int* result, int timeout)
{
    iobuffer_t* pkt = NULL;
    int rc;

    TOOLKIT_ASSERT(endpt);

    // peek first packge type
    if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) {
        pkt = iobuffer_queue_head(endpt->rx_buf_queue);
    } else {
        // no received package, try to receive one
        if (!endpt->rx_pending) {
            rc = start_read_pkt(endpt, &pkt);
            if (rc < 0) {
                WLog_ERR(TAG, "start read pkt failed.");
                return rc;
            }
            if (pkt) {
                WLog_INFO(TAG, "pkt has read");
                rc = append_rx_pkt(endpt, pkt); // append pkt to rx_buf_queue
                if (rc < 0) {
                    iobuffer_destroy(pkt);
                    return -2;
                }
            }
        }

        // if receive is pending, wait for send or receive complete event
        if (!pkt) {
            int nfds;
            int ret;
            int i;
            struct epoll_event events[MAX_EPOLL_EVENT];
            struct epoll_event* pe;

            nfds = evtpoll_wait(endpt->ep, events, MAX_EPOLL_EVENT, timeout);
            if (nfds == TOOLKIT_ETIMEDOUT) {
                return 0; //timeout
            }
            if (nfds == -1) {
                return -3;
            }
            WLog_DBG(TAG, "epoll wait return nfd: %d", nfds);
            for (i = 0; i < nfds; ++i) {
                void* pdata = NULL;
                pe = events + i;
                WLog_INFO(TAG, "loop events[%d]::fd(0x%08X) OUT:%d, IN:%d", i, pe->data.fd,
                    pe->events & EPOLLOUT ? 1 : 0, pe->events & EPOLLIN ? 1 : 0);
                TOOLKIT_ASSERT(pe->events & EPOLLIN);
                ret = evtpoll_deal(endpt->ep, pe, &pdata, 0);
                if (ret) {
                    continue;
                }
                TOOLKIT_ASSERT(pdata);
                if (bus_endpoint__data_is_handle(endpt, pdata)) {
                    /* Transport readable: finish the pending packet read. */
                    rc = read_left_pkt(endpt, &pkt);
                    if (rc < 0)
                        return rc;
                    if (pkt) {
                        rc = append_rx_pkt(endpt, pkt);
                        if (rc < 0) {
                            iobuffer_destroy(pkt);
                            return -4;
                        }
                    }
                } else if (pdata == &endpt->msg_fd) {
                    uint64_t rdata;
                    WLog_DBG(TAG, "message arrive.");
                    /* Consume one tick of the semaphore-mode eventfd. */
                    do {
                        ret = read(endpt->msg_fd, &rdata, sizeof rdata);
                    } while (ret < 0 && errno == EINTR);
                    if (ret < 0) {
                        WLog_ERR(TAG, "read msg fd failed: %d", (errno));
                        abort();
                    }
                    *result = BUS_RESULT_MSG;
                    return 1;
                }
            }
        } else {
            /* Normal path, not an error: logged at debug level. */
            WLog_DBG(TAG, "pkt has readed");
        }
    }

    if (pkt) {
        int type;
        int read_state = iobuffer_get_read_state(pkt);
        iobuffer_read(pkt, IOBUF_T_I4, &type, 0);
        iobuffer_restore_read_state(pkt, read_state);
        *result = to_result(type);
        if (*result == BUS_RESULT_UNKNOWN) {
            WLog_ERR(TAG, "bug: unknown pkt type!");
            return -5;
        }
        return 1;
    }
    return -6;
}

/*
 * Receives packets until one of the given wire type arrives and returns it in
 * *p_ansbuf. Packets of other types are appended to the receive queue for
 * later dispatch.
 *
 * Returns a non-negative value on success, a negative value on failure
 * (*p_ansbuf is then left untouched).
 */
static int recv_until(bus_endpt_t* endpt, int type, iobuffer_t** p_ansbuf)
{
    int rc = -1;
    iobuffer_t* ans_pkt = NULL;
    int ans_type;

    WLog_DBG(TAG, "==>endpt(%d): recv until type: 0x%08X", endpt->epid, type);
    for (;;) {
        if (!endpt->rx_pending) {
            rc = start_read_pkt(endpt, &ans_pkt);
            if (rc < 0) {
                break;
            }
        } else {
            WLog_DBG(TAG, "endpt(%d) is pending", endpt->epid);
        }

        if (!ans_pkt) {
            int nfds;
            int ret;
            int i, flag = 0;
            struct epoll_event events[MAX_EPOLL_EVENT];
            struct epoll_event* pe;

            nfds = evtpoll_wait(endpt->ep, events, MAX_EPOLL_EVENT, -1);
            if (nfds == TOOLKIT_ETIMEDOUT) {
                continue;
            }
            if (nfds == -1) {
                return -1;
            }
            WLog_DBG(TAG, "epoll wait return nfd: %d", nfds);
            for (i = 0; i < nfds; ++i) {
                void* pdata = NULL;
                pe = events + i;
                ret = evtpoll_deal(endpt->ep, pe, &pdata, 0);
                if (!ret && bus_endpoint__data_is_handle(endpt, pdata)) {
                    flag = 1;
                    break;
                }
            }
            if (flag) {
                rc = read_left_pkt(endpt, &ans_pkt);
                if (rc < 0) {
                    break;
                }
            }
        }

        if (ans_pkt) {
            int read_state = iobuffer_get_read_state(ans_pkt);
            iobuffer_read(ans_pkt, IOBUF_T_I4, &ans_type, 0);
            iobuffer_restore_read_state(ans_pkt, read_state);
            if (ans_type == type) {
                *p_ansbuf = ans_pkt;
                break;
            }
            rc = append_rx_pkt(endpt, ans_pkt);
            if (rc < 0) {
                iobuffer_destroy(ans_pkt);
                break;
            }
            ans_pkt = NULL;
        }
    }
    return rc;
}

/*
 * Waits for a BUS_TYPE_ERROR answer and stores its error code in *p_result.
 * Returns the result of recv_until().
 */
static int recv_until_result(bus_endpt_t* endpt, int* p_result)
{
    int rc;
    iobuffer_t* ans_pkt = NULL;
    int type, error;

    rc = recv_until(endpt, BUS_TYPE_ERROR, &ans_pkt);
    if (rc < 0)
        return rc;
    iobuffer_read(ans_pkt, IOBUF_T_I4, &type, 0);
    iobuffer_read(ans_pkt, IOBUF_T_I4, &error, 0);
    iobuffer_destroy(ans_pkt);
    *p_result = error;
    return rc;
}

/*
 * Waits for a BUS_TYPE_ENDPT_GET_STATE answer and stores the reported state
 * in *p_state (when p_state is not NULL).
 * Returns the result of recv_until().
 */
static int recv_until_state(bus_endpt_t* endpt, int* p_state)
{
    int rc;
    iobuffer_t* ans_pkt = NULL;
    int type, epid, state;

    rc = recv_until(endpt, BUS_TYPE_ENDPT_GET_STATE, &ans_pkt);
    if (rc < 0)
        return rc;
    iobuffer_read(ans_pkt, IOBUF_T_I4, &type, 0);
    iobuffer_read(ans_pkt, IOBUF_T_I4, &epid, 0);
    iobuffer_read(ans_pkt, IOBUF_T_I4, &state, 0);
    iobuffer_destroy(ans_pkt);
    WLog_DBG(TAG, "state address: %p", (void*)p_state);
    if (p_state)
        *p_state = state;
    return rc;
}

/*
 * Prepends the routed-packet header to pkt. Fields are pushed in reverse so
 * that the wire order becomes: bus_type, type, local epid, remote epid,
 * bussinessId, traceId, spanId, parentSpanId, payload.
 * The link-info fields are written at their full fixed length.
 */
static void write_routed_header(bus_endpt_t* endpt, int epid, int type, int bus_type, iobuffer_t* pkt)
{
    int t;
    char bussinessId[LINKINFO_BUSSID_LEN];
    char traceId[LINKINFO_TRACEID_LEN];
    char spanId[LINKINFO_SPANID_LEN];
    char parentSpanId[LINKINFO_PARENTSPANID_LEN];

    iobuffer_get_linkInfo(pkt, bussinessId, traceId, spanId, parentSpanId);
    iobuffer_write_head(pkt, IOBUF_T_BUF, parentSpanId, sizeof(parentSpanId));
    iobuffer_write_head(pkt, IOBUF_T_BUF, spanId, sizeof(spanId));
    iobuffer_write_head(pkt, IOBUF_T_BUF, traceId, sizeof(traceId));
    iobuffer_write_head(pkt, IOBUF_T_BUF, bussinessId, sizeof(bussinessId));
    t = epid; // remote epid
    iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
    t = endpt->epid; // local epid
    iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
    t = type; // user type
    iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
    t = bus_type; // type
    iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
}

/*
 * Sends pkt as a BUS_TYPE_PACKET to endpoint epid and waits for the bus's
 * result answer. pkt's read/write state is restored before returning.
 *
 * Returns a negative value when sending or receiving failed; otherwise the
 * error code from the answer when it is non-zero and the receive returned 0;
 * otherwise the result of the receive.
 */
TOOLKIT_API int bus_endpt_send_pkt(bus_endpt_t* endpt, int epid, int type, iobuffer_t* pkt)
{
    int rc;
    int read_state;
    int write_state;
    int error;

    TOOLKIT_ASSERT(endpt);

    read_state = iobuffer_get_read_state(pkt);
    write_state = iobuffer_get_write_state(pkt);
    write_routed_header(endpt, epid, type, BUS_TYPE_PACKET, pkt);
    rc = send_pkt_raw(endpt, pkt);
    iobuffer_restore_read_state(pkt, read_state);
    iobuffer_restore_write_state(pkt, write_state);
    if (rc < 0)
        return rc;

    rc = recv_until_result(endpt, &error);
    if (rc == 0 && error != 0)
        rc = error;
    return rc;
}

/*
 * Sends pkt as a BUS_TYPE_INFO to endpoint epid without waiting for an
 * answer. pkt's read/write state is restored before returning.
 *
 * Returns the result of the send (0 on success, negative on failure).
 */
TOOLKIT_API int bus_endpt_send_info(bus_endpt_t* endpt, int epid, int type, iobuffer_t* pkt)
{
    int rc;
    int read_state;
    int write_state;

    TOOLKIT_ASSERT(endpt);

    WLog_DBG(TAG, "==> endpt(%d) send info: %d, 0x%08X.", endpt->epid, epid, type);
    read_state = iobuffer_get_read_state(pkt);
    write_state = iobuffer_get_write_state(pkt);
    write_routed_header(endpt, epid, type, BUS_TYPE_INFO, pkt);
    rc = send_pkt_raw(endpt, pkt);
    iobuffer_restore_read_state(pkt, read_state);
    iobuffer_restore_write_state(pkt, write_state);
    return rc;
}

/*
 * Broadcasts pkt as a BUS_TYPE_EVENT. The header written in front of the
 * payload is: BUS_TYPE_EVENT, type, local epid. pkt's read/write state is
 * restored before returning.
 *
 * Returns the result of the send (0 on success, negative on failure).
 */
TOOLKIT_API int bus_endpt_bcast_evt(bus_endpt_t* endpt, int type, iobuffer_t* pkt)
{
    int t;
    int rc;
    int read_state;
    int write_state;

    TOOLKIT_ASSERT(endpt);

    read_state = iobuffer_get_read_state(pkt);
    write_state = iobuffer_get_write_state(pkt);
    t = endpt->epid;
    iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
    t = type;
    iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
    t = BUS_TYPE_EVENT;
    iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
    rc = send_pkt_raw(endpt, pkt);
    iobuffer_restore_read_state(pkt, read_state);
    iobuffer_restore_write_state(pkt, write_state);
    return rc;
}

/*
 * Dequeues the head of the receive queue when it is a PACKET or INFO packet.
 * The header is consumed, the link info is stored on the packet, and the
 * sender epid / user type are returned through p_epid / p_type (each may be
 * NULL). The packet is handed to *p_pkt, or destroyed when p_pkt is NULL.
 *
 * Returns 0 on success, -1 when the queue is empty or the head has another
 * type (the head is then left untouched).
 */
static int bus_endpt_recv_pkt(bus_endpt_t* endpt, int* p_epid, int* p_type, iobuffer_t** p_pkt)
{
    if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) {
        iobuffer_t* pkt = iobuffer_queue_head(endpt->rx_buf_queue);
        int read_state = iobuffer_get_read_state(pkt);
        int pkt_type, usr_type, from_epid, to_epid;
        /* One spare byte each, so the zero-filled buffers stay NUL terminated. */
        char bussinessId[LINKINFO_BUSSID_LEN + 1];
        char traceId[LINKINFO_TRACEID_LEN + 1];
        char spanId[LINKINFO_SPANID_LEN + 1];
        char parentSpanId[LINKINFO_PARENTSPANID_LEN + 1];
        int readLen = 0;

        memset(bussinessId, 0, LINKINFO_BUSSID_LEN + 1);
        memset(traceId, 0, LINKINFO_TRACEID_LEN + 1);
        memset(spanId, 0, LINKINFO_SPANID_LEN + 1);
        memset(parentSpanId, 0, LINKINFO_PARENTSPANID_LEN + 1);

        iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
        if (pkt_type == BUS_TYPE_PACKET || pkt_type == BUS_TYPE_INFO) {
            iobuffer_read(pkt, IOBUF_T_I4, &usr_type, 0);
            iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0);
            iobuffer_read(pkt, IOBUF_T_I4, &to_epid, 0);

            readLen = LINKINFO_BUSSID_LEN;
            iobuffer_read(pkt, IOBUF_T_BUF, bussinessId, &readLen);
            readLen = LINKINFO_TRACEID_LEN;
            iobuffer_read(pkt, IOBUF_T_BUF, traceId, &readLen);
            readLen = LINKINFO_SPANID_LEN;
            iobuffer_read(pkt, IOBUF_T_BUF, spanId, &readLen);
            readLen = LINKINFO_PARENTSPANID_LEN;
            iobuffer_read(pkt, IOBUF_T_BUF, parentSpanId, &readLen);
            iobuffer_set_linkInfo(pkt, bussinessId, traceId, spanId, parentSpanId);

            if (p_epid)
                *p_epid = from_epid;
            if (p_type)
                *p_type = usr_type;
            iobuffer_queue_deque(endpt->rx_buf_queue);
            if (p_pkt) {
                *p_pkt = pkt;
            } else {
                iobuffer_destroy(pkt);
            }
            return 0;
        }
        iobuffer_restore_read_state(pkt, read_state);
    }
    return -1;
}

/*
 * Dequeues the head of the receive queue when it is an EVENT packet.
 * The user type and sender epid are returned through p_type / p_epid (each
 * may be NULL). The packet is handed to *p_pkt, or destroyed when p_pkt is
 * NULL.
 *
 * Returns 0 on success, -1 when the queue is empty or the head has another
 * type (the head is then left untouched).
 */
static int bus_endpt_recv_evt(bus_endpt_t* endpt, int* p_epid, int* p_type, iobuffer_t** p_pkt)
{
    if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) {
        iobuffer_t* pkt = iobuffer_queue_head(endpt->rx_buf_queue);
        int read_state = iobuffer_get_read_state(pkt);
        int pkt_type, usr_type, from_epid;

        iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
        if (pkt_type == BUS_TYPE_EVENT) {
            iobuffer_read(pkt, IOBUF_T_I4, &usr_type, 0);
            iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0);
            if (p_epid)
                *p_epid = from_epid;
            if (p_type)
                *p_type = usr_type;
            iobuffer_queue_deque(endpt->rx_buf_queue);
            if (p_pkt) {
                *p_pkt = pkt;
            } else {
                iobuffer_destroy(pkt);
            }
            return 0;
        }
        iobuffer_restore_read_state(pkt, read_state);
    }
    return -1;
}

/*
 * Dequeues and destroys the head of the receive queue when it is a SYSTEM
 * packet, returning its epid / state through p_epid / p_state (each may be
 * NULL).
 *
 * Returns 0 on success, -1 when the queue is empty or the head has another
 * type (the head is then left untouched).
 */
static int bus_endpt_recv_sys(bus_endpt_t* endpt, int* p_epid, int* p_state)
{
    if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) {
        iobuffer_t* pkt = iobuffer_queue_head(endpt->rx_buf_queue);
        int read_state = iobuffer_get_read_state(pkt);
        int pkt_type, epid, state;

        iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
        if (pkt_type == BUS_TYPE_SYSTEM) {
            iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
            iobuffer_read(pkt, IOBUF_T_I4, &state, 0);
            if (p_epid)
                *p_epid = epid;
            if (p_state)
                *p_state = state;
            iobuffer_queue_deque(endpt->rx_buf_queue);
            iobuffer_destroy(pkt);
            return 0;
        }
        iobuffer_restore_read_state(pkt, read_state);
    }
    return -1;
}

/*
 * Pops the oldest queued message into *p_msg.
 * Returns 0 on success, -1 when the message list is empty.
 */
static int bus_endpt_recv_msg(bus_endpt_t* endpt, msg_t** p_msg)
{
    int rc = -1;

    TOOLKIT_ASSERT(endpt);
    TOOLKIT_ASSERT(p_msg);

    spinlock_enter(&endpt->msg_lock, -1);
    if (!list_empty(&endpt->msg_list)) {
        msg_t* e = list_first_entry(&endpt->msg_list, msg_t, entry);
        list_del(&e->entry);
        rc = 0;
        *p_msg = e;
    }
    spinlock_leave(&endpt->msg_lock);
    return rc;
}

/*
 * Asks the bus for the state of endpoint epid and stores it in *p_state.
 * Returns a non-negative value on success, a negative value on failure.
 */
TOOLKIT_API int bus_endpt_get_state(bus_endpt_t* endpt, int epid, int* p_state)
{
    iobuffer_t* buf = NULL;
    int v;
    int rc = -1;

    TOOLKIT_ASSERT(endpt);

    buf = iobuffer_create(-1, -1);
    v = BUS_TYPE_ENDPT_GET_STATE;
    iobuffer_write(buf, IOBUF_T_I4, &v, 0);
    v = epid;
    iobuffer_write(buf, IOBUF_T_I4, &v, 0);
    rc = send_pkt_raw(endpt, buf);
    if (rc < 0) {
        WLog_ERR(TAG, "send pkt raw failed.");
        goto on_error;
    }
    rc = recv_until_state(endpt, p_state);

on_error:
    if (buf)
        iobuffer_destroy(buf);
    return rc;
}

/*
 * Queues an asynchronous message for the polling thread and wakes it through
 * the eventfd. The parameters are copied; the message is freed by the
 * consumer after callback.on_msg ran.
 *
 * Returns 0 on success, -1 on allocation failure or when the wake-up write
 * failed (in the latter case the message stays queued).
 */
TOOLKIT_API int bus_endpt_post_msg(bus_endpt_t* endpt, int msg, int nparam, param_size_t params[])
{
    msg_t* e;
    int rc;
    uint64_t wdata = 0;

    TOOLKIT_ASSERT(endpt);

    WLog_DBG(TAG, "==> endpt(%d) post msg: %d", endpt->epid, msg);
    e = MALLOC_T(msg_t);
    if (!e)
        return -1;
    e->type = msg;
    e->nparam = nparam;
    if (nparam > 0) {
        e->params = (param_size_t*)malloc(sizeof(param_size_t) * nparam);
        if (!e->params) {
            free(e);
            return -1;
        }
        memcpy(e->params, params, sizeof(param_size_t) * nparam);
    } else {
        e->params = NULL;
    }
    e->evt = NULL;
    e->evt_result = 0;

    spinlock_enter(&endpt->msg_lock, -1);
    list_add_tail(&e->entry, &endpt->msg_list);
    spinlock_leave(&endpt->msg_lock);

    wdata = 1;
    do {
        rc = write(endpt->msg_fd, &wdata, sizeof wdata);
    } while (rc < 0 && errno == EINTR); /* retry only when interrupted */
    if (rc == -1) {
        WLog_ERR(TAG, "<== endpt(%d) post msg: %d failed: %d", endpt->epid, msg, errno);
        return -1;
    }
    WLog_DBG(TAG, "<== endpt(%d) post msg: %d", endpt->epid, msg);
    return 0;
}

/*
 * Queues a synchronous message for the polling thread, wakes it through the
 * eventfd and blocks until the consumer signals completion.
 *
 * The message lives on this function's stack, so it must never remain linked
 * in the list once the function returns.
 *
 * Returns the result stored by the consumer, or -1 when the message could
 * not be queued.
 */
TOOLKIT_API int bus_endpt_send_msg(bus_endpt_t* endpt, int msg, int nparam, param_size_t params[])
{
    msg_t e;
    int rc;
    uint64_t wdata = 0;

    TOOLKIT_ASSERT(endpt);

    WLog_DBG(TAG, "==> endpt(%d) send msg: epid %d, param counts %d", endpt->epid, msg, nparam);
    e.type = msg;
    e.nparam = nparam;
    if (nparam > 0) {
        e.params = (param_size_t*)malloc(sizeof(param_size_t) * nparam);
        if (!e.params)
            return -1;
        memcpy(e.params, params, sizeof(param_size_t) * nparam);
    } else {
        e.params = NULL;
    }
    e.evt_result = 0;
    e.evt = CreateEventA(NULL, TRUE, FALSE, NULL);
    TOOLKIT_ASSERT(e.evt != NULL);
    if (e.evt == NULL) {
        free(e.params);
        return -1;
    }

    spinlock_enter(&endpt->msg_lock, -1);
    list_add_tail(&e.entry, &endpt->msg_list);
    spinlock_leave(&endpt->msg_lock);

    wdata = 1;
    do {
        rc = write(endpt->msg_fd, &wdata, sizeof wdata);
    } while (rc < 0 && errno == EINTR); /* retry only when interrupted */
    if (rc == -1) {
        int still_queued = 0;
        msg_t* pos;
        msg_t* nxt;

        WLog_ERR(TAG, "write to eventfd failed: %d", errno);
        /* Unlink the stack message again, unless a consumer woken for another
         * message has already taken it; in that case it will be completed and
         * signalled like any other, so wait for it below. */
        spinlock_enter(&endpt->msg_lock, -1);
        list_for_each_entry_safe(pos, nxt, &endpt->msg_list, msg_t, entry) {
            if (pos == &e) {
                list_del(&e.entry);
                still_queued = 1;
                break;
            }
        }
        spinlock_leave(&endpt->msg_lock);
        if (still_queued) {
            CloseHandle(e.evt);
            free(e.params);
            WLog_DBG(TAG, "<== error endpt(%d) send msg: %d", endpt->epid, msg);
            return -1;
        }
    }

    WaitForSingleObject(e.evt, INFINITE);
    CloseHandle(e.evt);
    free(e.params);
    WLog_DBG(TAG, "<== endpt(%d) send msg: epid %d, evt_res: %d", endpt->epid, msg, e.evt_result);
    return e.evt_result;
}

/* Returns the endpoint's local id. */
TOOLKIT_API int bus_endpt_get_epid(bus_endpt_t* endpt)
{
    return endpt->epid;
}

/* Returns the URL the endpoint was created with (owned by the endpoint). */
TOOLKIT_API const char* bus_endpt_get_url(bus_endpt_t* endpt)
{
    return endpt->url;
}

/*
 * Waits up to `timeout` for one packet or message and dispatches it to the
 * matching callback. Packets handed to a callback are released afterwards
 * with iobuffer_dec_ref() unless the callback cleared the pointer.
 *
 * Returns 1 when something was dispatched, 0 on time out, <0 on error.
 */
TOOLKIT_API int bus_endpt_poll(bus_endpt_t* endpt, int timeout)
{
    int result = BUS_RESULT_UNKNOWN;
    int rc;
    int epid = 0, type = 0, state = 0;
    iobuffer_t* pkt = NULL;

    rc = bus_endpt_poll_internal(endpt, &result, timeout);
    if (rc > 0) {
        if (result == BUS_RESULT_DATA) {
            if (bus_endpt_recv_pkt(endpt, &epid, &type, &pkt) == 0 && endpt->callback.on_pkt)
                endpt->callback.on_pkt(endpt, epid, type, &pkt, endpt->callback.user_data);
            if (pkt)
                iobuffer_dec_ref(pkt);
        } else if (result == BUS_RESULT_INFO) {
            if (bus_endpt_recv_pkt(endpt, &epid, &type, &pkt) == 0 && endpt->callback.on_inf)
                endpt->callback.on_inf(endpt, epid, type, &pkt, endpt->callback.user_data);
            if (pkt)
                iobuffer_dec_ref(pkt);
        } else if (result == BUS_RESULT_EVENT) {
            if (bus_endpt_recv_evt(endpt, &epid, &type, &pkt) == 0 && endpt->callback.on_evt)
                endpt->callback.on_evt(endpt, epid, type, &pkt, endpt->callback.user_data);
            if (pkt)
                iobuffer_dec_ref(pkt);
        } else if (result == BUS_RESULT_SYSTEM) {
            if (bus_endpt_recv_sys(endpt, &epid, &state) == 0 && endpt->callback.on_sys)
                endpt->callback.on_sys(endpt, epid, state, endpt->callback.user_data);
        } else if (result == BUS_RESULT_MSG) {
            msg_t* msg = NULL;

            /* The list can be empty even though the eventfd fired; there is
             * nothing to dispatch then. */
            if (bus_endpt_recv_msg(endpt, &msg) == 0 && msg) {
                if (endpt->callback.on_msg) {
                    endpt->callback.on_msg(endpt, msg->type, msg->nparam, msg->params,
                        msg->evt ? &msg->evt_result : NULL, endpt->callback.user_data);
                    if (msg->evt) {
                        WLog_DBG(TAG, "after recv msg, send finished evt.");
                        SetEvent(msg->evt);
                    } else {
                        WLog_DBG(TAG, "free msg");
                        free_msg(msg);
                    }
                } else {
                    if (msg->evt) {
                        msg->evt_result = -1;
                        WLog_DBG(TAG, "after on msg failed, send finished evt.");
                        SetEvent(msg->evt);
                    } else {
                        WLog_DBG(TAG, "free msg");
                        free_msg(msg);
                    }
                }
            }
        } else {
            TOOLKIT_ASSERT(0);
            rc = -1;
        }
    }
    return rc;
}

/* Sets the endpoint's quit flag. Always returns 0. */
TOOLKIT_API int bus_endpt_set_quit_flag(bus_endpt_t* endpt)
{
    endpt->quit_flag = 1;
    return 0;
}

/* Returns the endpoint's quit flag. */
TOOLKIT_API int bus_endpt_get_quit_flag(bus_endpt_t* endpt)
{
    return endpt->quit_flag;
}