/* sp_rpc.c */

  1. #include "precompile.h"
  2. #include "sp_def.h"
  3. #include "sp_svc.h"
  4. #include "sp_rpc.h"
  5. #include "sp_dbg_export.h"
  6. #include "list.h"
  7. #include "memutil.h"
  8. #include "spinlock.h"
  9. #include "refcnt.h"
  10. #define BUCKET_SIZE 127
/*
   Create  +------+  SENT  +------+  ANS  +--------+  Destroy  +------+
  -------> | INIT | -----> | SENT | ----> | CALLED | --------> | TERM |
           +------+        +------+       +--------+           +------+
*/
  16. #define STATE_INIT 0
  17. #define STATE_SENT 1
  18. #define STATE_CALLED 2
  19. #define STATE_TERM 3
  20. #define STATE_ERROR 4
  21. #define RPC_CMD_INFO 0
  22. #define RPC_CMD_REQ 1
  23. #define RPC_CMD_ANS 2
  24. struct sp_rpc_server_t
  25. {
  26. int stop;
  27. sp_rpc_server_callback cb;
  28. sp_svc_t *svc;
  29. DECLARE_REF_COUNT_MEMBER(ref_cnt);
  30. };
  31. DECLARE_REF_COUNT_STATIC(sp_rpc_server, sp_rpc_server_t)
  32. static void __threadpool_server_on_pkt(threadpool_t *threadpool, void *arg, int param1, int param2)
  33. {
  34. sp_rpc_server_t *server = (sp_rpc_server_t *)arg;
  35. iobuffer_t *pkt = (iobuffer_t*)param1;
  36. int epid;
  37. int svc_id;
  38. int pkt_type;
  39. int pkt_id;
  40. int cmd_type;
  41. iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
  42. iobuffer_read(pkt, IOBUF_T_I4, &svc_id, 0);
  43. iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
  44. iobuffer_read(pkt, IOBUF_T_I4, &pkt_id, 0);
  45. cmd_type = SP_GET_TYPE(pkt_type);
  46. if (cmd_type == RPC_CMD_INFO) {
  47. server->cb.on_info(server, epid, svc_id, pkt_id, &pkt, server->cb.user_data);
  48. } else if (cmd_type == RPC_CMD_REQ) {
  49. int call_type;
  50. iobuffer_read(pkt, IOBUF_T_I4, &call_type, NULL);
  51. server->cb.on_req(server, epid, svc_id, pkt_id, call_type, &pkt, server->cb.user_data);
  52. } else {
  53. sp_dbg_warn("RPC CMD unknown types!");
  54. }
  55. sp_rpc_server_dec_ref(server); // @
  56. if (pkt)
  57. iobuffer_dec_ref(pkt);
  58. }
  59. static int server_on_pkt(sp_svc_t *svc, int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
  60. {
  61. sp_rpc_server_t *server = (sp_rpc_server_t*)user_data;
  62. int rc;
  63. iobuffer_t *pkt;
  64. pkt = *p_pkt;
  65. *p_pkt = NULL;
  66. iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_id, 0);
  67. iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_type, 0);
  68. iobuffer_write_head(pkt, IOBUF_T_I4, &svc_id, 0);
  69. iobuffer_write_head(pkt, IOBUF_T_I4, &epid, 0);
  70. sp_rpc_server_inc_ref(server); // @
  71. rc = threadpool_queue_workitem2(sp_svc_get_threadpool(svc), NULL, &__threadpool_server_on_pkt, server, (int)pkt, 0);
  72. if (rc != 0) {
  73. sp_rpc_server_dec_ref(server); // @
  74. iobuffer_dec_ref(pkt);
  75. }
  76. return FALSE;
  77. }
  78. int sp_rpc_server_create(sp_svc_t *svc, sp_rpc_server_callback *cb, sp_rpc_server_t **p_server)
  79. {
  80. sp_rpc_server_t *server = MALLOC_T(sp_rpc_server_t);
  81. server->stop = 0;
  82. memcpy(&server->cb, cb, sizeof(sp_rpc_server_callback));
  83. server->svc = svc;
  84. REF_COUNT_INIT(&server->ref_cnt);
  85. *p_server = server;
  86. return 0;
  87. }
  88. void sp_rpc_server_destroy(sp_rpc_server_t *server)
  89. {
  90. sp_rpc_server_dec_ref(server);
  91. }
  92. int sp_rpc_server_start(sp_rpc_server_t *server)
  93. {
  94. server->stop = 0;
  95. return sp_svc_add_pkt_handler(server->svc, (int)server, SP_PKT_RPC, &server_on_pkt, server);
  96. }
  97. int sp_rpc_server_stop(sp_rpc_server_t *server)
  98. {
  99. if (!server->stop)
  100. return Error_Bug;
  101. server->stop = 1;
  102. return sp_svc_remove_pkt_handler(server->svc, (int)server, SP_PKT_RPC);
  103. }
  104. sp_svc_t *sp_rpc_server_get_svc(sp_rpc_server_t *server)
  105. {
  106. return server->svc;
  107. }
  108. int sp_rpc_server_send_answer(sp_rpc_server_t *server, int epid, int svc_id, int rpc_id, iobuffer_t **ans_pkt)
  109. {
  110. return sp_svc_post(server->svc, epid, svc_id, SP_PKT_RPC | RPC_CMD_ANS, rpc_id, ans_pkt);
  111. }
  112. static void __sp_rpc_destroy(sp_rpc_server_t *server)
  113. {
  114. if (server->cb.on_destroy) {
  115. (*server->cb.on_destroy)(server, server->cb.user_data);
  116. }
  117. free(server);
  118. }
  119. IMPLEMENT_REF_COUNT_MT(sp_rpc_server, sp_rpc_server_t, ref_cnt, __sp_rpc_destroy)
  120. struct sp_rpc_client_t
  121. {
  122. struct hlist_node hentry; // element of sp_rpc_client_mgr_t->rpc_buckets[index]
  123. int state;
  124. int remote_epid;
  125. int remote_svc_id;
  126. unsigned int rpc_id;
  127. int call_type;
  128. spinlock_t lock;
  129. sp_rpc_client_callback cb;
  130. sp_rpc_client_mgr_t *mgr;
  131. DECLARE_REF_COUNT_MEMBER(ref_cnt);
  132. };
  133. DECLARE_REF_COUNT_STATIC(sp_rpc_client, sp_rpc_client_t)
  134. struct sp_rpc_client_mgr_t
  135. {
  136. struct hlist_head rpc_buckets[BUCKET_SIZE]; // list of sp_rpc_client_t
  137. sp_svc_t *svc;
  138. int rpc_cnt;
  139. int stop;
  140. int local_seq;
  141. sp_rpc_client_mgr_callback cb;
  142. CRITICAL_SECTION lock;
  143. DECLARE_REF_COUNT_MEMBER(ref_cnt);
  144. };
  145. DECLARE_REF_COUNT_STATIC(sp_rpc_client_mgr, sp_rpc_client_mgr_t)
  146. static __inline void mgr_lock(sp_rpc_client_mgr_t *mgr)
  147. {
  148. EnterCriticalSection(&mgr->lock);
  149. }
  150. static __inline void mgr_unlock(sp_rpc_client_mgr_t *mgr)
  151. {
  152. LeaveCriticalSection(&mgr->lock);
  153. }
  154. static __inline void client_lock(sp_rpc_client_t *client)
  155. {
  156. spinlock_enter(&client->lock, -1);
  157. }
  158. static __inline void client_unlock(sp_rpc_client_t *client)
  159. {
  160. spinlock_leave(&client->lock);
  161. }
  162. static void client_set_error(sp_rpc_client_t *client, int error);
  163. static void client_process_ans(sp_rpc_client_t *client, iobuffer_t **ans_pkt);
  164. static void __threadpool_mgr_on_req(threadpool_t *threadpool, void *arg, int param1, int param2)
  165. {
  166. sp_rpc_client_mgr_t *mgr = (sp_rpc_client_mgr_t *)arg;
  167. iobuffer_t *pkt = (iobuffer_t*)param1;
  168. int epid;
  169. int svc_id;
  170. int pkt_type;
  171. int pkt_id;
  172. int cmd_type;
  173. iobuffer_read(pkt, IOBUF_T_I4, &epid, 0);
  174. iobuffer_read(pkt, IOBUF_T_I4, &svc_id, 0);
  175. iobuffer_read(pkt, IOBUF_T_I4, &pkt_type, 0);
  176. iobuffer_read(pkt, IOBUF_T_I4, &pkt_id, 0);
  177. cmd_type = SP_GET_TYPE(pkt_type);
  178. if (cmd_type == RPC_CMD_REQ && mgr->cb.on_req)
  179. {
  180. int call_type;
  181. iobuffer_read(pkt, IOBUF_T_I4, &call_type, NULL);
  182. mgr->cb.on_req(mgr, epid, svc_id, pkt_id, call_type, &pkt, mgr->cb.user_data);
  183. }
  184. else
  185. {
  186. sp_dbg_warn("RPC CMD unknown types!");
  187. }
  188. sp_rpc_client_mgr_dec_ref(mgr); // @
  189. if (pkt)
  190. iobuffer_dec_ref(pkt);
  191. }
  192. static int mgr_on_pkt(sp_svc_t *svc,int epid, int svc_id, int pkt_type, int pkt_id, iobuffer_t **p_pkt, void *user_data)
  193. {
  194. sp_rpc_client_mgr_t *mgr = (sp_rpc_client_mgr_t*)user_data;
  195. if (SP_GET_TYPE(pkt_type) == RPC_CMD_ANS) {
  196. int rpc_id = pkt_id;
  197. int slot = ((unsigned int)rpc_id) % BUCKET_SIZE;
  198. sp_rpc_client_t *tpos;
  199. struct hlist_node *pos, *n;
  200. mgr_lock(mgr);
  201. hlist_for_each_entry_safe(tpos, pos, n, &mgr->rpc_buckets[slot], sp_rpc_client_t, hentry) {
  202. if (tpos->rpc_id == rpc_id) {
  203. client_process_ans(tpos, p_pkt);
  204. break;
  205. }
  206. }
  207. mgr_unlock(mgr);
  208. return FALSE;
  209. }
  210. else if (SP_GET_TYPE(pkt_type) == RPC_CMD_REQ)
  211. {
  212. int rc;
  213. iobuffer_t *pkt = *p_pkt;
  214. *p_pkt = NULL;
  215. iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_id, 0);
  216. iobuffer_write_head(pkt, IOBUF_T_I4, &pkt_type, 0);
  217. iobuffer_write_head(pkt, IOBUF_T_I4, &svc_id, 0);
  218. iobuffer_write_head(pkt, IOBUF_T_I4, &epid, 0);
  219. sp_rpc_client_mgr_inc_ref(mgr);
  220. rc = threadpool_queue_workitem2(sp_svc_get_threadpool(svc), NULL, &__threadpool_mgr_on_req, mgr, (int)pkt, 0);
  221. if (rc != 0) {
  222. sp_rpc_client_mgr_dec_ref(mgr); // @
  223. iobuffer_dec_ref(pkt);
  224. }
  225. }
  226. return TRUE;
  227. }
  228. static void mgr_on_sys(sp_svc_t *svc,int epid, int state, void *user_data)
  229. {
  230. sp_rpc_client_mgr_t *mgr = (sp_rpc_client_mgr_t*)user_data;
  231. if (state == BUS_STATE_OFF) {
  232. int i;
  233. sp_rpc_client_t *tpos;
  234. struct hlist_node *pos, *n;
  235. mgr_lock(mgr);
  236. for (i = 0; i < BUCKET_SIZE; ++i) {
  237. hlist_for_each_entry_safe(tpos, pos, n, &mgr->rpc_buckets[i], sp_rpc_client_t, hentry) {
  238. if (tpos->remote_epid == epid) {
  239. client_set_error(tpos, Error_NetBroken);
  240. }
  241. }
  242. }
  243. mgr_unlock(mgr);
  244. }
  245. }
  246. int sp_rpc_client_mgr_create(sp_svc_t *svc, sp_rpc_client_mgr_callback *cb, sp_rpc_client_mgr_t **p_mgr)
  247. {
  248. int i;
  249. sp_rpc_client_mgr_t *mgr = MALLOC_T(sp_rpc_client_mgr_t);
  250. mgr->local_seq = 0;
  251. mgr->rpc_cnt = 0;
  252. mgr->stop = 0;
  253. mgr->svc = svc;
  254. memcpy(&mgr->cb, cb, sizeof(sp_rpc_client_mgr_callback));
  255. for (i = 0;i < BUCKET_SIZE; ++i) {
  256. INIT_HLIST_HEAD(&mgr->rpc_buckets[i]);
  257. }
  258. InitializeCriticalSection(&mgr->lock);
  259. REF_COUNT_INIT(&mgr->ref_cnt);
  260. *p_mgr = mgr;
  261. return 0;
  262. }
