sp_iom.c 11 KB


  1. #include "precompile.h"
  2. #include "sp_iom.h"
  3. #include "sp_def.h"
  4. #include "sp_dbg_export.h"
  5. #include "spinlock.h"
  6. #include "timerqueue.h"
  7. #include "array.h"
  8. #include "memutil.h"
  9. #include <time.h>
  10. #define POLL_INTERVAL 10
  11. #define IOM_T_EXIT 0
  12. #define IOM_T_GET_STATE 1
  13. #define IOM_T_SEND_INFO 2
  14. static int translate_error(int bus_error)
  15. {
  16. int error;
  17. switch (bus_error) {
  18. case BUS_E_OK:
  19. error = Error_Succeed;
  20. break;
  21. case BUS_E_FAIL:
  22. error = Error_Unexpect;
  23. break;
  24. case BUS_E_NETBROKEN:
  25. error = Error_NetBroken;
  26. break;
  27. case BUS_E_NOTFOUND:
  28. error = Error_NotExist;
  29. break;
  30. default:
  31. error = Error_Unexpect;
  32. break;
  33. }
  34. return error;
  35. }
  36. typedef struct pkt_handler_t {
  37. int key;
  38. sp_iom_on_pkt on_pkt;
  39. void *user_data;
  40. }pkt_handler_t;
  41. typedef struct sys_handler_t {
  42. int key;
  43. sp_iom_on_sys on_sys;
  44. void *user_data;
  45. }sys_handler_t;
  46. struct sp_iom_t
  47. {
  48. bus_endpt_t *endpt;
  49. timer_queue_t *tm_queue;
  50. spinlock_t tm_lock;
  51. spinlock_t pkt_handler_lock;
  52. spinlock_t sys_handler_lock;
  53. array_header_t *arr_pkt_handler;
  54. array_header_t *arr_sys_handler;
  55. int stop;
  56. int poll_thread_id;
  57. };
  58. static __inline void iom_pkt_handler_lock(sp_iom_t *iom)
  59. {
  60. spinlock_enter(&iom->pkt_handler_lock, -1);
  61. }
  62. static __inline void iom_pkt_handler_unlock(sp_iom_t *iom)
  63. {
  64. spinlock_leave(&iom->pkt_handler_lock);
  65. }
  66. static __inline void iom_sys_handler_lock(sp_iom_t *iom)
  67. {
  68. spinlock_enter(&iom->sys_handler_lock, -1);
  69. }
  70. static __inline void iom_sys_handler_unlock(sp_iom_t *iom)
  71. {
  72. spinlock_leave(&iom->sys_handler_lock);
  73. }
  74. static __inline void iom_tm_lock(sp_iom_t *iom)
  75. {
  76. spinlock_enter(&iom->tm_lock, -1);
  77. }
  78. static __inline void iom_tm_unlock(sp_iom_t *iom)
  79. {
  80. spinlock_leave(&iom->tm_lock);
  81. }
  82. static pkt_handler_t* find_pkt_handler(sp_iom_t *iom, int key, int *idx)
  83. {
  84. int i;
  85. for (i = 0; i < iom->arr_pkt_handler->nelts; ++i) {
  86. pkt_handler_t *t = ARRAY_IDX(iom->arr_pkt_handler, i, pkt_handler_t*);
  87. if (t->key == key) {
  88. if (idx)
  89. *idx = i;
  90. return t;
  91. }
  92. }
  93. return NULL;
  94. }
  95. static sys_handler_t* find_sys_handler(sp_iom_t *iom, int key, int *idx)
  96. {
  97. int i;
  98. for (i = 0; i < iom->arr_sys_handler->nelts; ++i) {
  99. sys_handler_t *t = ARRAY_IDX(iom->arr_sys_handler, i, sys_handler_t*);
  100. if (t->key == key) {
  101. if (idx)
  102. *idx = i;
  103. return t;
  104. }
  105. }
  106. return NULL;
  107. }
  108. static void on_pkt(bus_endpt_t *endpt, int epid, int type, iobuffer_t **p_pkt, void *user_data)
  109. {
  110. sp_iom_t *iom = (sp_iom_t *)user_data;
  111. int i;
  112. iom_pkt_handler_lock(iom);
  113. for (i = 0; i < iom->arr_pkt_handler->nelts; ++i) {
  114. pkt_handler_t *pkt_handler = ARRAY_IDX(iom->arr_pkt_handler, i, pkt_handler_t*);
  115. if (pkt_handler->on_pkt) {
  116. int read_state = iobuffer_get_read_state(*p_pkt);
  117. int write_state = iobuffer_get_write_state(*p_pkt);
  118. int svc_id;
  119. int from_svc_id;
  120. int pkt_id;
  121. int count;
  122. iobuffer_format_read(*p_pkt, "444", &from_svc_id, &svc_id, &pkt_id);
  123. count = pkt_handler->on_pkt(iom, svc_id, epid, from_svc_id, type, pkt_id, p_pkt, pkt_handler->user_data);
  124. if (count && p_pkt && *p_pkt) {
  125. iobuffer_restore_read_state(*p_pkt, read_state);
  126. iobuffer_restore_write_state(*p_pkt, write_state);
  127. } else {
  128. break;
  129. }
  130. }
  131. }
  132. iom_pkt_handler_unlock(iom);
  133. }
  134. static void on_msg(bus_endpt_t *endpt, int msg, int nparam, int params[], int *result, void *user_data)
  135. {
  136. sp_iom_t *iom = (sp_iom_t *)user_data;
  137. if (msg == IOM_T_SEND_INFO) {
  138. int pkt_type = params[0];
  139. int this_svc_id = params[1];
  140. int epid = params[2];
  141. int svc_id = params[3];
  142. int pkt_id = params[4];
  143. iobuffer_t *pkt = (iobuffer_t*)params[5];
  144. int read_state, write_state;
  145. int ret;
  146. read_state = iobuffer_get_read_state(pkt);
  147. write_state = iobuffer_get_write_state(pkt);
  148. iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_id, 0);
  149. iobuffer_write_head(pkt, IOBUF_T_I4, &svc_id, 0);
  150. iobuffer_write_head(pkt, IOBUF_T_I4, &this_svc_id, 0);
  151. if (result) {
  152. int state;
  153. ret = bus_endpt_get_state(endpt, epid, &state);
  154. if (ret == 0)
  155. ret = (state == BUS_STATE_ON) ? 0 : -1;
  156. if (ret == 0)
  157. ret = bus_endpt_send_info(endpt, epid, pkt_type, pkt);
  158. *result = ret;
  159. if (ret != 0) {
  160. iobuffer_restore_read_state(pkt, read_state);
  161. iobuffer_restore_write_state(pkt, write_state);
  162. } else {
  163. iobuffer_dec_ref(pkt);
  164. }
  165. } else {
  166. bus_endpt_send_info(endpt, epid, pkt_type, pkt);
  167. iobuffer_dec_ref(pkt);
  168. }
  169. //sp_dbg_debug("on_msg send_info end, %d, %d, %d, %d", this_svc_id, epid, svc_id, pkt_id);
  170. } else if (msg == IOM_T_GET_STATE) {
  171. int epid = params[0];
  172. int *state = (int*)params[1];
  173. int *rc = (int*)params[2];
  174. *rc = bus_endpt_get_state(endpt, epid, state);
  175. } else if (msg == IOM_T_EXIT) {
  176. sp_iom_stop(iom);
  177. } else {
  178. assert(0);
  179. }
  180. }
  181. static void on_sys(bus_endpt_t *endpt, int epid, int state, void *user_data)
  182. {
  183. sp_iom_t *iom = (sp_iom_t *)user_data;
  184. int i;
  185. iom_sys_handler_lock(iom);
  186. for (i = 0; i < iom->arr_sys_handler->nelts; ++i) {
  187. sys_handler_t *sys_handler = ARRAY_IDX(iom->arr_sys_handler, i, sys_handler_t*);
  188. if (sys_handler->on_sys) {
  189. sys_handler->on_sys(iom, epid, state, sys_handler->user_data);
  190. }
  191. }
  192. iom_sys_handler_unlock(iom);
  193. }
  194. int sp_iom_create(const char *url, int epid, sp_iom_t **p_iom)
  195. {
  196. sp_iom_t *iom;
  197. bus_endpt_callback callback;
  198. int rc;
  199. iom = ZALLOC_T(sp_iom_t);
  200. callback.on_evt = NULL;
  201. callback.on_sys = &on_sys;
  202. callback.on_pkt = &on_pkt;
  203. callback.on_inf = &on_pkt;
  204. callback.on_msg = &on_msg;
  205. callback.user_data = iom;
  206. rc = bus_endpt_create(url, epid, &callback, &iom->endpt);
  207. if (rc != 0)
  208. goto on_error;
  209. timer_heap_create(&iom->tm_queue);
  210. spinlock_init(&iom->pkt_handler_lock);
  211. spinlock_init(&iom->sys_handler_lock);
  212. spinlock_init(&iom->tm_lock);
  213. iom->arr_pkt_handler = array_make(3, sizeof(pkt_handler_t*));
  214. iom->arr_sys_handler = array_make(3, sizeof(sys_handler_t*));
  215. iom->poll_thread_id = 0;
  216. *p_iom = iom;
  217. return 0;
  218. on_error:
  219. free(iom);
  220. return Error_Unexpect;
  221. }
  222. void sp_iom_destroy(sp_iom_t *iom)
  223. {
  224. timer_queue_destroy(iom->tm_queue);
  225. bus_endpt_destroy(iom->endpt);
  226. array_free(iom->arr_pkt_handler);
  227. array_free(iom->arr_sys_handler);
  228. free(iom);
  229. }
  230. int sp_iom_add_pkt_handler(sp_iom_t *iom, int key, sp_iom_on_pkt on_pkt, void *user_data)
  231. {
  232. int rc = 0;
  233. pkt_handler_t *pkt_handler;
  234. iom_pkt_handler_lock(iom);
  235. pkt_handler = find_pkt_handler(iom, key, NULL);
  236. if (!pkt_handler) {
  237. pkt_handler = MALLOC_T(pkt_handler_t);
  238. pkt_handler->key = key;
  239. pkt_handler->on_pkt = on_pkt;
  240. pkt_handler->user_data = user_data;
  241. ARRAY_PUSH(iom->arr_pkt_handler, pkt_handler_t*) = pkt_handler;
  242. } else {
  243. rc = Error_Duplication;
  244. }
  245. iom_pkt_handler_unlock(iom);
  246. return rc;
  247. }
  248. int sp_iom_remove_pkt_handler(sp_iom_t *iom, int key)
  249. {
  250. int rc = 0;
  251. pkt_handler_t *pkt_handler;
  252. int i;
  253. iom_pkt_handler_lock(iom);
  254. pkt_handler = find_pkt_handler(iom, key, &i);
  255. if (pkt_handler) {
  256. if (i < iom->arr_pkt_handler->nelts-1)
  257. ARRAY_IDX(iom->arr_pkt_handler, i, pkt_handler_t*) = ARRAY_IDX(iom->arr_pkt_handler, iom->arr_pkt_handler->nelts-1, pkt_handler_t*);
  258. array_pop(iom->arr_pkt_handler);
  259. } else {
  260. rc = Error_NotExist;
  261. }
  262. iom_pkt_handler_unlock(iom);
  263. return rc;
  264. }
  265. int sp_iom_add_sys_handler(sp_iom_t *iom, int key, sp_iom_on_sys on_sys, void *user_data)
  266. {
  267. int rc = 0;
  268. sys_handler_t *sys_handler;
  269. iom_sys_handler_lock(iom);
  270. sys_handler = find_sys_handler(iom, key, NULL);
  271. if (!sys_handler) {
  272. sys_handler = MALLOC_T(sys_handler_t);
  273. sys_handler->key = key;
  274. sys_handler->on_sys = on_sys;
  275. sys_handler->user_data = user_data;
  276. ARRAY_PUSH(iom->arr_sys_handler, sys_handler_t*) = sys_handler;
  277. } else {
  278. rc = Error_Duplication;
  279. }
  280. iom_sys_handler_unlock(iom);
  281. return rc;
  282. }
  283. int sp_iom_remove_sys_handler(sp_iom_t *iom, int key)
  284. {
  285. int rc = 0;
  286. sys_handler_t *sys_handler;
  287. int i;
  288. iom_sys_handler_lock(iom);
  289. sys_handler = find_sys_handler(iom, key, &i);
  290. if (sys_handler) {
  291. ARRAY_DEL(iom->arr_sys_handler, i, sys_handler_t*);
  292. } else {
  293. rc = Error_NotExist;
  294. }
  295. iom_sys_handler_unlock(iom);
  296. return rc;
  297. }
  298. static int sp_iom_poll(sp_iom_t *iom, int *timeout)
  299. {
  300. int rc;
  301. if (iom->poll_thread_id == 0) {
  302. iom->poll_thread_id = (int)GetCurrentThreadId();
  303. }
  304. rc = bus_endpt_poll(iom->endpt, *timeout);
  305. for (;;) {
  306. int cnt;
  307. long timerBeginTime, timerPlayTime;
  308. iom_tm_lock(iom);
  309. timerBeginTime = clock();
  310. cnt = timer_queue_poll_one(iom->tm_queue, NULL, timeout); // timeout返回下一个定时器超时间隔
  311. timerPlayTime = clock() - timerBeginTime;
  312. if (timerPlayTime > 500) //timer play time over than 500ms, it may make the main thread run slow
  313. sp_dbg_info("cur Timer %d has run %d ms", cnt, timerPlayTime);
  314. iom_tm_unlock(iom);
  315. if (!cnt) {
  316. //sp_dbg_debug("no timer execute current times.");
  317. break;
  318. }
  319. }
  320. return rc;
  321. }
  322. int sp_iom_run(sp_iom_t *iom)
  323. {
  324. int timeout = POLL_INTERVAL;
  325. while (
  326. #ifdef _WIN32
  327. InterlockedExchangeAdd((LONG*)&iom->stop, 0) == 0
  328. #else
  329. /*the adapte func 'InterlockedExchangeAdd' implemented under winpr went wrong, maybe 64bit has responsibility*/
  330. iom->stop == 0
  331. #endif //_WIN32
  332. || timer_queue_get_count(iom->tm_queue) > 0)
  333. {
  334. int rc = sp_iom_poll(iom, &timeout);
  335. if (rc >= 0) {
  336. if (timeout > POLL_INTERVAL || timeout < 0)
  337. timeout = POLL_INTERVAL;
  338. } else {
  339. sp_dbg_debug("iom poll failed!");
  340. ExitProcess(-1);
  341. return rc;
  342. }
  343. }
  344. sp_dbg_debug("iom run exit ok!");
  345. return 0;
  346. }
  347. int sp_iom_stop(sp_iom_t *iom)
  348. {
  349. sp_dbg_debug("set iom stop flag!");
  350. InterlockedExchange((LONG*)&iom->stop, 1);
  351. return 0;
  352. }
  353. int sp_iom_send(sp_iom_t *iom, int this_svc_id, int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt)
  354. {
  355. iobuffer_t *pkt = p_pkt ? *p_pkt : NULL;
  356. int rc;
  357. if (!pkt) {
  358. pkt = iobuffer_create(-1, -1);
  359. }
  360. {
  361. int params[] = {
  362. pkt_type,
  363. this_svc_id,
  364. epid,
  365. svc_id,
  366. pkt_id,
  367. (int)pkt,
  368. };
  369. rc = bus_endpt_send_msg(iom->endpt, IOM_T_SEND_INFO, array_size(params), params);
  370. }
  371. if (!p_pkt || !*p_pkt) {
  372. if (rc != 0) {
  373. iobuffer_dec_ref(pkt);
  374. }
  375. } else {
  376. if (rc == 0)
  377. *p_pkt = NULL;
  378. }
  379. if (rc != 0)
  380. rc = Error_IO;
  381. return rc;
  382. }
  383. int sp_iom_post(sp_iom_t *iom, int this_svc_id, int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt)
  384. {
  385. iobuffer_t *pkt = p_pkt ? *p_pkt : NULL;
  386. int rc;
  387. if (!pkt) {
  388. pkt = iobuffer_create(-1, -1);
  389. }
  390. {
  391. int params[] = {
  392. pkt_type,
  393. this_svc_id,
  394. epid,
  395. svc_id,
  396. pkt_id,
  397. (int)pkt,
  398. };
  399. rc = bus_endpt_post_msg(iom->endpt, IOM_T_SEND_INFO, array_size(params), params);
  400. }
  401. if (!p_pkt || !*p_pkt) {
  402. if (rc != 0)
  403. iobuffer_dec_ref(pkt);
  404. } else {
  405. if (rc == 0)
  406. *p_pkt = NULL;
  407. }
  408. if (rc != 0)
  409. rc = Error_Unexpect;
  410. return rc;
  411. }
  412. int sp_iom_get_state(sp_iom_t *iom, int epid, int *state)
  413. {
  414. int rc = 0;
  415. int rc1;
  416. int params[] = {epid, (int)state, (int)&rc};
  417. // use -1 for get state
  418. rc1 = bus_endpt_send_msg(iom->endpt, IOM_T_GET_STATE, array_size(params), params);
  419. if (rc != 0)
  420. rc = Error_Unexpect;
  421. return rc1 == 0 ? rc : Error_Unexpect;
  422. }
  423. int sp_iom_schedule_timer(sp_iom_t *iom, timer_entry *entry, unsigned int delay)
  424. {
  425. int rc;
  426. iom_tm_lock(iom);
  427. rc = timer_queue_schedule(iom->tm_queue, entry, delay);
  428. iom_tm_unlock(iom);
  429. return rc ? Error_Unexpect : 0;
  430. }
  431. int sp_iom_cancel_timer(sp_iom_t *iom, timer_entry *entry, int cancel)
  432. {
  433. int rc;
  434. iom_tm_lock(iom);
  435. rc = timer_queue_cancel(iom->tm_queue, entry, cancel);
  436. iom_tm_unlock(iom);
  437. return rc ? Error_Unexpect : 0;
  438. }
  439. int sp_iom_get_epid(sp_iom_t *iom)
  440. {
  441. return bus_endpt_get_epid(iom->endpt);
  442. }
  443. const char *sp_iom_get_comm_url(sp_iom_t *iom)
  444. {
  445. return bus_endpt_get_url(iom->endpt);
  446. }
  447. int sp_iom_post_quit(sp_iom_t *iom)
  448. {
  449. int rc = bus_endpt_post_msg(iom->endpt, IOM_T_EXIT, 0, 0);
  450. if (rc != 0)
  451. rc = Error_Unexpect;
  452. return rc;
  453. }
  454. int sp_iom_get_poll_thread_id(sp_iom_t *iom)
  455. {
  456. return iom->poll_thread_id;
  457. }