bus.c 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115
  1. #include "precompile.h"
  2. #include "bus.h"
  3. #include "sockutil.h"
  4. #include "url.h"
  5. #include "memutil.h"
  6. #include "spinlock.h"
  7. #include "list.h"
  8. #include "bus_internal.h"
  9. #define BUS_RESULT_DATA 1 // ==BUS_TYPE_PACKET, 回调callback.on_pkt, 未启用
  10. #define BUS_RESULT_INFO 2 // ==BUS_TYPE_INFO, 回调callback.on_inf
  11. #define BUS_RESULT_EVENT 3 // ==BUS_TYPE_EVENT, 回调callback.on_evt , 未启用
  12. #define BUS_RESULT_SYSTEM 4 // ==BUS_TYPE_SYSTEM, 回调callback.on_sys
  13. #define BUS_RESULT_MSG 5 // 发送包消息, 回调callback.on_msg
  14. #define BUS_RESULT_UNKNOWN 6
  15. typedef struct msg_t {
  16. struct list_head entry;
  17. int type;
  18. int nparam;
  19. int *params;
  20. HANDLE evt;
  21. int evt_result;
  22. }msg_t;
  23. struct bus_endpt_t {
  24. int type;
  25. int epid;
  26. union {
  27. HANDLE pipe_handle;
  28. SOCKET sock_handle;
  29. };
  30. char *url;
  31. bus_endpt_callback callback;
  32. struct list_head msg_list;
  33. spinlock_t msg_lock;
  34. HANDLE msg_sem;
  35. HANDLE tx_evt;
  36. HANDLE rx_evt;
  37. OVERLAPPED rx_overlapped;
  38. int rx_pending;
  39. int rx_pending_pkt_len;
  40. iobuffer_queue_t *rx_buf_queue;
  41. volatile int quit_flag;
  42. };
  43. static void free_msg(msg_t *msg)
  44. {
  45. free(msg->params);
  46. free(msg);
  47. }
  48. static int to_result(int pkt_type)
  49. {
  50. switch (pkt_type) {
  51. case BUS_TYPE_EVENT:
  52. return BUS_RESULT_EVENT;
  53. case BUS_TYPE_SYSTEM:
  54. return BUS_RESULT_SYSTEM;
  55. case BUS_TYPE_PACKET:
  56. return BUS_RESULT_DATA;
  57. case BUS_TYPE_INFO:
  58. return BUS_RESULT_INFO;
  59. default:
  60. break;
  61. }
  62. return BUS_RESULT_UNKNOWN;
  63. }
  64. static HANDLE create_pipe_handle(const char *name)
  65. {
  66. char tmp[MAX_PATH];
  67. HANDLE pipe = INVALID_HANDLE_VALUE;
  68. sprintf(tmp, "\\\\.\\pipe\\%s", name);
  69. for (;;) {
  70. pipe = CreateFileA(tmp,
  71. GENERIC_READ|GENERIC_WRITE,
  72. 0,
  73. NULL,
  74. OPEN_EXISTING,
  75. FILE_FLAG_OVERLAPPED,
  76. NULL);
  77. if (pipe == INVALID_HANDLE_VALUE) {
  78. if (GetLastError() == ERROR_PIPE_BUSY) {
  79. if (WaitNamedPipeA(name, 20000))
  80. continue;
  81. }
  82. }
  83. break;
  84. }
  85. return pipe;
  86. }
  87. static SOCKET create_socket_handle(const char *ip, int port)
  88. {
  89. SOCKET fd;
  90. struct sockaddr_in addr;
  91. fd = WSASocketA(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
  92. if (fd == INVALID_SOCKET)
  93. return fd;
  94. {
  95. BOOL f = TRUE;
  96. u_long l = TRUE;
  97. setsockopt(fd, SOL_SOCKET, SO_DONTLINGER, (char*)&f, sizeof(f));
  98. //setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&f, sizeof(f));
  99. ioctlsocket(fd, FIONBIO, &l);
  100. }
  101. memset(&addr, 0, sizeof(addr));
  102. addr.sin_family = AF_INET;
  103. addr.sin_port = htons(port);
  104. addr.sin_addr.s_addr = inet_addr(ip);
  105. if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
  106. if (WSAGetLastError() == WSAEWOULDBLOCK) {
  107. fd_set wr_set, ex_set;
  108. FD_ZERO(&wr_set);
  109. FD_ZERO(&ex_set);
  110. FD_SET(fd, &wr_set);
  111. FD_SET(fd, &ex_set);
  112. if (select(fd, NULL, &wr_set, &ex_set, NULL) > 0 && FD_ISSET(fd, &wr_set))
  113. return fd;
  114. }
  115. }
  116. if (fd != INVALID_SOCKET)
  117. closesocket(fd);
  118. //return INVALID_SOCKET;//{bug}
  119. return fd;
  120. }
  121. static int tcp_send_buf(bus_endpt_t *endpt, const char *buf, int n)
  122. {
  123. DWORD left = n;
  124. DWORD offset = 0;
  125. while (left > 0) {
  126. BOOL ret;
  127. WSABUF wsabuf;
  128. DWORD dwBytesTransfer;
  129. OVERLAPPED overlapped;
  130. memset(&overlapped, 0, sizeof(overlapped));
  131. overlapped.hEvent = endpt->tx_evt;
  132. ResetEvent(endpt->tx_evt);
  133. wsabuf.buf = (char*)buf+offset;
  134. wsabuf.len = left;
  135. ret = WSASend(endpt->sock_handle, &wsabuf, 1, &dwBytesTransfer, 0, &overlapped, NULL) == 0;
  136. if (!ret && WSAGetLastError() == WSA_IO_PENDING) {
  137. DWORD dwFlags = 0;
  138. ret = WSAGetOverlappedResult(endpt->sock_handle, &overlapped, &dwBytesTransfer, TRUE, &dwFlags);
  139. }
  140. if (ret && dwBytesTransfer) {
  141. offset += dwBytesTransfer;
  142. left -= dwBytesTransfer;
  143. } else {
  144. return -1;
  145. }
  146. }
  147. return 0;
  148. }
  149. static int pipe_send_buf(bus_endpt_t *endpt, const char *buf, int n)
  150. {
  151. DWORD left = n;
  152. DWORD offset = 0;
  153. while (left > 0) {
  154. BOOL ret;
  155. DWORD dwBytesTransfer;
  156. OVERLAPPED overlapped;
  157. memset(&overlapped, 0, sizeof(overlapped));
  158. overlapped.hEvent = endpt->tx_evt;
  159. ResetEvent(endpt->tx_evt);
  160. ret = WriteFile(endpt->pipe_handle, buf+offset, left, &dwBytesTransfer, &overlapped);
  161. if (!ret && GetLastError() == ERROR_IO_PENDING) {
  162. ret = GetOverlappedResult(endpt->pipe_handle, &overlapped, &dwBytesTransfer, TRUE);
  163. }
  164. if (ret && dwBytesTransfer) {
  165. offset += dwBytesTransfer;
  166. left -= dwBytesTransfer;
  167. } else {
  168. return -1;
  169. }
  170. }
  171. return 0;
  172. }
  173. static int send_buf(bus_endpt_t *endpt, const char *buf, int n)
  174. {
  175. if (endpt->type == TYPE_PIPE) {
  176. return pipe_send_buf(endpt, buf, n);
  177. } else if (endpt->type == TYPE_TCP) {
  178. return tcp_send_buf(endpt, buf, n);
  179. } else {
  180. return -1;
  181. }
  182. }
  183. static int send_pkt_raw(bus_endpt_t *endpt, iobuffer_t *pkt)
  184. {
  185. int pkt_len = iobuffer_get_length(pkt);
  186. int rc;
  187. iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_len, 0);
  188. rc = send_buf(endpt, iobuffer_data(pkt, 0), iobuffer_get_length(pkt));
  189. return rc;
  190. }
  191. static int pipe_recv_buf(bus_endpt_t *endpt, char *buf, DWORD n)
  192. {
  193. DWORD left = n;
  194. DWORD offset = 0;
  195. while (left > 0) {
  196. BOOL ret;
  197. DWORD dwBytesTransfer;
  198. OVERLAPPED overlapped;
  199. memset(&overlapped, 0, sizeof(overlapped));
  200. overlapped.hEvent = endpt->rx_evt;
  201. ResetEvent(overlapped.hEvent);
  202. ret = ReadFile(endpt->pipe_handle, buf+offset, left, &dwBytesTransfer, &overlapped);
  203. if (!ret && GetLastError() == ERROR_IO_PENDING) {
  204. ret = GetOverlappedResult(endpt->pipe_handle, &overlapped, &dwBytesTransfer, TRUE);
  205. }
  206. if (ret && dwBytesTransfer) {
  207. offset += dwBytesTransfer;
  208. left -= dwBytesTransfer;
  209. } else {
  210. return -1;
  211. }
  212. }
  213. return 0;
  214. }
  215. static int tcp_recv_buf(bus_endpt_t *endpt, char *buf, DWORD n)
  216. {
  217. DWORD left = n;
  218. DWORD offset = 0;
  219. while (left > 0) {
  220. BOOL ret;
  221. DWORD dwFlags = 0;
  222. WSABUF wsabuf;
  223. DWORD dwBytesTransfer;
  224. OVERLAPPED overlapped;
  225. memset(&overlapped, 0, sizeof(overlapped));
  226. overlapped.hEvent = endpt->rx_evt;
  227. wsabuf.buf = buf +offset;
  228. wsabuf.len = left;
  229. ResetEvent(overlapped.hEvent);
  230. ret = WSARecv(endpt->sock_handle, &wsabuf, 1, &dwBytesTransfer, &dwFlags, &overlapped, NULL) == 0;
  231. if (!ret && WSAGetLastError() == WSA_IO_PENDING) {
  232. ret = WSAGetOverlappedResult(endpt->sock_handle, &overlapped, &dwBytesTransfer, TRUE, &dwFlags);
  233. }
  234. if (ret && dwBytesTransfer) {
  235. offset += dwBytesTransfer;
  236. left -= dwBytesTransfer;
  237. } else {
  238. return -1;
  239. }
  240. }
  241. return 0;
  242. }
  243. static int recv_buf(bus_endpt_t *endpt, char *buf, DWORD n)
  244. {
  245. if (endpt->type == TYPE_PIPE) {
  246. return pipe_recv_buf(endpt, buf, n);
  247. } else if (endpt->type == TYPE_TCP) {
  248. return tcp_recv_buf(endpt, buf, n);
  249. } else {
  250. return -1;
  251. }
  252. }
  253. static int recv_pkt_raw(bus_endpt_t *endpt, iobuffer_t **pkt)
  254. {
  255. int pkt_len;
  256. int rc = -1;
  257. rc = recv_buf(endpt, (char*)&pkt_len, 4);
  258. if (rc != 0)
  259. return rc;
  260. *pkt = iobuffer_create(-1, pkt_len);
  261. iobuffer_push_count(*pkt, pkt_len);
  262. if (pkt_len > 0) {
  263. rc = recv_buf(endpt, iobuffer_data(*pkt, 0), pkt_len);
  264. }
  265. if (rc < 0) {
  266. iobuffer_destroy(*pkt);
  267. *pkt = NULL;
  268. }
  269. return rc;
  270. }
  271. static int start_read_pkt(bus_endpt_t *endpt, iobuffer_t **p_pkt)
  272. {
  273. DWORD dwBytesTransferred;
  274. BOOL ret;
  275. int rc = 0;
  276. iobuffer_t *pkt = NULL;
  277. *p_pkt = NULL;
  278. ResetEvent(endpt->rx_evt);
  279. memset(&endpt->rx_overlapped, 0, sizeof(OVERLAPPED));
  280. endpt->rx_overlapped.hEvent = endpt->rx_evt;
  281. if (endpt->type == TYPE_PIPE) {
  282. ret = ReadFile(endpt->pipe_handle,
  283. &endpt->rx_pending_pkt_len,
  284. 4,
  285. &dwBytesTransferred,
  286. &endpt->rx_overlapped);
  287. } else if (endpt->type == TYPE_TCP) {
  288. DWORD dwFlags = 0;
  289. WSABUF wsabuf;
  290. wsabuf.buf = (char*)&endpt->rx_pending_pkt_len;
  291. wsabuf.len = 4;
  292. ret = WSARecv(endpt->sock_handle,
  293. &wsabuf,
  294. 1,
  295. &dwBytesTransferred,
  296. &dwFlags,
  297. &endpt->rx_overlapped,
  298. NULL) == 0;
  299. } else {
  300. return -1;
  301. }
  302. if (ret) {
  303. if (dwBytesTransferred == 0)
  304. return -1;
  305. if (dwBytesTransferred < 4) {
  306. rc = recv_buf(endpt, (char*)&endpt->rx_pending_pkt_len+dwBytesTransferred, 4-dwBytesTransferred);
  307. if (rc < 0)
  308. return rc;
  309. }
  310. pkt = iobuffer_create(0, endpt->rx_pending_pkt_len);
  311. if (endpt->rx_pending_pkt_len > 0) {
  312. rc = recv_buf(endpt, iobuffer_data(pkt, 0), endpt->rx_pending_pkt_len);
  313. if (rc < 0)
  314. {
  315. iobuffer_destroy(pkt);
  316. return rc;
  317. }
  318. iobuffer_push_count(pkt, endpt->rx_pending_pkt_len);
  319. }
  320. *p_pkt = pkt;
  321. } else {
  322. if (WSAGetLastError() == WSA_IO_PENDING) {
  323. endpt->rx_pending = 1;
  324. } else {
  325. return -1;
  326. }
  327. }
  328. return 0;
  329. }
  330. static int read_left_pkt(bus_endpt_t *endpt, iobuffer_t **p_pkt)
  331. {
  332. BOOL ret;
  333. int rc;
  334. DWORD dwBytesTransferred;
  335. iobuffer_t *pkt = NULL;
  336. if (endpt->type == TYPE_PIPE) {
  337. ret = GetOverlappedResult(endpt->pipe_handle, &endpt->rx_overlapped, &dwBytesTransferred, TRUE);
  338. } else if (endpt->type == TYPE_TCP) {
  339. DWORD dwFlags = 0;
  340. ret = WSAGetOverlappedResult(endpt->sock_handle, &endpt->rx_overlapped, &dwBytesTransferred, TRUE, &dwFlags);
  341. }
  342. if (!ret || dwBytesTransferred == 0) {
  343. DWORD dwError = GetLastError();
  344. return -1;
  345. }
  346. if (dwBytesTransferred < 4) {
  347. rc = recv_buf(endpt, (char*)&endpt->rx_pending_pkt_len+dwBytesTransferred, 4-dwBytesTransferred);
  348. if (rc < 0)
  349. return rc;
  350. }
  351. pkt = iobuffer_create(-1, endpt->rx_pending_pkt_len);
  352. rc = recv_buf(endpt, iobuffer_data(pkt, 0), endpt->rx_pending_pkt_len);
  353. if (rc < 0)
  354. {
  355. iobuffer_destroy(pkt);
  356. return rc;
  357. }
  358. iobuffer_push_count(pkt, endpt->rx_pending_pkt_len);
  359. *p_pkt = pkt;
  360. endpt->rx_pending = 0;
  361. return 0;
  362. }
  363. static int append_rx_pkt(bus_endpt_t *endpt, iobuffer_t *pkt)
  364. {
  365. int type;
  366. int read_state;
  367. read_state = iobuffer_get_read_state(pkt);
  368. iobuffer_read(pkt, IOBUF_T_I4, &type, 0);
  369. iobuffer_restore_read_state(pkt, read_state);
  370. if (type == BUS_TYPE_PACKET || type == BUS_TYPE_INFO || type == BUS_TYPE_EVENT || type == BUS_TYPE_SYSTEM) {
  371. iobuffer_queue_enqueue(endpt->rx_buf_queue, pkt);
  372. return 1;
  373. } else {
  374. return -1;
  375. }
  376. }
  377. TOOLKIT_API int bus_endpt_create(const char *url, int epid, const bus_endpt_callback *callback, bus_endpt_t **p_endpt)
  378. {
  379. bus_endpt_t *endpt = NULL;
  380. char *tmp_url;
  381. url_fields uf;
  382. int rc;
  383. int v;
  384. iobuffer_t *buf = NULL;
  385. iobuffer_t *ans_buf = NULL;
  386. if (!url)
  387. return -1;
  388. tmp_url = _strdup(url);
  389. if (url_parse(tmp_url, &uf) < 0) {
  390. free(tmp_url);
  391. return -1;
  392. }
  393. endpt = ZALLOC_T(bus_endpt_t);
  394. endpt->sock_handle = -1;
  395. endpt->url = tmp_url;
  396. if (_stricmp(uf.scheme, "tcp") == 0) {
  397. endpt->type = TYPE_TCP;
  398. endpt->sock_handle = create_socket_handle(uf.host, uf.port);
  399. if (endpt->sock_handle == INVALID_SOCKET)
  400. goto on_error;
  401. } else if (_stricmp(uf.scheme, "pipe") == 0) {
  402. endpt->type = TYPE_PIPE;
  403. endpt->pipe_handle = create_pipe_handle(uf.host);
  404. if (endpt->pipe_handle == INVALID_HANDLE_VALUE)
  405. goto on_error;
  406. } else {
  407. goto on_error;
  408. }
  409. endpt->epid = epid;
  410. endpt->tx_evt = CreateEventA(NULL, TRUE, FALSE, NULL);
  411. endpt->rx_evt = CreateEventA(NULL, TRUE, FALSE, NULL);
  412. endpt->rx_buf_queue = iobuffer_queue_create();
  413. endpt->msg_sem = CreateSemaphoreA(NULL, 0, 0x7fffffff, NULL);
  414. INIT_LIST_HEAD(&endpt->msg_list);
  415. spinlock_init(&endpt->msg_lock);
  416. memcpy(&endpt->callback, callback, sizeof(bus_endpt_callback));
  417. buf = iobuffer_create(-1, -1);
  418. v = BUS_TYPE_ENDPT_REGISTER;
  419. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  420. v = endpt->epid;
  421. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  422. rc = send_pkt_raw(endpt, buf);
  423. if (rc != 0)
  424. goto on_error;
  425. rc = recv_pkt_raw(endpt, &ans_buf);
  426. if (rc != 0) {
  427. DWORD dwError = GetLastError();
  428. goto on_error;
  429. }
  430. iobuffer_read(ans_buf, IOBUF_T_I4, &v, 0);
  431. iobuffer_read(ans_buf, IOBUF_T_I4, &rc, 0);
  432. if (rc != 0)
  433. goto on_error;
  434. url_free_fields(&uf);
  435. if (buf)
  436. iobuffer_destroy(buf);
  437. if (ans_buf)
  438. iobuffer_destroy(ans_buf);
  439. *p_endpt = endpt;
  440. return 0;
  441. on_error:
  442. if (endpt->type == TYPE_TCP) {
  443. closesocket(endpt->sock_handle);
  444. } else if (endpt->type == TYPE_PIPE) {
  445. CloseHandle(endpt->pipe_handle);
  446. }
  447. if (endpt->msg_sem)
  448. CloseHandle(endpt->msg_sem);
  449. if (endpt->tx_evt)
  450. CloseHandle(endpt->tx_evt);
  451. if (endpt->rx_evt)
  452. CloseHandle(endpt->rx_evt);
  453. if (endpt->rx_buf_queue)
  454. iobuffer_queue_destroy(endpt->rx_buf_queue);
  455. if (endpt->url)
  456. free(endpt->url);
  457. free(endpt);
  458. url_free_fields(&uf);
  459. if (buf)
  460. iobuffer_destroy(buf);
  461. if (ans_buf)
  462. iobuffer_destroy(ans_buf);
  463. return -1;
  464. }
  465. TOOLKIT_API void bus_endpt_destroy(bus_endpt_t *endpt)
  466. {
  467. int rc = -1;
  468. iobuffer_t *buf = NULL;
  469. iobuffer_t *ans_buf = NULL;
  470. int v;
  471. assert(endpt);
  472. buf = iobuffer_create(-1, -1);
  473. v = BUS_TYPE_ENDPT_UNREGISTER;
  474. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  475. v = endpt->epid;
  476. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  477. rc = send_pkt_raw(endpt, buf);
  478. if (rc != 0)
  479. goto on_error;
  480. rc = recv_pkt_raw(endpt, &ans_buf);
  481. if (rc != 0)
  482. goto on_error;
  483. iobuffer_read(ans_buf, IOBUF_T_I4, &v, 0);
  484. iobuffer_read(ans_buf, IOBUF_T_I4, &rc, 0);
  485. {
  486. msg_t *msg, *t;
  487. list_for_each_entry_safe(msg, t, &endpt->msg_list, msg_t, entry) {
  488. list_del(&msg->entry);
  489. if (msg->evt)
  490. CloseHandle(msg->evt);
  491. free(msg);
  492. }
  493. }
  494. on_error:
  495. if (buf)
  496. iobuffer_destroy(buf);
  497. if (ans_buf)
  498. iobuffer_destroy(ans_buf);
  499. if (endpt->type == TYPE_TCP) {
  500. closesocket(endpt->sock_handle);
  501. } else if (endpt->type == TYPE_PIPE) {
  502. CloseHandle(endpt->pipe_handle);
  503. }
  504. if (endpt->msg_sem)
  505. CloseHandle(endpt->msg_sem);
  506. if (endpt->tx_evt)
  507. CloseHandle(endpt->tx_evt);
  508. if (endpt->rx_evt)
  509. CloseHandle(endpt->rx_evt);
  510. if (endpt->rx_buf_queue)
  511. iobuffer_queue_destroy(endpt->rx_buf_queue);
  512. if (endpt->url)
  513. free(endpt->url);
  514. free(endpt);
  515. }
  516. // 1 : recv ok
  517. // 0 : time out
  518. // <0 : error
  519. static int bus_endpt_poll_internal(bus_endpt_t *endpt, int *result, int timeout)
  520. {
  521. iobuffer_t *pkt = NULL;
  522. int rc;
  523. BOOL ret;
  524. assert(endpt);
  525. // peek first packge type
  526. if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) {
  527. iobuffer_t *pkt = iobuffer_queue_head(endpt->rx_buf_queue);
  528. int pkt_type;
  529. int read_state = iobuffer_get_read_state(pkt);
  530. iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, NULL);
  531. iobuffer_restore_read_state(pkt, read_state);
  532. *result = to_result(pkt_type);
  533. if (*result == BUS_RESULT_UNKNOWN) {
  534. OutputDebugStringA("bug: unknown pkt type!\n");
  535. return -1;
  536. }
  537. return 1;
  538. }
  539. // no received package, try to receive one
  540. if (!endpt->rx_pending) {
  541. rc = start_read_pkt(endpt, &pkt);
  542. if (rc < 0)
  543. return rc;
  544. if (pkt) {
  545. OutputDebugStringA("pkt has read\n");
  546. rc = append_rx_pkt(endpt, pkt);
  547. if (rc < 0) {
  548. iobuffer_destroy(pkt);
  549. return -1;
  550. }
  551. } else {
  552. OutputDebugStringA("pending\n");
  553. }
  554. }
  555. // if receive is pending, wait for send or receive complete event
  556. if (!pkt) {
  557. HANDLE hs[] = {endpt->msg_sem, endpt->rx_evt };
  558. ret = WaitForMultipleObjects(array_size(hs), &hs[0], FALSE, (DWORD)timeout);
  559. if (ret == WAIT_TIMEOUT) {
  560. return 0;
  561. } else if (ret == WAIT_OBJECT_0) {
  562. *result = BUS_RESULT_MSG; // indicate send package event
  563. return 1;
  564. } else if (ret == WAIT_OBJECT_0 + 1) {
  565. rc = read_left_pkt(endpt, &pkt);
  566. if (rc <0)
  567. return rc;
  568. if (pkt) {
  569. rc = append_rx_pkt(endpt, pkt);
  570. if (rc < 0) {
  571. iobuffer_destroy(pkt);
  572. return -1;
  573. }
  574. }
  575. }
  576. } else {
  577. OutputDebugStringA("pkt has readed\n");
  578. }
  579. if (pkt) {
  580. int type;
  581. int read_state = iobuffer_get_read_state(pkt);
  582. iobuffer_read(pkt, IOBUF_T_I4, &type, 0);
  583. iobuffer_restore_read_state(pkt, read_state);
  584. *result = to_result(type);
  585. if (*result == BUS_RESULT_UNKNOWN) {
  586. OutputDebugStringA("bug: unknown pkt type!\n");
  587. return -1;
  588. }
  589. } else {
  590. return -1;
  591. }
  592. return 1;
  593. }
  594. static int recv_until(bus_endpt_t *endpt, int type, iobuffer_t **p_ansbuf)
  595. {
  596. int rc;
  597. iobuffer_t *ans_pkt = NULL;
  598. int ans_type;
  599. for (;;) {
  600. if (!endpt->rx_pending) {
  601. rc = start_read_pkt(endpt, &ans_pkt);
  602. if (rc < 0) {
  603. DWORD dwError = WSAGetLastError();
  604. break;
  605. }
  606. }
  607. if (!ans_pkt) {
  608. DWORD ret = WaitForSingleObject(endpt->rx_evt, INFINITE);
  609. if (ret != WAIT_OBJECT_0)
  610. return -1;
  611. rc = read_left_pkt(endpt, &ans_pkt);
  612. if (rc < 0)
  613. break;
  614. }
  615. if (ans_pkt) {
  616. int read_state = iobuffer_get_read_state(ans_pkt);
  617. iobuffer_read(ans_pkt, IOBUF_T_I4, &ans_type, 0);
  618. iobuffer_restore_read_state(ans_pkt, read_state);
  619. if (ans_type == type) {
  620. *p_ansbuf = ans_pkt;
  621. break;
  622. } else {
  623. rc = append_rx_pkt(endpt, ans_pkt);
  624. if (rc < 0) {
  625. iobuffer_destroy(ans_pkt);
  626. break;
  627. } else {
  628. ans_pkt = NULL;
  629. }
  630. }
  631. }
  632. }
  633. return rc;
  634. }
  635. static int recv_until_result(bus_endpt_t *endpt, int *p_result)
  636. {
  637. int rc;
  638. iobuffer_t *ans_pkt = NULL;
  639. int type, error;
  640. rc = recv_until(endpt, BUS_TYPE_ERROR, &ans_pkt);
  641. if (rc < 0)
  642. return rc;
  643. iobuffer_read(ans_pkt, IOBUF_T_I4, &type, 0);
  644. iobuffer_read(ans_pkt, IOBUF_T_I4, &error, 0);
  645. iobuffer_destroy(ans_pkt);
  646. *p_result = error;
  647. return rc;
  648. }
  649. static int recv_until_state(bus_endpt_t *endpt, int *p_state)
  650. {
  651. int rc;
  652. iobuffer_t *ans_pkt = NULL;
  653. int type, epid, state;
  654. rc = recv_until(endpt, BUS_TYPE_ENDPT_GET_STATE, &ans_pkt);
  655. if (rc < 0)
  656. return rc;
  657. iobuffer_read(ans_pkt, IOBUF_T_I4, &type, 0);
  658. iobuffer_read(ans_pkt, IOBUF_T_I4, &epid, 0);
  659. iobuffer_read(ans_pkt, IOBUF_T_I4, &state, 0);
  660. iobuffer_destroy(ans_pkt);
  661. *p_state = state;
  662. return rc;
  663. }
  664. TOOLKIT_API int bus_endpt_send_pkt(bus_endpt_t *endpt, int epid, int type, iobuffer_t *pkt)
  665. {
  666. int t;
  667. int rc;
  668. int read_state;
  669. int write_state;
  670. int error;
  671. assert(endpt);
  672. read_state = iobuffer_get_read_state(pkt);
  673. write_state = iobuffer_get_write_state(pkt);
  674. t = epid; // remote epid
  675. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  676. t = endpt->epid; // local epid
  677. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  678. t = type; // user type
  679. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  680. t = BUS_TYPE_PACKET; // type
  681. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  682. rc = send_pkt_raw(endpt, pkt);
  683. iobuffer_restore_read_state(pkt, read_state);
  684. iobuffer_restore_write_state(pkt, write_state);
  685. if (rc < 0)
  686. return rc;
  687. rc = recv_until_result(endpt, &error);
  688. if (rc == 0 && error != 0)
  689. rc = error;
  690. return rc;
  691. }
  692. TOOLKIT_API int bus_endpt_send_info(bus_endpt_t *endpt, int epid, int type, iobuffer_t *pkt)
  693. {
  694. int t;
  695. int rc;
  696. int read_state;
  697. int write_state;
  698. assert(endpt);
  699. read_state = iobuffer_get_read_state(pkt);
  700. write_state = iobuffer_get_write_state(pkt);
  701. t = epid; // remote epid
  702. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  703. t = endpt->epid; // local epid
  704. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  705. t = type; // user type
  706. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  707. t = BUS_TYPE_INFO; // type
  708. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  709. rc = send_pkt_raw(endpt, pkt);
  710. iobuffer_restore_read_state(pkt, read_state);
  711. iobuffer_restore_write_state(pkt, write_state);
  712. return rc;
  713. }
  714. TOOLKIT_API int bus_endpt_bcast_evt(bus_endpt_t *endpt, int type, iobuffer_t *pkt)
  715. {
  716. int t;
  717. int rc;
  718. int read_state;
  719. int write_state;
  720. assert(endpt);
  721. read_state = iobuffer_get_read_state(pkt);
  722. write_state = iobuffer_get_write_state(pkt);
  723. t = endpt->epid;
  724. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  725. t = type;
  726. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  727. t = BUS_TYPE_EVENT;
  728. iobuffer_write_head(pkt, IOBUF_T_I4, &t, 0);
  729. rc = send_pkt_raw(endpt, pkt);
  730. iobuffer_restore_read_state(pkt, read_state);
  731. iobuffer_restore_write_state(pkt, write_state);
  732. return rc;
  733. }
  734. static int bus_endpt_recv_pkt(bus_endpt_t *endpt, int *p_epid, int *p_type, iobuffer_t **p_pkt)
  735. {
  736. if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) {
  737. iobuffer_t *pkt = iobuffer_queue_head(endpt->rx_buf_queue);
  738. int read_state = iobuffer_get_read_state(pkt);
  739. int pkt_type, usr_type, from_epid, to_epid;
  740. iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
  741. if (pkt_type == BUS_TYPE_PACKET || pkt_type == BUS_TYPE_INFO) {
  742. iobuffer_read(pkt, IOBUF_T_I4, &usr_type, 0);
  743. iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0);
  744. iobuffer_read(pkt, IOBUF_T_I4, &to_epid, 0);
  745. if (p_epid)
  746. *p_epid = from_epid;
  747. if (p_type)
  748. *p_type = usr_type;
  749. iobuffer_queue_deque(endpt->rx_buf_queue);
  750. if (p_pkt) {
  751. *p_pkt = pkt;
  752. } else {
  753. iobuffer_destroy(pkt);
  754. }
  755. return 0;
  756. } else {
  757. iobuffer_restore_read_state(pkt, read_state);
  758. }
  759. }
  760. return -1;
  761. }
  762. static int bus_endpt_recv_evt(bus_endpt_t *endpt, int *p_epid, int *p_type, iobuffer_t **p_pkt)
  763. {
  764. if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) {
  765. iobuffer_t *pkt = iobuffer_queue_head(endpt->rx_buf_queue);
  766. int read_state = iobuffer_get_read_state(pkt);
  767. int pkt_type, usr_type, from_epid;
  768. iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
  769. if (pkt_type == BUS_TYPE_EVENT) {
  770. iobuffer_read(pkt, IOBUF_T_I4, &usr_type, 0);
  771. iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0);
  772. if (p_epid)
  773. *p_epid = from_epid;
  774. if (p_type)
  775. *p_type = usr_type;
  776. iobuffer_queue_deque(endpt->rx_buf_queue);
  777. if (p_pkt) {
  778. *p_pkt = pkt;
  779. } else {
  780. iobuffer_destroy(pkt);
  781. }
  782. return 0;
  783. } else {
  784. iobuffer_restore_read_state(pkt, read_state);
  785. }
  786. }
  787. return -1;
  788. }
  789. static int bus_endpt_recv_sys(bus_endpt_t *endpt, int *p_epid, int *p_state)
  790. {
  791. if (iobuffer_queue_count(endpt->rx_buf_queue) > 0) {
  792. iobuffer_t *pkt = iobuffer_queue_head(endpt->rx_buf_queue);
  793. int read_state = iobuffer_get_read_state(pkt);
  794. int pkt_type, epid, state;
  795. iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
  796. if (pkt_type == BUS_TYPE_SYSTEM) {
  797. iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
  798. iobuffer_read(pkt, IOBUF_T_I4, &state, 0);
  799. if (p_epid)
  800. *p_epid = epid;
  801. if (p_state)
  802. *p_state = state;
  803. iobuffer_queue_deque(endpt->rx_buf_queue);
  804. iobuffer_destroy(pkt);
  805. return 0;
  806. } else {
  807. iobuffer_restore_read_state(pkt, read_state);
  808. }
  809. }
  810. return -1;
  811. }
  812. static int bus_endpt_recv_msg(bus_endpt_t *endpt, msg_t **p_msg)
  813. {
  814. int rc = -1;
  815. assert(endpt);
  816. assert(p_msg);
  817. spinlock_enter(&endpt->msg_lock, -1);
  818. if (!list_empty(&endpt->msg_list)) {
  819. msg_t *e = list_first_entry(&endpt->msg_list, msg_t, entry);
  820. list_del(&e->entry);
  821. rc = 0;
  822. *p_msg = e;
  823. }
  824. spinlock_leave(&endpt->msg_lock);
  825. return rc;
  826. }
  827. TOOLKIT_API int bus_endpt_get_state(bus_endpt_t *endpt, int epid, int *p_state)
  828. {
  829. iobuffer_t *buf = NULL;
  830. int v;
  831. int rc = -1;
  832. assert(endpt);
  833. buf = iobuffer_create(-1, -1);
  834. v = BUS_TYPE_ENDPT_GET_STATE;
  835. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  836. v = epid;
  837. iobuffer_write(buf, IOBUF_T_I4, &v, 0);
  838. rc = send_pkt_raw(endpt, buf);
  839. if (rc < 0)
  840. goto on_error;
  841. rc = recv_until_state(endpt, p_state);
  842. on_error:
  843. if(buf)
  844. iobuffer_destroy(buf);
  845. return rc;
  846. }
  847. TOOLKIT_API int bus_endpt_post_msg(bus_endpt_t *endpt, int msg, int nparam, int params[])
  848. {
  849. msg_t *e;
  850. assert(endpt);
  851. e = MALLOC_T(msg_t);
  852. e->type = msg;
  853. e->nparam = nparam;
  854. if (nparam) {
  855. e->params = (int*)malloc(sizeof(int)*nparam);
  856. memcpy(e->params, params, sizeof(int)*nparam);
  857. } else {
  858. e->params = NULL;
  859. }
  860. e->evt = NULL;
  861. spinlock_enter(&endpt->msg_lock, -1);
  862. list_add_tail(&e->entry, &endpt->msg_list);
  863. spinlock_leave(&endpt->msg_lock);
  864. ReleaseSemaphore(endpt->msg_sem, 1, NULL);
  865. return 0;
  866. }
  867. TOOLKIT_API int bus_endpt_send_msg(bus_endpt_t *endpt, int msg, int nparam, int params[])
  868. {
  869. msg_t e;
  870. assert(endpt);
  871. e.type = msg;
  872. e.nparam = nparam;
  873. if (nparam) {
  874. e.params = (int*)malloc(sizeof(int)*nparam);
  875. memcpy(e.params, params, sizeof(int)*nparam);
  876. } else {
  877. e.params = NULL;
  878. }
  879. e.evt_result = 0;
  880. e.evt = CreateEventA(NULL, TRUE, FALSE, NULL);
  881. spinlock_enter(&endpt->msg_lock, -1);
  882. list_add_tail(&e.entry, &endpt->msg_list);
  883. spinlock_leave(&endpt->msg_lock);
  884. ReleaseSemaphore(endpt->msg_sem, 1, NULL);
  885. WaitForSingleObject(e.evt, INFINITE);
  886. CloseHandle(e.evt);
  887. if (nparam) {
  888. free(e.params);
  889. }
  890. return e.evt_result;
  891. }
  892. TOOLKIT_API int bus_endpt_get_epid(bus_endpt_t *endpt)
  893. {
  894. return endpt->epid;
  895. }
  896. TOOLKIT_API const char *bus_endpt_get_url(bus_endpt_t *endpt)
  897. {
  898. return endpt->url;
  899. }
  900. TOOLKIT_API int bus_endpt_poll(bus_endpt_t *endpt, int timeout)
  901. {
  902. int result;
  903. int rc;
  904. int epid, type, state;
  905. iobuffer_t *pkt = NULL;
  906. rc = bus_endpt_poll_internal(endpt, &result, timeout);
  907. if (rc > 0) {
  908. if (result == BUS_RESULT_DATA) {
  909. bus_endpt_recv_pkt(endpt, &epid, &type, &pkt);
  910. if (endpt->callback.on_pkt)
  911. endpt->callback.on_pkt(endpt, epid, type, &pkt, endpt->callback.user_data);
  912. if (pkt)
  913. iobuffer_dec_ref(pkt);
  914. } else if (result == BUS_RESULT_INFO) {
  915. bus_endpt_recv_pkt(endpt, &epid, &type, &pkt);
  916. if (endpt->callback.on_inf)
  917. endpt->callback.on_inf(endpt, epid, type, &pkt, endpt->callback.user_data);
  918. if (pkt)
  919. iobuffer_dec_ref(pkt);
  920. } else if (result == BUS_RESULT_EVENT) {
  921. bus_endpt_recv_evt(endpt, &epid, &type, &pkt);
  922. if (endpt->callback.on_evt)
  923. endpt->callback.on_evt(endpt, epid, type, &pkt, endpt->callback.user_data);
  924. if (pkt)
  925. iobuffer_dec_ref(pkt);
  926. } else if (result == BUS_RESULT_SYSTEM) {
  927. bus_endpt_recv_sys(endpt, &epid, &state);
  928. if (endpt->callback.on_sys)
  929. endpt->callback.on_sys(endpt, epid, state, endpt->callback.user_data);
  930. } else if (result == BUS_RESULT_MSG) {
  931. msg_t *msg = NULL;
  932. bus_endpt_recv_msg(endpt, &msg);
  933. if (endpt->callback.on_msg) {
  934. endpt->callback.on_msg(endpt,
  935. msg->type,
  936. msg->nparam,
  937. msg->params,
  938. msg->evt ? &msg->evt_result : NULL,
  939. endpt->callback.user_data);
  940. if (msg->evt) {
  941. SetEvent(msg->evt);
  942. } else {
  943. free_msg(msg);
  944. }
  945. } else {
  946. if (msg->evt) {
  947. msg->evt_result = -1;
  948. SetEvent(msg->evt);
  949. } else {
  950. free_msg(msg);
  951. }
  952. }
  953. } else {
  954. assert(0);
  955. rc = -1;
  956. }
  957. }
  958. return rc;
  959. }
  960. TOOLKIT_API int bus_endpt_set_quit_flag(bus_endpt_t *endpt)
  961. {
  962. endpt->quit_flag = 1;
  963. return 0;
  964. }
  965. TOOLKIT_API int bus_endpt_get_quit_flag(bus_endpt_t *endpt)
  966. {
  967. return endpt->quit_flag;
  968. }