// {bug} does not remove the entries still linked in the rpc_buckets array
  264. void sp_rpc_client_mgr_destroy(sp_rpc_client_mgr_t *mgr)
  265. {
  266. sp_rpc_client_mgr_dec_ref(mgr);
  267. }
  268. int sp_rpc_client_mgr_start(sp_rpc_client_mgr_t *mgr)
  269. {
  270. mgr->stop = 0;
  271. sp_svc_add_pkt_handler(mgr->svc, (int)mgr, SP_PKT_RPC, &mgr_on_pkt, mgr);
  272. sp_svc_add_sys_handler(mgr->svc, (int)mgr, &mgr_on_sys, mgr);
  273. return 0;
  274. }
  275. int sp_rpc_client_mgr_stop(sp_rpc_client_mgr_t *mgr)
  276. {
  277. sp_svc_remove_pkt_handler(mgr->svc, (int)mgr, SP_PKT_RPC);
  278. sp_svc_remove_sys_handler(mgr->svc, (int)mgr);
  279. return 0;
  280. }
  281. sp_svc_t *sp_rpc_client_mgr_get_svc(sp_rpc_client_mgr_t *mgr)
  282. {
  283. return mgr->svc;
  284. }
  285. int sp_rpc_client_mgr_cancel_all(sp_rpc_client_mgr_t *mgr)
  286. {
  287. int i;
  288. mgr_lock(mgr);
  289. for (i = 0; i < BUCKET_SIZE; ++i) {
  290. sp_rpc_client_t *tpos;
  291. struct hlist_node *pos;
  292. hlist_for_each_entry(tpos, pos, &mgr->rpc_buckets[i], sp_rpc_client_t, hentry) {
  293. client_set_error(tpos, Error_Cancel);
  294. }
  295. }
  296. mgr_unlock(mgr);
  297. return 0;
  298. }
  299. int sp_rpc_client_mgr_get_client_cnt(sp_rpc_client_mgr_t *mgr)
  300. {
  301. return mgr->rpc_cnt;
  302. }
  303. int sp_rpc_client_mgr_one_way_call(sp_rpc_client_mgr_t *mgr, int epid, int svc_id, int call_type, iobuffer_t **info_pkt)
  304. {
  305. return sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_RPC| RPC_CMD_INFO, call_type, info_pkt);
  306. }
  307. int sp_rpc_client_mgr_send_answer(sp_rpc_client_mgr_t *mgr, int epid, int svc_id, int rpc_id, iobuffer_t **ans_pkt)
  308. {
  309. return sp_svc_post(mgr->svc, epid, svc_id, SP_PKT_RPC | RPC_CMD_ANS, rpc_id, ans_pkt);
  310. }
  311. static void __sp_rpc_client_mgr_destroy(sp_rpc_client_mgr_t *mgr)
  312. {
  313. if (mgr->cb.on_destroy)
  314. mgr->cb.on_destroy(mgr, mgr->cb.user_data);
  315. DeleteCriticalSection(&mgr->lock);
  316. free(mgr);
  317. }
  318. IMPLEMENT_REF_COUNT_MT_STATIC(sp_rpc_client_mgr, sp_rpc_client_mgr_t, ref_cnt, __sp_rpc_client_mgr_destroy)
  319. int sp_rpc_client_create(sp_rpc_client_mgr_t *mgr, int epid, int svc_id, int call_type, sp_rpc_client_callback *cb, sp_rpc_client_t **p_client)
  320. {
  321. sp_rpc_client_t *client = MALLOC_T(sp_rpc_client_t);
  322. client->mgr = mgr;
  323. client->remote_epid = epid;
  324. client->remote_svc_id = svc_id;
  325. client->call_type = call_type;
  326. memcpy(&client->cb, cb, sizeof(sp_rpc_client_callback));
  327. client->rpc_id = (int)InterlockedIncrement((LONG*)&mgr->local_seq);
  328. spinlock_init(&client->lock);
  329. client->state = STATE_INIT;
  330. REF_COUNT_INIT(&client->ref_cnt);
  331. sp_rpc_client_mgr_inc_ref(mgr);
  332. sp_rpc_client_inc_ref(client);
  333. mgr_lock(mgr);
  334. hlist_add_head(&client->hentry, &mgr->rpc_buckets[client->rpc_id % BUCKET_SIZE]);
  335. client->mgr->rpc_cnt++;
  336. mgr_unlock(mgr);
  337. *p_client = client;
  338. return 0;
  339. }
  340. int sp_rpc_client_close(sp_rpc_client_t *client)
  341. {
  342. int rc;
  343. client_lock(client);
  344. if (client->state != STATE_TERM && client->state != STATE_ERROR) {
  345. client->state = STATE_ERROR;
  346. rc = 0;
  347. } else {
  348. rc = Error_Duplication;
  349. }
  350. client_unlock(client);
  351. return rc;
  352. }
  353. void sp_rpc_client_destroy(sp_rpc_client_t *client)
  354. {
  355. mgr_lock(client->mgr);
  356. client->mgr->rpc_cnt --;
  357. hlist_del(&client->hentry);
  358. mgr_unlock(client->mgr);
  359. sp_rpc_client_dec_ref(client);
  360. client_lock(client);
  361. client->state = STATE_TERM;
  362. client_unlock(client);
  363. sp_rpc_client_dec_ref(client);
  364. }
  365. int sp_rpc_client_async_call(sp_rpc_client_t *client, iobuffer_t **req_pkt)
  366. {
  367. sp_rpc_client_mgr_t *mgr = client->mgr;
  368. int rc = 0;
  369. if (client->state != STATE_INIT)
  370. return Error_Bug;
  371. client_lock(client);
  372. if (client->state == STATE_INIT) {
  373. client->state = STATE_SENT;
  374. sp_rpc_client_inc_ref(client); // @
  375. iobuffer_write_head(*req_pkt, IOBUF_T_I4, &client->call_type, 0);
  376. rc = sp_svc_post(mgr->svc, client->remote_epid, client->remote_svc_id, SP_PKT_RPC|RPC_CMD_REQ, client->rpc_id, req_pkt);
  377. if (rc != 0) {
  378. sp_rpc_client_dec_ref(client); // @
  379. client->state = STATE_ERROR;
  380. }
  381. } else {
  382. rc = Error_NetBroken;
  383. }
  384. client_unlock(client);
  385. return rc;
  386. }
  387. int sp_rpc_client_get_rpc_id(sp_rpc_client_t *client)
  388. {
  389. return client->rpc_id;
  390. }
  391. int sp_rpc_client_get_remote_epid(sp_rpc_client_t *client)
  392. {
  393. return client->remote_epid;
  394. }
  395. int sp_rpc_client_get_remote_svc_id(sp_rpc_client_t *client)
  396. {
  397. return client->remote_svc_id;
  398. }
  399. static void client_set_error(sp_rpc_client_t *client, int error)
  400. {
  401. if (client->state != STATE_ERROR && client->state != STATE_TERM) {
  402. client_lock(client);
  403. if (client->state != STATE_ERROR && client->state != STATE_TERM) {
  404. if (client->state == STATE_SENT) {
  405. if (client->cb.on_ans) {
  406. client->cb.on_ans(client, error, NULL, client->cb.user_data);
  407. }
  408. } else {
  409. client->state = STATE_ERROR;
  410. }
  411. }
  412. client_unlock(client);
  413. }
  414. sp_rpc_client_dec_ref(client); // @
  415. }
  416. static void client_process_ans(sp_rpc_client_t *client, iobuffer_t **ans_pkt)
  417. {
  418. if (client->state == STATE_SENT) {
  419. client_lock(client);
  420. if (client->state == STATE_SENT) {
  421. client->state = STATE_CALLED;
  422. if (client->cb.on_ans) {
  423. client->cb.on_ans(client, 0, ans_pkt, client->cb.user_data);
  424. }
  425. }
  426. client_unlock(client);
  427. }
  428. sp_rpc_client_dec_ref(client); // @
  429. }
  430. static void __client_destroy(sp_rpc_client_t *client)
  431. {
  432. if (client->cb.on_destroy)
  433. client->cb.on_destroy(client, client->cb.user_data);
  434. sp_rpc_client_mgr_dec_ref(client->mgr);
  435. free(client);
  436. }
  437. IMPLEMENT_REF_COUNT_MT_STATIC(sp_rpc_client, sp_rpc_client_t, ref_cnt, __client_destroy)