bus_daemon-unix.c 26 KB


  1. #include "precompile.h"
  2. #include "ioqueue.h"
  3. #include "bus.h"
  4. #include "bus_internal.h"
  5. #include "url.h"
  6. #include "refcnt.h"
  7. #include "list.h"
  8. #include "iobuffer.h"
  9. #include "array.h"
  10. #include <time.h>
  11. #include <winpr/synch.h>
  12. #include <winpr/interlocked.h>
  13. #include <winpr/string.h>
  14. #include <winpr/sysinfo.h>
  15. #include <winpr/thread.h>
  16. #include <winpr/winsock.h>
  /* Log tag handed to the WLog_* macros used throughout this file. */
  #define TAG TOOLKIT_TAG("bus_deamon")
  /* Upper bound used when the worker-thread count is derived from the CPU count. */
  #define MAX_THREADS 32
  /* Accept operations armed per acceptor (the original note mentions 5 as an alternative). */
  #define DEFAULT_ACCEPT_OP_COUNT 1
  /* ioqueue message id: take a failed session off its list and drop the list's reference. */
  #define MSG_REMOVE_REGISTAR 1

  /* Forward declarations; the full definitions follow. */
  typedef struct endpt_session_t endpt_session_t;
  typedef struct daemon_accetpor_t daemon_accetpor_t;
  23. struct endpt_session_t
  24. {
  25. DECLARE_REF_COUNT_MEMBER(ref_cnt);
  26. int epid;
  27. int registered;
  28. union {
  29. ioqueue_tcpsock_t tcp;
  30. ioqueue_file_t pipe;
  31. };
  32. CRITICAL_SECTION lock;
  33. time_t start_time;
  34. LONG err;
  35. int rx_pending_pkt_len;
  36. int type;
  37. int tx_pending;
  38. struct list_head entry;
  39. iobuffer_queue_t *tx_iobuf_queue;
  40. ioqueue_overlapped_t rx_overlapped;
  41. ioqueue_overlapped_t tx_overlapped;
  42. iobuffer_t *rx_pending_buf;
  43. iobuffer_t *tx_pending_buf;
  44. bus_daemon_t *daemon;
  45. daemon_accetpor_t *acceptor;
  46. };
  47. struct daemon_accetpor_t
  48. {
  49. DECLARE_REF_COUNT_MEMBER(ref_cnt);
  50. union {
  51. ioqueue_acceptor_t tcp_acceptor;
  52. ioqueue_pipe_acceptor_t pipe_acceptor;
  53. };
  54. int type;
  55. array_header_t *arr_ov;
  56. bus_daemon_t *daemon;
  57. };
  58. struct bus_daemon_t
  59. {
  60. ioqueue_t *ioq;
  61. volatile LONG lstop;
  62. CRITICAL_SECTION lock;
  63. int nthread;
  64. array_header_t *arr_uri;
  65. array_header_t *arr_acceptor;
  66. array_header_t *arr_thread;
  67. struct list_head registered_session_list;
  68. struct list_head unregistered_session_list;
  69. };
  70. DECLARE_REF_COUNT_STATIC(endpt_session, endpt_session_t)
  71. DECLARE_REF_COUNT_STATIC(daemon_accetpor, daemon_accetpor_t)
  72. static unsigned int __stdcall thread_proc(void *param)
  73. {
  74. bus_daemon_t *daemon = (bus_daemon_t*)param;
  75. SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
  76. while (!daemon->lstop) {
  77. ioqueue_poll(daemon->ioq, 10);
  78. }
  79. return 0;
  80. }
  81. static int start_accept(daemon_accetpor_t *acceptor, int idx);
  82. static int queue_ans_pkt(endpt_session_t *session, int rc);
  83. static int start_send_pkt(endpt_session_t *session, iobuffer_t **pkt);
  84. static int session_start_recv_hdr(endpt_session_t *session);
  85. static int queue_sys_pkt(endpt_session_t *session, int epid, int state);
  86. static void daemon_lock(bus_daemon_t *daemon)
  87. {
  88. EnterCriticalSection(&daemon->lock);
  89. }
  90. static void daemon_unlock(bus_daemon_t *daemon)
  91. {
  92. LeaveCriticalSection(&daemon->lock);
  93. }
  94. static void session_lock(endpt_session_t *session)
  95. {
  96. EnterCriticalSection(&session->lock);
  97. }
  98. static void session_unlock(endpt_session_t *session)
  99. {
  100. LeaveCriticalSection(&session->lock);
  101. }
  102. static void add_unregistered_list(endpt_session_t *session)
  103. {
  104. WLog_DBG(TAG, "add session(%d) to unregistered_list", session->epid);
  105. bus_daemon_t *daemon = session->daemon;
  106. daemon_lock(daemon);
  107. list_add_tail(&session->entry, &daemon->unregistered_session_list);
  108. daemon_unlock(daemon);
  109. }
  110. static void remove_session_list(endpt_session_t *session)
  111. {
  112. WLog_DBG(TAG, "remove session(%d) from register list", session->epid);
  113. bus_daemon_t *daemon = session->daemon;
  114. daemon_lock(daemon);
  115. list_del(&session->entry);
  116. if (session->epid != BUS_INVALID_EPID) {
  117. endpt_session_t *pos, *n;
  118. list_for_each_entry_safe(pos, n, &daemon->registered_session_list, endpt_session_t, entry) {
  119. queue_sys_pkt(pos, session->epid, BUS_STATE_OFF);
  120. }
  121. }
  122. daemon_unlock(daemon);
  123. }
  124. static void move_to_registered_session(endpt_session_t *session)
  125. {
  126. WLog_DBG(TAG, "session(%d) move to registered list", session->epid);
  127. bus_daemon_t *daemon = session->daemon;
  128. daemon_lock(daemon);
  129. {
  130. endpt_session_t *pos, *n;
  131. list_for_each_entry_safe(pos, n, &daemon->registered_session_list, endpt_session_t, entry) {
  132. /*tell other session, a new member joins us ???*/
  133. queue_sys_pkt(pos, session->epid, BUS_STATE_ON);
  134. }
  135. }
  136. list_move_tail(&session->entry, &daemon->registered_session_list);
  137. daemon_unlock(daemon);
  138. }
  139. static void move_to_unregistered_session(endpt_session_t *session)
  140. {
  141. bus_daemon_t *daemon = session->daemon;
  142. daemon_lock(daemon);
  143. list_move_tail(&session->entry, &daemon->unregistered_session_list);
  144. daemon_unlock(daemon);
  145. }
  146. static endpt_session_t* create_session(daemon_accetpor_t *acceptor, int type, int fd)
  147. {
  148. int rc;
  149. endpt_session_t *session;
  150. session = ZALLOC_T(endpt_session_t);
  151. if (type == TYPE_PIPE) {
  152. rc = ioqueue_pipe_acceptor_create_client(&acceptor->pipe_acceptor, (HANDLE)fd, &session->pipe);
  153. if (rc < 0)
  154. goto on_error;
  155. } else if (type == TYPE_TCP) {
  156. rc = ioqueue_acceptor_create_client(&acceptor->tcp_acceptor, fd, &session->tcp);
  157. if (rc < 0)
  158. goto on_error;
  159. }
  160. session->epid = BUS_INVALID_EPID;
  161. session->start_time = time(NULL);
  162. session->daemon = acceptor->daemon;
  163. session->acceptor = acceptor;
  164. session->type = type;
  165. REF_COUNT_INIT(&session->ref_cnt);
  166. InitializeCriticalSection(&session->lock);
  167. session->tx_iobuf_queue = iobuffer_queue_create();
  168. return session;
  169. on_error:
  170. free(session);
  171. return NULL;
  172. }
  173. static void destroy_session(endpt_session_t *session)
  174. {
  175. WLog_DBG(TAG, "enter {%s}", __FUNCTION__);
  176. while (iobuffer_queue_count(session->tx_iobuf_queue) > 0) {
  177. iobuffer_t *iobuf = iobuffer_queue_deque(session->tx_iobuf_queue);
  178. endpt_session_t *ts = iobuffer_get_user_data(iobuf);
  179. if (ts) {
  180. queue_ans_pkt(ts, BUS_E_NETBROKEN);
  181. endpt_session_dec_ref(ts);
  182. }
  183. iobuffer_destroy(iobuf);
  184. }
  185. iobuffer_queue_destroy(session->tx_iobuf_queue);
  186. DeleteCriticalSection(&session->lock);
  187. if (session->type == TYPE_PIPE) {
  188. ioqueue_file_destroy(&session->pipe);
  189. } else if (session->type == TYPE_TCP) {
  190. ioqueue_tcpsock_destroy(&session->tcp);
  191. } else {
  192. assert(0);
  193. }
  194. free(session);
  195. }
  196. IMPLEMENT_REF_COUNT_MT_STATIC(endpt_session, endpt_session_t, ref_cnt, destroy_session)
  197. static endpt_session_t *find_session(bus_daemon_t *daemon, int epid)
  198. {
  199. endpt_session_t *session = NULL, *pos;
  200. daemon_lock(daemon);
  201. list_for_each_entry(pos, &daemon->registered_session_list, endpt_session_t, entry) {
  202. if (pos->epid == epid) {
  203. endpt_session_inc_ref(pos);
  204. session = pos;
  205. break;
  206. }
  207. }
  208. daemon_unlock(daemon);
  209. #ifndef NDEBUG
  210. if (!session)
  211. WLog_ERR(TAG, "find session(%d) failed.", epid);
  212. else
  213. WLog_DBG(TAG, "find session(%d) here it is.", epid);
  214. #endif // !NDEBUG
  215. return session;
  216. }
  217. static void on_send_pkt(endpt_session_t *session, int err)
  218. {
  219. iobuffer_t *pkt;
  220. if(err != 0)
  221. WLog_DBG(TAG, "enter {%s} err: %d", __FUNCTION__, err);
  222. session_lock(session);
  223. pkt = session->tx_pending_buf;
  224. session->tx_pending_buf = NULL;
  225. session->tx_pending = 0;
  226. session_unlock(session);
  227. if (pkt) {
  228. if (iobuffer_get_user_data(pkt)) {
  229. endpt_session_t *ts = iobuffer_get_user_data(pkt);
  230. queue_ans_pkt(ts, err ? BUS_E_NETBROKEN : BUS_E_OK);
  231. endpt_session_dec_ref(ts);
  232. }
  233. iobuffer_destroy(pkt);
  234. pkt = NULL;
  235. }
  236. if (!err) {
  237. int rc = 0;
  238. session_lock(session);
  239. if (!session->tx_pending) {
  240. if (iobuffer_queue_count(session->tx_iobuf_queue)) {
  241. iobuffer_t *tpkt = iobuffer_queue_deque(session->tx_iobuf_queue);
  242. session->tx_pending = 1;
  243. rc = start_send_pkt(session, &tpkt);
  244. if (rc < 0) {
  245. session->tx_pending = 0;
  246. }
  247. if (tpkt) {
  248. pkt = tpkt;
  249. }
  250. }
  251. }
  252. session_unlock(session);
  253. if (rc < 0) {
  254. if (InterlockedCompareExchange((LONG*)&session->err, rc, 0) == 0) {
  255. remove_session_list(session);
  256. endpt_session_dec_ref(session);
  257. }
  258. }
  259. if (pkt) {
  260. if (iobuffer_get_user_data(pkt)) {
  261. endpt_session_t *ts = iobuffer_get_user_data(pkt);
  262. queue_ans_pkt(ts, err ? BUS_E_NETBROKEN : BUS_E_OK);
  263. endpt_session_dec_ref(ts);
  264. }
  265. iobuffer_destroy(pkt);
  266. pkt = NULL;
  267. }
  268. } else {
  269. session_lock(session);
  270. while (iobuffer_queue_count(session->tx_iobuf_queue) > 0) {
  271. iobuffer_t *iobuf = iobuffer_queue_deque(session->tx_iobuf_queue);
  272. endpt_session_t *ts = iobuffer_get_user_data(iobuf);
  273. if (ts) {
  274. session_unlock(session);
  275. queue_ans_pkt(ts, BUS_E_NETBROKEN);
  276. session_lock(session);
  277. endpt_session_dec_ref(ts);
  278. }
  279. iobuffer_destroy(iobuf);
  280. }
  281. session_unlock(session);
  282. }
  283. endpt_session_dec_ref(session);
  284. }
  285. static void on_pipe_send_pkt(ioqueue_file_t* file,
  286. ioqueue_overlapped_t *overlapped,
  287. void *buf,
  288. unsigned int transfer_bytes,
  289. void *user_data,
  290. int err)
  291. {
  292. on_send_pkt(user_data, err);
  293. }
  294. static void on_tcp_send_pkt(ioqueue_tcpsock_t* tcpsock,
  295. ioqueue_overlapped_t *overlapped,
  296. void *buf,
  297. unsigned int transfer_bytes,
  298. void *user_data,
  299. int err)
  300. {
  301. on_send_pkt(user_data, err);
  302. }
  303. static int start_send_pkt(endpt_session_t *session, iobuffer_t **pkt)
  304. {
  305. int rc = -1;
  306. int v;
  307. WLog_DBG(TAG, "==>session(%d) start send pkt(err=%d)", session->epid, session->err);
  308. if (session->err < 0)
  309. return rc;
  310. v = iobuffer_get_length(*pkt);
  311. iobuffer_write_head(*pkt, IOBUF_T_I4, &v, 0);
  312. endpt_session_inc_ref(session);
  313. session->tx_pending_buf = *pkt;
  314. *pkt = NULL;
  315. if (session->type == TYPE_PIPE) {
  316. rc = ioqueue_file_async_writen(&session->pipe,
  317. &session->tx_overlapped,
  318. iobuffer_data(session->tx_pending_buf, 0),
  319. iobuffer_get_length(session->tx_pending_buf),
  320. &on_pipe_send_pkt, session);
  321. } else if (session->type == TYPE_TCP) {
  322. rc = ioqueue_tcpsock_async_sendn(&session->tcp,
  323. &session->tx_overlapped,
  324. iobuffer_data(session->tx_pending_buf, 0),
  325. iobuffer_get_length(session->tx_pending_buf),
  326. &on_tcp_send_pkt, session);
  327. } else {
  328. assert(0);
  329. }
  330. if (rc < 0) {
  331. *pkt = session->tx_pending_buf;
  332. endpt_session_dec_ref(session);
  333. WLog_ERR(TAG, "session(%d) start_send_pkt failed: %d", session->epid, rc);
  334. }
  335. return rc;
  336. }
  337. static int queue_tx_pkt(endpt_session_t *session, iobuffer_t **pkt)
  338. {
  339. int rc = -1;
  340. session_lock(session);
  341. WLog_DBG(TAG, "==>session(%d) queue_tx_pkt, pending: %d", session->epid, session->tx_pending);
  342. if (session->tx_pending) {
  343. iobuffer_queue_enqueue(session->tx_iobuf_queue, *pkt);
  344. *pkt = NULL;
  345. rc = 0;
  346. } else {
  347. session->tx_pending = 1;
  348. rc = start_send_pkt(session, pkt);
  349. }
  350. session_unlock(session);
  351. if (rc < 0) {
  352. if (InterlockedCompareExchange((LONG*)&session->err, rc, 0) == 0) {
  353. ioqueue_post_message(session->daemon->ioq, MSG_REMOVE_REGISTAR, (int)session, 0);
  354. }
  355. }
  356. return rc;
  357. }
  358. static iobuffer_t *make_ans_pkt(int rc)
  359. {
  360. int v;
  361. iobuffer_t *pkt = iobuffer_create(-1, -1);
  362. v = BUS_TYPE_ERROR;
  363. iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
  364. v = rc;
  365. iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
  366. return pkt;
  367. }
  368. static int queue_ans_pkt(endpt_session_t *session, int rc)
  369. {
  370. WLog_DBG(TAG, "session(%d) queue ans pkt rc=%d", session->epid, rc);
  371. iobuffer_t *pkt = make_ans_pkt(rc);
  372. int err = queue_tx_pkt(session, &pkt);
  373. if (pkt) {
  374. iobuffer_destroy(pkt);
  375. }
  376. return err;
  377. }
  378. static iobuffer_t *make_ans_state_pkt(bus_daemon_t *daemon, int epid)
  379. {
  380. int v;
  381. endpt_session_t *ts;
  382. iobuffer_t *pkt;
  383. int state;
  384. ts = find_session(daemon, epid);
  385. if (!ts) {
  386. state = BUS_STATE_OFF;
  387. } else {
  388. state = BUS_STATE_ON;
  389. endpt_session_dec_ref(ts);
  390. ts = NULL;
  391. }
  392. pkt = iobuffer_create(-1, -1);
  393. v = BUS_TYPE_ENDPT_GET_STATE;
  394. iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
  395. v = epid;
  396. iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
  397. v = state;
  398. iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
  399. return pkt;
  400. }
  401. static iobuffer_t *make_sys_pkt(int epid, int state)
  402. {
  403. int v;
  404. iobuffer_t *pkt = iobuffer_create(-1, -1);
  405. v = BUS_TYPE_SYSTEM;
  406. iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
  407. v = epid;
  408. iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
  409. v = state;
  410. iobuffer_write(pkt, IOBUF_T_I4, &v, 0);
  411. return pkt;
  412. }
  413. static int queue_sys_pkt(endpt_session_t *session, int epid, int state)
  414. {
  415. iobuffer_t *pkt = make_sys_pkt(epid, state);
  416. int err = queue_tx_pkt(session, &pkt);
  417. if (pkt) {
  418. iobuffer_destroy(pkt);
  419. }
  420. return err;
  421. }
  422. static int on_process_pkt(endpt_session_t *session, iobuffer_t **ppkt)
  423. {
  424. bus_daemon_t *daemon = session->daemon;
  425. iobuffer_t *pkt = *ppkt;
  426. int type;
  427. int read_state;
  428. int rc = -1;
  429. read_state = iobuffer_get_read_state(pkt);
  430. iobuffer_read(pkt, IOBUF_T_I4, &type, 0);
  431. WLog_DBG(TAG, "==> process pkt which type: 0x%08X", type);
  432. if (session->registered) {
  433. if (type == BUS_TYPE_ENDPT_UNREGISTER) {
  434. move_to_unregistered_session(session);
  435. session->registered = 0;
  436. queue_ans_pkt(session, BUS_E_OK);
  437. rc = -1;
  438. } else if (type == BUS_TYPE_ENDPT_GET_STATE) {
  439. int epid;
  440. iobuffer_t *ans_pkt;
  441. iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
  442. ans_pkt = make_ans_state_pkt(session->daemon, epid);
  443. rc = queue_tx_pkt(session, &ans_pkt);
  444. if (ans_pkt)
  445. iobuffer_dec_ref(ans_pkt);
  446. } else if (type == BUS_TYPE_PACKET) {
  447. int from_epid;
  448. int to_epid;
  449. int user_type;
  450. iobuffer_read(pkt, IOBUF_T_I4, &user_type, 0);
  451. iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0);
  452. iobuffer_read(pkt, IOBUF_T_I4, &to_epid, 0);
  453. iobuffer_restore_read_state(pkt, read_state);
  454. if (to_epid == session->epid) {
  455. endpt_session_inc_ref(session);
  456. iobuffer_set_user_data(pkt, session);
  457. rc = queue_tx_pkt(session, ppkt);
  458. if (rc < 0) {
  459. endpt_session_dec_ref(session);
  460. iobuffer_set_user_data(pkt, NULL);
  461. }
  462. } else {
  463. endpt_session_t *ts = find_session(session->daemon, to_epid);
  464. if (!ts) {
  465. rc = queue_ans_pkt(session, BUS_E_NOTFOUND);
  466. } else {
  467. if (ts->err) {
  468. rc = queue_ans_pkt(session, BUS_E_NETBROKEN);
  469. } else {
  470. endpt_session_inc_ref(session);
  471. iobuffer_set_user_data(pkt, session);
  472. rc = queue_tx_pkt(ts, ppkt);
  473. if (rc < 0) {
  474. endpt_session_dec_ref(session);
  475. iobuffer_set_user_data(pkt, NULL);
  476. rc = queue_ans_pkt(session, BUS_E_NETBROKEN);
  477. }
  478. }
  479. endpt_session_dec_ref(ts); // find session
  480. }
  481. }
  482. } else if (type == BUS_TYPE_EVENT) {
  483. int epid;
  484. int user_type;
  485. iobuffer_read(pkt, IOBUF_T_I4, &user_type, 0);
  486. iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
  487. iobuffer_restore_read_state(pkt, read_state);
  488. daemon_lock(session->daemon);
  489. {
  490. endpt_session_t *pos, *n;
  491. list_for_each_entry_safe(pos, n, &daemon->registered_session_list, endpt_session_t, entry) {
  492. if (pos != session) {
  493. iobuffer_t *tbuf = iobuffer_clone(pkt);
  494. queue_tx_pkt(pos, &tbuf);
  495. if (tbuf)
  496. iobuffer_destroy(tbuf);
  497. }
  498. }
  499. }
  500. daemon_unlock(session->daemon);
  501. rc = 0;
  502. } else if (type == BUS_TYPE_INFO) {
  503. int from_epid;
  504. int to_epid;
  505. int user_type;
  506. iobuffer_read(pkt, IOBUF_T_I4, &user_type, 0);
  507. iobuffer_read(pkt, IOBUF_T_I4, &from_epid, 0);
  508. iobuffer_read(pkt, IOBUF_T_I4, &to_epid, 0);
  509. iobuffer_restore_read_state(pkt, read_state);
  510. if (to_epid == session->epid) {
  511. queue_tx_pkt(session, ppkt);
  512. } else {
  513. endpt_session_t *ts = find_session(session->daemon, to_epid);
  514. if (ts) {
  515. if (!ts->err) {
  516. queue_tx_pkt(ts, ppkt);
  517. }
  518. endpt_session_dec_ref(ts); // find session
  519. }
  520. }
  521. rc = 0;
  522. }
  523. } else {
  524. if (type == BUS_TYPE_ENDPT_REGISTER) {
  525. int epid;
  526. endpt_session_t *ts;
  527. iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
  528. ts = find_session(session->daemon, epid);
  529. if (!ts) {
  530. session->registered = 1;
  531. session->epid = epid;
  532. rc = queue_ans_pkt(session, BUS_E_OK);
  533. if (rc == 0) {
  534. move_to_registered_session(session);
  535. }
  536. } else {
  537. endpt_session_dec_ref(ts);
  538. }
  539. }
  540. }
  541. if (*ppkt)
  542. iobuffer_restore_read_state(pkt, read_state);
  543. return rc;
  544. }
  545. static void on_recv_body(endpt_session_t *session, unsigned int transfer_bytes, int err)
  546. {
  547. int rc = -1;
  548. WLog_DBG(TAG, "==> on_recv_body(err=%d)", err);
  549. if (!err) {
  550. iobuffer_t *pkt = session->rx_pending_buf;
  551. session->rx_pending_buf = NULL;
  552. iobuffer_push_count(pkt, transfer_bytes);
  553. rc = on_process_pkt(session, &pkt);
  554. if (rc == 0) {
  555. rc = session_start_recv_hdr(session);
  556. }
  557. if (pkt)
  558. iobuffer_dec_ref(pkt);
  559. }
  560. if (rc < 0) {
  561. if (InterlockedCompareExchange(&session->err, rc, 0) == 0) {
  562. remove_session_list(session);
  563. endpt_session_dec_ref(session);
  564. }
  565. }
  566. endpt_session_dec_ref(session);
  567. }
  568. static void on_pipe_recv_body(ioqueue_file_t* file,
  569. ioqueue_overlapped_t *overlapped,
  570. void *buf,
  571. unsigned int transfer_bytes,
  572. void *user_data,
  573. int err)
  574. {
  575. on_recv_body(user_data, transfer_bytes, err);
  576. }
  577. static void on_tcp_recv_body(ioqueue_tcpsock_t* tcpsock,
  578. ioqueue_overlapped_t *overlapped,
  579. void *buf,
  580. unsigned int transfer_bytes,
  581. void *user_data,
  582. int err)
  583. {
  584. on_recv_body(user_data, transfer_bytes, err);
  585. }
  586. static int session_start_recv_body(endpt_session_t *session)
  587. {
  588. int rc = -1;
  589. WLog_DBG(TAG, "==> session(%d) start recv body(err=%d)", session->epid, session->err);
  590. if (session->err < 0)
  591. return rc;
  592. endpt_session_inc_ref(session);
  593. session->rx_pending_buf = iobuffer_create(-1, session->rx_pending_pkt_len);
  594. if (session->type == TYPE_PIPE) {
  595. rc = ioqueue_file_async_readn(&session->pipe,
  596. &session->rx_overlapped,
  597. iobuffer_data(session->rx_pending_buf, 0),
  598. session->rx_pending_pkt_len,
  599. &on_pipe_recv_body,
  600. session);
  601. } else if (session->type == TYPE_TCP) {
  602. rc = ioqueue_tcpsock_async_recvn(&session->tcp,
  603. &session->rx_overlapped,
  604. iobuffer_data(session->rx_pending_buf, 0),
  605. session->rx_pending_pkt_len,
  606. &on_tcp_recv_body,
  607. session);
  608. }
  609. if (rc < 0) {
  610. iobuffer_destroy(session->rx_pending_buf);
  611. session->rx_pending_buf = NULL;
  612. endpt_session_dec_ref(session);
  613. }
  614. return rc;
  615. }
  616. static void on_recv_hdr(endpt_session_t *session, int err)
  617. {
  618. int rc = -1;
  619. WLog_DBG(TAG, "==> on_recv_hdr(err=%d)", err);
  620. if (!err) {
  621. rc = session_start_recv_body(session);
  622. }
  623. if (rc < 0) {
  624. if (InterlockedCompareExchange(&session->err, rc, 0) == 0) {
  625. remove_session_list(session);
  626. endpt_session_dec_ref(session);
  627. }
  628. }
  629. endpt_session_dec_ref(session);
  630. }
  631. static void on_pipe_recv_hdr(ioqueue_file_t *file,
  632. ioqueue_overlapped_t *overlapped,
  633. void *buf,
  634. unsigned int transfer_bytes,
  635. void *user_data,
  636. int err)
  637. {
  638. on_recv_hdr(user_data, err);
  639. }
  640. static void on_tcp_recv_hdr(ioqueue_tcpsock_t *tcpsock,
  641. ioqueue_overlapped_t *overlapped,
  642. void *buf,
  643. unsigned int transfer_bytes,
  644. void *user_data,
  645. int err)
  646. {
  647. on_recv_hdr(user_data, err);
  648. }
  649. static int session_start_recv_hdr(endpt_session_t *session)
  650. {
  651. int rc = -1;
  652. WLog_DBG(TAG, "==> session(%d) start recv hdr(err=%d)", session->epid, session->err);
  653. if (session->err < 0)
  654. return rc;
  655. endpt_session_inc_ref(session);
  656. if (session->type == TYPE_PIPE) {
  657. rc = ioqueue_file_async_readn(&session->pipe,
  658. &session->rx_overlapped,
  659. &session->rx_pending_pkt_len,
  660. 4,
  661. &on_pipe_recv_hdr,
  662. session);
  663. } else if (session->type == TYPE_TCP) {
  664. rc = ioqueue_tcpsock_async_recvn(&session->tcp,
  665. &session->rx_overlapped,
  666. &session->rx_pending_pkt_len,
  667. 4,
  668. &on_tcp_recv_hdr,
  669. session);
  670. }
  671. if (rc < 0)
  672. endpt_session_dec_ref(session);
  673. return rc;
  674. }
  675. static int on_msg(unsigned short msg_id, param_size_t param1, param_size_t param2)
  676. {
  677. WLog_DBG(TAG, "==> on_msg(id=%d, param1=%d, param2=%d)", msg_id, param1, param2);
  678. if (msg_id == MSG_REMOVE_REGISTAR) {
  679. endpt_session_t *session = (endpt_session_t*)param1;
  680. remove_session_list(session);
  681. endpt_session_dec_ref(session);
  682. }
  683. return TRUE;
  684. }
  685. static int on_accept(daemon_accetpor_t *dacceptor, void *user_data, int fd, int err)
  686. {
  687. int idx = (int)user_data;
  688. WLog_DBG(TAG, "==> on_accept(fd=%d, err=%d)", fd, err);
  689. if (!err) {
  690. endpt_session_t *session = create_session(dacceptor, dacceptor->type, fd);
  691. if (session) {
  692. int rc;
  693. add_unregistered_list(session);
  694. rc = session_start_recv_hdr(session);
  695. if (rc < 0) {
  696. remove_session_list(session);
  697. endpt_session_dec_ref(session);
  698. }
  699. }
  700. start_accept(dacceptor, idx);
  701. }
  702. daemon_accetpor_dec_ref(dacceptor);
  703. return 1;
  704. }
  705. static int on_pipe_accept_callback(ioqueue_pipe_acceptor_t *acceptor,
  706. ioqueue_overlapped_t *overlapped,
  707. HANDLE pipe,
  708. void *user_data,
  709. int err)
  710. {
  711. daemon_accetpor_t *dacceptor = CONTAINING_RECORD(acceptor, daemon_accetpor_t, pipe_acceptor);
  712. return on_accept(dacceptor, user_data, (int)pipe, err);
  713. }
  714. static int on_tcp_accept_callback(ioqueue_acceptor_t *acceptor,
  715. ioqueue_overlapped_t *overlapped,
  716. SOCKET in_sock,
  717. void *user_data,
  718. int err)
  719. {
  720. daemon_accetpor_t *dacceptor = CONTAINING_RECORD(acceptor, daemon_accetpor_t, tcp_acceptor);
  721. return on_accept(dacceptor, user_data, in_sock, err);
  722. }
  723. static int start_accept(daemon_accetpor_t *acceptor, int idx)
  724. {
  725. int rc = -1;
  726. daemon_accetpor_inc_ref(acceptor);
  727. if (acceptor->type == TYPE_PIPE) {
  728. ioqueue_overlapped_t *ov = ARRAY_IDX(acceptor->arr_ov, idx, ioqueue_overlapped_t*);
  729. rc = ioqueue_pipe_acceptor_async_accept(&acceptor->pipe_acceptor, ov, &on_pipe_accept_callback, (void*)idx);
  730. } else if (acceptor->type == TYPE_TCP) {
  731. ioqueue_overlapped_t *ov = ARRAY_IDX(acceptor->arr_ov, idx, ioqueue_overlapped_t*);
  732. /*register epoll read event*/
  733. rc = ioqueue_acceptor_async_accept(&acceptor->tcp_acceptor, ov, &on_tcp_accept_callback, (void*)idx);
  734. } else {
  735. assert(0);
  736. }
  737. if (rc < 0)
  738. daemon_accetpor_dec_ref(acceptor);
  739. return rc;
  740. }
  741. static daemon_accetpor_t* create_daemon_acceptor(bus_daemon_t *daemon, char *url)
  742. {
  743. url_fields uf;
  744. int kk;
  745. int rc;
  746. daemon_accetpor_t *dacceptor = NULL;
  747. dacceptor = MALLOC_T(daemon_accetpor_t);
  748. if (url_parse(url, &uf) == 0) {
  749. if (_stricmp(uf.scheme, "tcp") == 0) {
  750. rc = ioqueue_acceptor_create(daemon->ioq, uf.host, uf.port, &dacceptor->tcp_acceptor);
  751. if (rc < 0) {
  752. free(dacceptor);
  753. url_free_fields(&uf);
  754. return NULL;
  755. }
  756. ioqueue_acceptor_listen(&dacceptor->tcp_acceptor, 10);
  757. dacceptor->type = TYPE_TCP;
  758. } else if (_stricmp(uf.scheme, "pipe") == 0) {
  759. rc = ioqueue_pipe_acceptor_create(daemon->ioq, uf.host, &dacceptor->tcp_acceptor);
  760. if (rc < 0) {
  761. free(dacceptor);
  762. url_free_fields(&uf);
  763. return NULL;
  764. }
  765. dacceptor->type = TYPE_PIPE;
  766. }
  767. url_free_fields(&uf);
  768. }
  769. dacceptor->daemon = daemon;
  770. REF_COUNT_INIT(&dacceptor->ref_cnt);
  771. dacceptor->arr_ov = array_make(DEFAULT_ACCEPT_OP_COUNT, sizeof(ioqueue_overlapped_t*));
  772. for (kk = 0; kk < DEFAULT_ACCEPT_OP_COUNT; ++kk)
  773. ARRAY_PUSH(dacceptor->arr_ov, ioqueue_overlapped_t*) = MALLOC_T(ioqueue_overlapped_t);
  774. return dacceptor;
  775. }
  776. static void destroy_daemon_acceptor(daemon_accetpor_t *dacceptor)
  777. {
  778. int i;
  779. for (i = 0; i < dacceptor->arr_ov->nelts; ++i)
  780. free(ARRAY_IDX(dacceptor->arr_ov, i, ioqueue_overlapped_t*));
  781. array_free(dacceptor->arr_ov);
  782. if (dacceptor->type == TYPE_PIPE) {
  783. ioqueue_pipe_acceptor_destroy(&dacceptor->pipe_acceptor);
  784. } else if (dacceptor->type == TYPE_TCP) {
  785. ioqueue_acceptor_destroy(&dacceptor->tcp_acceptor);
  786. }
  787. free(dacceptor);
  788. }
  789. IMPLEMENT_REF_COUNT_MT_STATIC(daemon_accetpor, daemon_accetpor_t, ref_cnt, destroy_daemon_acceptor)
  790. TOOLKIT_API int bus_daemon_create(int n_url, const char *urls[], int nthread, bus_daemon_t **p_daemon)
  791. {
  792. bus_daemon_t *daemon;
  793. int i;
  794. if (n_url == 0)
  795. return -1;
  796. if (nthread <= 0) {
  797. SYSTEM_INFO si;
  798. GetSystemInfo(&si);
  799. nthread = min(MAX_THREADS, si.dwNumberOfProcessors << 1);
  800. }
  801. //TODO: ÔÝÇÒÖÃΪ 1
  802. nthread = 1;
  803. WLog_DBG(TAG, "thread num: %d", nthread);
  804. daemon = MALLOC_T(bus_daemon_t);
  805. memset(daemon, 0, sizeof(bus_daemon_t));
  806. daemon->nthread = nthread;
  807. daemon->arr_acceptor = array_make(n_url, sizeof(daemon_accetpor_t*));
  808. daemon->arr_thread = array_make(nthread, sizeof(HANDLE));
  809. daemon->arr_uri = array_make(n_url, sizeof(char*));
  810. for (i = 0; i < n_url; ++i) {
  811. WLog_DBG(TAG, "urls[%d]: %s", i, urls[i]);
  812. ARRAY_PUSH(daemon->arr_uri, char*) = _strdup(urls[i]);
  813. }
  814. InitializeCriticalSection(&daemon->lock);
  815. INIT_LIST_HEAD(&daemon->registered_session_list);
  816. INIT_LIST_HEAD(&daemon->unregistered_session_list);
  817. *p_daemon = daemon;
  818. return 0;
  819. }
  820. TOOLKIT_API int bus_daemon_destroy(bus_daemon_t *daemon)
  821. {
  822. int i;
  823. DeleteCriticalSection(&daemon->lock);
  824. for (i = 0; i < daemon->arr_uri->nelts; ++i)
  825. free(ARRAY_IDX(daemon->arr_uri, i, char*));
  826. array_free(daemon->arr_uri);
  827. array_free(daemon->arr_acceptor);
  828. array_free(daemon->arr_thread);
  829. free(daemon);
  830. return 0;
  831. }
  832. TOOLKIT_API int bus_daemon_start(bus_daemon_t *daemon)
  833. {
  834. int i;
  835. daemon->ioq = ioqueue_create();
  836. if (!daemon->ioq) {
  837. WLog_ERR(TAG, "ioqueuq create failed!");
  838. return -1;
  839. }
  840. ioqueue_set_user_data(daemon->ioq, daemon);
  841. ioqueue_msg_add_handler(daemon->ioq, MSG_REMOVE_REGISTAR, 0, &on_msg);
  842. for (i = 0; i < daemon->arr_uri->nelts; ++i) {
  843. char *url = ARRAY_IDX(daemon->arr_uri, i, char*);
  844. daemon_accetpor_t *dacceptor = create_daemon_acceptor(daemon, url);
  845. if (dacceptor) {
  846. int kk;
  847. for (kk = 0; kk < DEFAULT_ACCEPT_OP_COUNT; ++kk) //ÊýÁ¿5£¿
  848. start_accept(dacceptor, kk);
  849. ARRAY_PUSH(daemon->arr_acceptor, daemon_accetpor_t*) = dacceptor;
  850. } else {
  851. WLog_ERR(TAG, "create daemon acceptor failed!");
  852. return -1;
  853. }
  854. }
  855. daemon->lstop = 0;
  856. for (i = 0; i < daemon->nthread; ++i) {
  857. HANDLE thread = (HANDLE)_beginthreadex(NULL, 0, &thread_proc, daemon, 0, NULL);
  858. if (thread) {
  859. ARRAY_PUSH(daemon->arr_thread, HANDLE) = thread;
  860. } else {
  861. return -1;
  862. }
  863. }
  864. return 0;
  865. }
  866. TOOLKIT_API int bus_daemon_stop(bus_daemon_t *daemon)
  867. {
  868. int i;
  869. // exit all worker thread
  870. InterlockedExchange(&daemon->lstop, 1);
  871. for (i = 0; i < daemon->arr_thread->nelts; ++i) {
  872. HANDLE t = ARRAY_IDX(daemon->arr_thread, i, HANDLE);
  873. WaitForSingleObject(t, INFINITE);
  874. CloseHandle(t);
  875. }
  876. array_clear(daemon->arr_thread);
  877. // close all pending handles
  878. {
  879. int i;
  880. endpt_session_t *pos;
  881. for (i = 0; i < daemon->arr_acceptor->nelts; ++i) {
  882. daemon_accetpor_t *dacceptor = ARRAY_IDX(daemon->arr_acceptor, i, daemon_accetpor_t*);
  883. if (dacceptor->type == TYPE_PIPE) {
  884. ioqueue_pipe_acceptor_close_pending_handle(&dacceptor->pipe_acceptor);
  885. } else if (dacceptor->type == TYPE_TCP) {
  886. ioqueue_acceptor_close(&dacceptor->tcp_acceptor);
  887. }
  888. daemon_accetpor_dec_ref(dacceptor);
  889. }
  890. array_clear(daemon->arr_acceptor);
  891. list_for_each_entry(pos, &daemon->registered_session_list, endpt_session_t, entry) {
  892. if (pos->type == TYPE_PIPE) {
  893. ioqueue_file_close(&pos->pipe);
  894. } else if (pos->type == TYPE_TCP) {
  895. ioqueue_tcpsock_close(&pos->tcp);
  896. }
  897. }
  898. list_for_each_entry(pos, &daemon->unregistered_session_list, endpt_session_t, entry) {
  899. if (pos->type == TYPE_PIPE) {
  900. ioqueue_file_close(&pos->pipe);
  901. } else if (pos->type == TYPE_TCP) {
  902. ioqueue_tcpsock_close(&pos->tcp);
  903. }
  904. }
  905. }
  906. // poll until all pending io are aborted
  907. while (!ioqueue_can_exit(daemon->ioq)) {
  908. ioqueue_poll(daemon->ioq, 10);
  909. }
  910. ioqueue_destroy(daemon->ioq);
  911. return 0;
  912. }