bizchan.cpp 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614
  1. #include "bizchan.h"
  2. #include "quicklz.h"
  3. #ifdef RVC_OS_WIN
  4. #define WIN32_LEAN_AND_MEAN
  5. #include <Windows.h>
  6. #include <WinSock2.h>
  7. #include <WinSock.h>
  8. #include <process.h>
  9. #else
  10. #include <sys/socket.h>
  11. #include <fcntl.h>
  12. #include <errno.h>
  13. #include <unistd.h>
  14. #include <pthread.h>
  15. #include <netinet/in.h>
  16. #include <string.h>
  17. #include <arpa/inet.h>
  18. //#include "winpr/winsock.h"
  19. //#include "winpr/file.h"
  20. #endif
  21. #include <stdlib.h>
  22. #include <stdio.h>
  23. #include <assert.h>
  24. #include "chan_protocol.h"
  25. //#include "acmstrdec.h"
  26. //#include "acmstrenc.h"
  27. #include "openssl/rc4.h"
  28. #include "ListEntry.h"
  29. #include <screencodec.h>
  30. #include "jpeg2k.h"
  31. #ifdef RVC_OS_WIN
  32. #include <DbgHelp.h>
  33. #pragma comment(lib, "dbghelp.lib")
  34. #endif
  35. #define PING_INTERVAL 10000 // 10s
  36. #define MAX_TIMEOUT 60000 //超时时间60S,20170313修改,解决排队机闪呼的问题
  37. #define DEFAULT_RX_BUF_SIZE 8192
  38. #define KEY_LEN 16
  39. #ifndef RVC_OS_WIN
  40. #include "SpBase.h"
  41. typedef int SOCKET;
  42. #ifndef INVALID_SOCKET
  43. #define INVALID_SOCKET (SOCKET)(~0)
  44. #endif
  45. #ifndef SOCKET_ERROR
  46. #define SOCKET_ERROR (-1)
  47. #endif
  48. #define max(a,b) (((a) > (b)) ? (a) : (b))
  49. #define min(a,b) (((a) < (b)) ? (a) : (b))
  /*
   * POSIX stand-in for the Win32 GetTickCount(): milliseconds read from the
   * monotonic clock, truncated to unsigned long.
   *
   * Returns 0 if the clock cannot be read.
   */
  unsigned long GetTickCount()
  {
      struct timespec ts;
      unsigned long long ms;

      if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0)
          return 0;

      /* Scale in a 64-bit unsigned type: doing tv_sec * 1000 in a 32-bit
       * signed long would overflow, which is undefined behaviour. */
      ms = (unsigned long long)ts.tv_sec * 1000ULL
         + (unsigned long long)ts.tv_nsec / 1000000ULL;
      return (unsigned long)ms;
  }
  56. #endif
  57. typedef struct recv_info_t {
  58. int offset;
  59. char *buf;
  60. int buf_len;
  61. char *unzip_buf;
  62. int unzip_len;
  63. qlz_state_decompress decompress_state;
  64. }recv_info_t;
  65. typedef struct send_buf_node {
  66. LIST_ENTRY entry;
  67. char *buf;
  68. int left;
  69. int sended;
  70. int need_encrypt;
  71. }send_buf_node;
  72. typedef struct send_info_t {
  73. //send_buf_node *send_list;
  74. LIST_ENTRY send_list;
  75. #ifdef RVC_OS_WIN
  76. CRITICAL_SECTION lock;
  77. #else
  78. pthread_mutexattr_t attr;
  79. pthread_mutex_t lock;
  80. #endif
  81. //qlz_state_compress compress_state;
  82. }send_info_t;
  /*
   * Bookkeeping record for an image transfer.
   * NOTE(review): not used in the visible part of this file; the field
   * meanings below are inferred from the names only - confirm against users.
   */
  struct img_recv_t {
      char *data;    /* presumably the image bytes */
      int   length;  /* presumably the total size of data */
      int   offset;  /* presumably how much of data has been filled */
      int   id;      /* presumably the request id of the image */
  };
  typedef struct img_recv_t img_recv_t;
  89. struct bizchan_t
  90. {
  91. bizchan_config_t config;
  92. OnRecvPacket winsync_on_recv_cb;
  93. OnMode mode_cb;
  94. void *winsync_user_data;
  95. bizchan_callback_t cb;
  96. void *tag;
  97. #ifdef RVC_OS_WIN
  98. HANDLE work_thread;
  99. HANDLE evt;
  100. #else
  101. pthread_t work_thread;
  102. int evt[2];//pipe
  103. #endif
  104. volatile int stop_flag;
  105. recv_info_t recv_info;
  106. send_info_t send_info;
  107. int screen_img_id;
  108. int photo_img_id;
  109. int b_primary_server;
  110. int remote_video_rtp_port;
  111. int remote_video_desc;
  112. char remote_client_id[32];
  113. int connected;
  114. char local_pwd[KEY_LEN];
  115. char remote_pwd[KEY_LEN];
  116. RC4_KEY local_key;
  117. RC4_KEY remote_key;
  118. LARGE_INTEGER last_remote_active_time;
  119. LARGE_INTEGER last_local_active_time;
  120. screen_decoder_session_t *dec_session;
  121. };
  /*
   * Common signature of the key encode/decode routines: transform inLen bytes
   * at in into the outLen-byte buffer out.
   */
  typedef int (*lpfn_cryptionfun)(unsigned char *out, int outLen,
                                  const unsigned char *in, int inLen);

  /* Codec entry points. All start as NULL; they are assigned outside the
   * visible part of this file. The *_mobile pair is selected when
   * config.crypt_type == 1. */
  static lpfn_cryptionfun decodestring        = NULL;
  static lpfn_cryptionfun encodestring        = NULL;
  static lpfn_cryptionfun decodestring_mobile = NULL;
  static lpfn_cryptionfun encodestring_mobile = NULL;
  /*
   * Multiplicative 32-bit hash ("nemesi", k = 257).
   *
   * Folds each of the len bytes at bf into hash as hash = hash * 257 + byte,
   * then multiplies the result by 257 once more before returning it.
   */
  static __inline unsigned int hash32_buf(const void *bf, size_t len, unsigned int hash)
  {
      const unsigned char *bytes = (const unsigned char *)bf;
      size_t idx;

      for (idx = 0; idx < len; ++idx)
          hash = hash * 257 + bytes[idx];

      return hash * 257;
  }
  /*
   * Fills key[0..size) with random bytes for use as the local RC4 password.
   *
   * On non-Windows builds the bytes are read from /dev/urandom. If that is
   * unavailable (and always on Windows) the legacy derivation is used: two
   * rand() passes, seeded with the tick count and with the process id, XORed
   * together.
   *
   * NOTE(review): the legacy path is predictable (rand() with guessable
   * seeds) and re-seeds the process-wide rand() state; the Windows build
   * should be moved to an OS random source as well.
   */
  static void generate_rand_key(char *key, int size)
  {
      int i;

      if (key == NULL || size <= 0)
          return;

  #ifndef RVC_OS_WIN
      {
          FILE *urandom = fopen("/dev/urandom", "rb");
          if (urandom != NULL) {
              size_t got = fread(key, 1, (size_t)size, urandom);
              fclose(urandom);
              if (got == (size_t)size)
                  return;
              /* short read: fall through and overwrite with the legacy bytes */
          }
      }
  #endif

      srand(GetTickCount() * 33);
      for (i = 0; i < size; ++i) {
          key[i] = (char)(rand() & 0xff);
      }
  #ifdef RVC_OS_WIN
      srand(GetCurrentProcessId() * 33);
  #else
      srand(getpid() * 33);
  #endif
      for (i = 0; i < size; ++i) {
          key[i] ^= (char)(rand() & 0xff);
      }
  }
  /* Returns the hash32_buf() digest of the size-byte key, starting from seed 0. */
  static __inline unsigned int hash_key(const char *key, int size)
  {
      const unsigned int seed = 0;

      return hash32_buf(key, size, seed);
  }
  /*
   * Returns non-zero when the hash32_buf() digest (seed 0) of the size bytes
   * at key equals hash_code, compared as unsigned values.
   */
  static __inline int check_hash(char *key, int size, int hash_code)
  {
      const unsigned int actual = hash32_buf(key, size, 0);

      return (actual == (unsigned int)hash_code) ? 1 : 0;
  }
  163. static __inline void GetTick(LARGE_INTEGER *last, LARGE_INTEGER *lt)
  164. {
  165. #ifdef RVC_OS_WIN
  166. DWORD dwNow = GetTickCount();
  167. if (last->LowPart > dwNow) {
  168. lt->LowPart = dwNow;
  169. lt->HighPart = last->HighPart + 1;
  170. } else {
  171. lt->LowPart = dwNow;
  172. lt->HighPart = last->HighPart;
  173. }
  174. #else
  175. DWORD dwNow = GetTickCount();
  176. if (last->u.LowPart > dwNow) {
  177. lt->u.LowPart = dwNow;
  178. lt->u.HighPart = last->u.HighPart + 1;
  179. }
  180. else {
  181. lt->u.LowPart = dwNow;
  182. lt->u.HighPart = last->u.HighPart;
  183. }
  184. #endif
  185. }
  186. BIZCHAN_API(int) bizchan_lib_init()
  187. {
  188. #ifdef RVC_OS_WIN
  189. WSADATA wsaData;
  190. return WSAStartup(0x0202, &wsaData);
  191. #else
  192. return 0;
  193. #endif
  194. }
  195. BIZCHAN_API(int) bizchan_lib_term()
  196. {
  197. #ifdef RVC_OS_WIN
  198. return WSACleanup();
  199. #else
  200. return 0;
  201. #endif
  202. }
  203. static int config_copy(bizchan_t* chan, const bizchan_config_t *src, bizchan_config_t *dst)
  204. {
  205. memcpy(dst, src, sizeof(bizchan_config_t));
  206. dst->proxy_server = strdup(src->proxy_server);
  207. if (src->bak_proxy_server != NULL) {
  208. dst->bak_proxy_server = strdup(src->bak_proxy_server);
  209. }
  210. if (src->session_id != NULL) {
  211. dst->session_id = strdup(src->session_id);
  212. }
  213. if (src->agent_id != NULL) {
  214. dst->agent_id = strdup(src->agent_id);
  215. }
  216. if (src->client_id != NULL) {
  217. dst->client_id = strdup(src->client_id);
  218. }
  219. return 0;
  220. }
  221. static int config_check(const bizchan_config_t *config)
  222. {
  223. if (!config->proxy_server)
  224. return -1;
  225. if (config->proxy_server_port <= 0 || config->proxy_server_port >= 0xffff)
  226. return -1;
  227. if (config->bak_proxy_server) {
  228. if (config->bak_proxy_server_port <= 0 || config->bak_proxy_server_port >= 0xffff)
  229. return -1;
  230. }
  231. if (!config->session_id)
  232. return -1;
  233. if (!config->agent_id)
  234. return -1;
  235. return 0;
  236. }
  237. static void config_free(bizchan_config_t *config)
  238. {
  239. free(config->proxy_server);
  240. free(config->bak_proxy_server);
  241. free(config->session_id);
  242. free(config->agent_id);
  243. free(config->client_id);
  244. }
  245. static int callback_check(const bizchan_callback_t *cb)
  246. {
  247. if (!cb->on_close)
  248. return -1;
  249. if (!cb->on_connect)
  250. return -1;
  251. if (!cb->on_recv_pkt)
  252. return -1;
  253. return 0;
  254. }
  255. static void invoke_on_connect(bizchan_t *chan, int error)
  256. {
  257. if (error == 0)
  258. chan->cb.on_connect(chan,
  259. error,
  260. chan->b_primary_server ? chan->config.proxy_server : chan->config.bak_proxy_server,
  261. chan->remote_video_rtp_port,
  262. chan->remote_video_desc,
  263. chan->remote_client_id,
  264. chan->cb.user_data);
  265. else
  266. chan->cb.on_connect(chan,
  267. error,
  268. NULL,
  269. 0,
  270. 0,
  271. NULL,
  272. chan->cb.user_data);
  273. }
  274. //static FILE *rx_log_fp = NULL;
  275. static void invoke_on_recv_pkt(bizchan_t *chan, int type, int sub_type, int id, const char *pkt, int pkt_size)
  276. {
  277. /*if (rx_log_fp == NULL) {
  278. rx_log_fp = fopen("c:\\rxlog.txt", "wt");
  279. fprintf(rx_log_fp, "===================\n");
  280. }
  281. {
  282. SYSTEMTIME st;
  283. GetLocalTime(&st);
  284. fprintf(rx_log_fp, "[%02d:%02d:%02d.%03d] type = %d, sub_type = %d, id = %d, pkt_size = %d, hash = %d\n",
  285. st.wHour, st.wMinute, st.wSecond, st.wMilliseconds,
  286. type, sub_type, id, pkt_size, hash32_buf(pkt, pkt_size, 0));
  287. fflush(rx_log_fp);
  288. }*/
  289. if (type == ACM_TYPE_SRN) {
  290. int cat = ACM_SRN_CAT(sub_type);
  291. if (cat == ACM_SRN_ANS) {
  292. if (id == chan->screen_img_id) {
  293. if (pkt_size == 4)
  294. {
  295. int err;
  296. memcpy(&err, pkt, 4);
  297. chan->cb.on_recv_screen(chan, id, err, 0, 0, NULL, 0, chan->cb.user_data);
  298. }
  299. else
  300. {
  301. int size = 0;
  302. int width, height;
  303. int rc;
  304. char *dec_buf = NULL;
  305. rc = screen_decoder_session_decode(chan->dec_session, pkt, pkt_size, &width, &height, dec_buf, &size);
  306. if (rc == 0) {
  307. dec_buf = (char*)malloc(size);
  308. rc = screen_decoder_session_decode(chan->dec_session, pkt, pkt_size, &width, &height, dec_buf, &size);
  309. }
  310. if (rc == 0) {
  311. chan->cb.on_recv_screen(chan, id, 0, width, height, dec_buf, size, chan->cb.user_data);
  312. } else {
  313. chan->cb.on_recv_screen(chan, id, rc, 0, 0, NULL, 0, chan->cb.user_data);
  314. }
  315. if (dec_buf)
  316. free(dec_buf);
  317. }
  318. }
  319. } else
  320. {
  321. chan->cb.on_recv_pkt(chan, type, sub_type, id, pkt, pkt_size, chan->cb.user_data);
  322. }
  323. } else if (type == ACM_TYPE_PHT) {
  324. int cat = ACM_PHT_CAT(sub_type);
  325. if (cat == ACM_PHT_ANS)
  326. {
  327. if (id == chan->photo_img_id) {
  328. if (pkt_size == 4) {
  329. int err;
  330. memcpy(&err, pkt, 4);
  331. chan->cb.on_recv_photo(chan, id, err, 0, 0, NULL, 0, chan->cb.user_data);
  332. } else {
  333. jpeg2k_raw_image raw_image = {0};
  334. jpeg2k_coded_image codec_image = {0};
  335. int rc;
  336. codec_image.data = (unsigned char*)pkt;
  337. codec_image.len = pkt_size;
  338. rc = jpeg2k_decode(&raw_image, &codec_image);
  339. if (rc == 0) {
  340. chan->cb.on_recv_photo(chan, id, 0, raw_image.width, raw_image.height, (const char*)&raw_image.data[0], raw_image.len, chan->cb.user_data);
  341. jpeg2k_decode_free(&raw_image);
  342. } else {
  343. chan->cb.on_recv_photo(chan, id, rc, 0, 0, NULL, 0, chan->cb.user_data);
  344. }
  345. }
  346. }
  347. } else {
  348. chan->cb.on_recv_pkt(chan, type, sub_type, id, pkt, pkt_size, chan->cb.user_data);
  349. }
  350. } else if (type == ACM_TYPE_SYNC) {
  351. if (chan->winsync_on_recv_cb) {
  352. chan->winsync_on_recv_cb(sub_type, pkt, pkt_size, chan->winsync_user_data);
  353. } else {
  354. chan->cb.on_recv_pkt(chan, type, sub_type, id, pkt, pkt_size, chan->cb.user_data);
  355. }
  356. } else if (type == ACM_TYPE_MODE) {
  357. if (chan->mode_cb) {
  358. chan->mode_cb(TRUE, chan->winsync_user_data);
  359. }
  360. chan->cb.on_recv_pkt(chan, type, sub_type, id, pkt, pkt_size, chan->cb.user_data);
  361. } else if (type == ACM_TYPE_PING) {
  362. //..... remote ping, nothing todo
  363. } else {
  364. chan->cb.on_recv_pkt(chan, type, sub_type, id, pkt, pkt_size, chan->cb.user_data);
  365. }
  366. }
  367. static void invoke_on_close(bizchan_t *chan)
  368. {
  369. if (chan->cb.on_close) {
  370. chan->cb.on_close(chan, chan->cb.user_data);
  371. }
  372. }
  373. static void invoke_on_destroy(bizchan_t *chan)
  374. {
  375. if (chan->cb.on_destroy) {
  376. chan->cb.on_destroy(chan, chan->cb.user_data);
  377. }
  378. }
  379. static FILE *fp = NULL;
  380. static int on_recv2(bizchan_t *chan, SOCKET conn)
  381. {
  382. int n;
  383. char buf[0x10000+sizeof(acm_hdr)];
  384. if (fp == NULL) {
  385. #ifdef RVC_OS_WIN
  386. fp = fopen ("g:\\rx.dat", "wb");
  387. #else
  388. fp = fopen("/home/rx.dat", "wb");
  389. #endif
  390. }
  391. do {
  392. n = recv(conn, buf, sizeof(buf), 0);
  393. if (n > 0) {
  394. fwrite(buf, 1, n, fp);
  395. fflush(fp);
  396. }
  397. } while (n > 0);
  398. #ifdef RVC_OS_WIN
  399. return (n == 0 || (GetLastError() == WSAEWOULDBLOCK)) ? 0 : -1;
  400. #else
  401. return (n == 0 || (errno == EWOULDBLOCK) || (errno == EINTR)) ? 0 : -1;
  402. #endif
  403. }
  404. static int on_recv(bizchan_t *chan, SOCKET conn)
  405. {
  406. char buffer[128];
  407. recv_info_t *ri = &chan->recv_info;
  408. int n;
  409. int result = 0;
  410. do {
  411. n = recv(conn, ri->buf+ri->offset, ri->buf_len-ri->offset, 0);
  412. snprintf(buffer, 128, "on_recv n:%d, ri->offset:%d, ri->buf_len:%d", n, ri->offset, ri->buf_len);
  413. chan->cb.dbg(chan, 0, buffer, chan->cb.user_data);
  414. if (n > 0) {
  415. int i = 0;
  416. ri->offset += n;
  417. GetTick(&chan->last_remote_active_time, &chan->last_remote_active_time);
  418. while (ri->offset-i >= sizeof(acm_hdr)) {
  419. acm_hdr *hdr = (acm_hdr*)&ri->buf[i];
  420. if (hdr->length == 0) {
  421. printf("broken");
  422. }
  423. if (ri->offset-i >= hdr->length+sizeof(acm_hdr)) {
  424. if (hdr->encrypt) {
  425. char *dec_buf = (char*)malloc(hdr->length);
  426. RC4_set_key(&chan->remote_key, sizeof(chan->remote_pwd), (unsigned char*)chan->remote_pwd);
  427. RC4(&chan->remote_key, hdr->length, (const unsigned char*)(&hdr->data[0]), (unsigned char*)dec_buf);
  428. if (hdr->compress) {
  429. int len = (int)qlz_size_decompressed(dec_buf);
  430. char *unzip_buf = (char*)malloc(len);
  431. len = qlz_decompress(dec_buf, unzip_buf, &ri->decompress_state);
  432. if (check_hash(unzip_buf, len, hdr->hash)) {
  433. invoke_on_recv_pkt(chan, hdr->type, hdr->sub_type, hdr->id, unzip_buf, len);
  434. } else {
  435. OutputDebugStringA("pkt hash failed!\n");
  436. chan->cb.dbg(chan, 0, "pkt hash failed! 1", chan->cb.user_data);
  437. }
  438. free(unzip_buf);
  439. } else {
  440. if (check_hash(dec_buf, hdr->length, hdr->hash)) {
  441. invoke_on_recv_pkt(chan, hdr->type, hdr->sub_type, hdr->id, dec_buf, hdr->length);
  442. } else {
  443. OutputDebugStringA("pkt hash failed!\n");
  444. chan->cb.dbg(chan, 0, "pkt hash failed! 2", chan->cb.user_data);
  445. }
  446. }
  447. free(dec_buf);
  448. } else {
  449. if (hdr->compress) {
  450. int len = (int)qlz_size_decompressed((const char *)&hdr->data[0]);
  451. char *unzip_buf = (char*)malloc(len);
  452. len = qlz_decompress((const char*)&hdr->data[0], unzip_buf, &ri->decompress_state);
  453. if (check_hash(unzip_buf, len, hdr->hash)) {
  454. invoke_on_recv_pkt(chan, hdr->type, hdr->sub_type, hdr->id, unzip_buf, len);
  455. } else {
  456. OutputDebugStringA("pkt hash failed!\n");
  457. chan->cb.dbg(chan, 0, "pkt hash failed! 3", chan->cb.user_data);
  458. }
  459. free(unzip_buf);
  460. } else {
  461. if (check_hash((char*)&hdr->data[0], hdr->length, hdr->hash)) {
  462. invoke_on_recv_pkt(chan, hdr->type, hdr->sub_type, hdr->id, (const char*)&hdr->data[0], hdr->length);
  463. } else {
  464. OutputDebugStringA("pkt hash failed!\n");
  465. chan->cb.dbg(chan, 0, "pkt hash failed! 4", chan->cb.user_data);
  466. }
  467. }
  468. }
  469. i += hdr->length+sizeof(acm_hdr);
  470. } else {
  471. break;
  472. }
  473. }
  474. if (i != ri->offset) {
  475. memmove(&ri->buf[0], &ri->buf[i], ri->offset-i);
  476. }
  477. ri->offset -= i;
  478. if (ri->offset == ri->buf_len) { // double large
  479. ri->buf_len = 2 * ri->buf_len;
  480. ri->buf = (char *)realloc(ri->buf, ri->buf_len);
  481. }
  482. }
  483. } while (n > 0);
  484. #ifdef RVC_OS_WIN
  485. return (n == 0 || (GetLastError() == WSAEWOULDBLOCK)) ? 0 : -1;
  486. #else
  487. //recv ==0,对端断开
  488. if (n == 0) {
  489. result = -1;
  490. }
  491. else {
  492. result = (n < 0 || (errno == EWOULDBLOCK) || (errno == EINTR) || (errno == EAGAIN)) ? 0 : -1;
  493. }
  494. snprintf(buffer, 128, "on_recv n:%d, result:%d, errno:%d", n, result, errno);
  495. chan->cb.dbg(chan, 0, buffer, chan->cb.user_data);
  496. return result;
  497. #endif
  498. }
  499. static int on_send(bizchan_t *chan, SOCKET conn)
  500. {
  501. char buffer[128];
  502. send_info_t *si = &chan->send_info;
  503. int n = 1;//默认为1
  504. int result = -1;
  505. #ifdef RVC_OS_WIN
  506. EnterCriticalSection(&si->lock);
  507. #else
  508. pthread_mutex_lock(&si->lock);
  509. #endif
  510. if (!ListEntry_IsEmpty(&si->send_list)) {
  511. do {
  512. send_buf_node *t = CONTAINING_RECORD(ListEntry_GetHead(&si->send_list), send_buf_node, entry);
  513. if (t->need_encrypt) {
  514. acm_hdr *hdr = (acm_hdr*)t->buf;
  515. char *enc_buf = (char*)malloc(t->left+sizeof(acm_hdr));
  516. RC4_set_key(&chan->local_key, sizeof(chan->local_pwd), (unsigned char*)chan->local_pwd);
  517. RC4(&chan->local_key, t->left, (const unsigned char*)(&hdr->data[0]), (unsigned char*)(enc_buf+sizeof(acm_hdr)));
  518. memcpy(enc_buf, hdr, sizeof(acm_hdr));
  519. free(t->buf);
  520. t->buf = enc_buf;
  521. t->need_encrypt = 0;
  522. }
  523. n = send(conn, t->buf+t->sended, t->left, 0);
  524. snprintf(buffer, 128, "on_send n:%d, t->sended:%d, t->left:%d", n, t->sended, t->left);
  525. chan->cb.dbg(chan, 0, buffer, chan->cb.user_data);
  526. if (n > 0) {
  527. char tmp[32];
  528. sprintf(tmp, "send out %d bytes!\n", n);
  529. OutputDebugStringA(tmp);
  530. t->left -= n;
  531. t->sended += n;
  532. if (t->left == 0) {
  533. ListEntry_DeleteNode(&t->entry);
  534. free(t->buf);
  535. free(t);
  536. }
  537. GetTick(&chan->last_local_active_time, &chan->last_local_active_time);
  538. }
  539. } while (n > 0 && !ListEntry_IsEmpty(&si->send_list));
  540. }
  541. #ifdef RVC_OS_WIN
  542. LeaveCriticalSection(&si->lock);
  543. return (n >= 0 || (GetLastError() == WSAEWOULDBLOCK)) ? 0 : -1;
  544. #else
  545. pthread_mutex_unlock(&si->lock);
  546. if (n > 0) {
  547. result = 0;
  548. }
  549. else if (n < 0){
  550. if ((errno == EWOULDBLOCK) || (errno == EINTR) || (errno == EAGAIN)) {
  551. result = 0;
  552. }
  553. }
  554. return result;
  555. #endif
  556. }
  557. static void on_close(bizchan_t *chan)
  558. {
  559. invoke_on_close(chan);
  560. }
  561. #ifdef RVC_OS_WIN
  562. static int prepare_socket(SOCKET s, HANDLE evt)
  563. {
  564. BOOL opt = TRUE;
  565. int rc;
  566. rc = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt));
  567. opt = TRUE;
  568. if (rc == 0)
  569. rc = setsockopt(s, SOL_SOCKET, SO_DONTLINGER, (char*)&opt, sizeof(opt));
  570. if (rc == 0)
  571. rc = WSAEventSelect(s, evt, FD_CONNECT | FD_ACCEPT | FD_CLOSE | FD_READ | FD_WRITE);
  572. return rc;
  573. }
  574. static void dump_exception(PEXCEPTION_POINTERS ExceptionInfo)
  575. {
  576. char tmp[MAX_PATH];
  577. HANDLE hDumpFile;
  578. sprintf(tmp, ".\\bizchan_%d.dmp", GetCurrentProcessId());
  579. hDumpFile = CreateFileA( tmp, GENERIC_READ | GENERIC_WRITE,
  580. 0, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL );
  581. if( ( hDumpFile != NULL ) && ( hDumpFile != INVALID_HANDLE_VALUE ) )
  582. {
  583. MINIDUMP_EXCEPTION_INFORMATION mdei;
  584. MINIDUMP_TYPE mdt;
  585. mdei.ThreadId = GetCurrentThreadId();
  586. mdei.ExceptionPointers = ExceptionInfo;
  587. mdei.ClientPointers = FALSE;
  588. mdt = MiniDumpWithFullMemory;
  589. MiniDumpWriteDump( GetCurrentProcess(), GetCurrentProcessId(),
  590. hDumpFile, mdt, (ExceptionInfo != 0) ? &mdei : 0, 0, 0 );
  591. CloseHandle( hDumpFile );
  592. }
  593. }
  594. #endif
  595. #ifdef RVC_OS_WIN
  596. static void process(bizchan_t *chan)
  597. {
  598. SOCKET conn = INVALID_SOCKET;
  599. struct sockaddr_in addr = {0};
  600. int rc;
  601. HANDLE evts[2] = {chan->evt, NULL};
  602. evts[1] = WSACreateEvent();
  603. // try connect to primary proxy server
  604. addr.sin_family = AF_INET;
  605. addr.sin_port = htons(chan->config.proxy_server_port);
  606. addr.sin_addr.s_addr = inet_addr(chan->config.proxy_server);
  607. chan->b_primary_server = TRUE;
  608. conn = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  609. if (conn == INVALID_SOCKET)
  610. goto on_error;
  611. rc = prepare_socket(conn, evts[1]);
  612. if (rc != 0)
  613. goto on_error;
  614. rc = connect(conn, (struct sockaddr*)&addr, sizeof(addr));
  615. if (rc == -1 && WSAGetLastError() == WSAEWOULDBLOCK) {
  616. rc = 0;
  617. }
  618. if (rc == -1 && chan->config.bak_proxy_server && strlen(chan->config.bak_proxy_server)) { // try connect to back proxy server
  619. closesocket(conn);
  620. conn = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  621. if (conn == INVALID_SOCKET)
  622. goto on_error;
  623. WSAResetEvent(evts[1]);
  624. rc = prepare_socket(conn, evts[1]);
  625. if (rc != 0)
  626. goto on_error;
  627. addr.sin_port = htons(chan->config.bak_proxy_server_port);
  628. addr.sin_addr.s_addr = inet_addr(chan->config.bak_proxy_server);
  629. chan->b_primary_server = 0;
  630. rc = connect(conn, (struct sockaddr*)&addr, sizeof(addr));
  631. if (rc == -1 && WSAGetLastError() == WSAEWOULDBLOCK) {
  632. rc = 0;
  633. }
  634. }
  635. if (rc == -1)
  636. goto on_error;
  637. if (rc == 0)
  638. {
  639. proxy_ack_hdr ack_hdr;
  640. int ack_hdr_recv_bytes = 0;
  641. lpfn_cryptionfun encodefun = encodestring;
  642. lpfn_cryptionfun decodefun = decodestring;
  643. if (1 == chan->config.crypt_type){
  644. encodefun = encodestring_mobile;
  645. decodefun = decodestring_mobile;
  646. }
  647. while (!chan->stop_flag && !chan->connected)
  648. { // wait until connected
  649. DWORD dwRet = WaitForMultipleObjects(2, evts, FALSE, MAX_TIMEOUT);
  650. if (dwRet == WAIT_OBJECT_0)
  651. {
  652. WSAResetEvent(evts[0]);
  653. } else if (dwRet == WAIT_OBJECT_0+1)
  654. {
  655. int error = 0;
  656. WSANETWORKEVENTS netevents;
  657. if (WSAEnumNetworkEvents(conn, evts[1], &netevents) != SOCKET_ERROR)
  658. {
  659. if (netevents.lNetworkEvents & FD_CONNECT)
  660. {
  661. if (netevents.iErrorCode[FD_CONNECT_BIT])
  662. error = netevents.iErrorCode[FD_CONNECT_BIT];
  663. }
  664. if (error == 0)
  665. {
  666. if (netevents.lNetworkEvents & FD_WRITE)
  667. {
  668. error = netevents.iErrorCode[FD_WRITE_BIT];
  669. if (error == 0) {
  670. proxy_hdr hdr = {0};
  671. hdr.tag[0] = 'A';
  672. hdr.tag[1] = 'C';
  673. hdr.tag[2] = 'M';
  674. hdr.version = ACM_PROTOCOL_VERSION;
  675. generate_rand_key(chan->local_pwd, sizeof(chan->local_pwd));
  676. encodefun((unsigned char*)&hdr.encrypt_key[0], sizeof(hdr.encrypt_key), (unsigned char*)&chan->local_pwd[0], sizeof(chan->local_pwd));
  677. hdr.encrypt_keyhash = hash_key(chan->local_pwd, sizeof(chan->local_pwd));
  678. memset(hdr.callee_id, ' ', sizeof(hdr.callee_id));
  679. memset(hdr.caller_id, ' ', sizeof(hdr.caller_id));
  680. memset(hdr.client_id, ' ', sizeof(hdr.client_id));
  681. strncpy(&hdr.callee_id[0], chan->config.agent_id, sizeof(hdr.callee_id)-1);
  682. strncpy(&hdr.caller_id[0], chan->config.session_id, sizeof(hdr.caller_id)-1);
  683. if (chan->config.client_id) {
  684. strncpy(&hdr.client_id[0], chan->config.client_id, sizeof(hdr.client_id)-1);
  685. } else {
  686. hdr.client_id[0] = 0;
  687. }
  688. hdr.rtp_port = chan->config.video.rtp_port;
  689. hdr.media_desc = chan->config.video.desc;
  690. if (send(conn, (char*)&hdr, sizeof(hdr), 0) != sizeof(hdr)) {
  691. error = -1;
  692. }
  693. }
  694. }
  695. }
  696. if (error == 0)
  697. {
  698. if (netevents.lNetworkEvents & FD_CLOSE)
  699. error = -1;
  700. }
  701. if (error == 0)
  702. {
  703. if (netevents.lNetworkEvents & FD_READ)
  704. {
  705. error = netevents.iErrorCode[FD_READ_BIT];
  706. if (error == 0)
  707. {
  708. int t;
  709. do
  710. {
  711. t = recv(conn, (char*)&ack_hdr + ack_hdr_recv_bytes, sizeof(ack_hdr)-ack_hdr_recv_bytes, 0);
  712. if (t > 0) {
  713. ack_hdr_recv_bytes += t;
  714. if (ack_hdr_recv_bytes == sizeof(ack_hdr)) {
  715. decodefun((unsigned char*)&chan->remote_pwd[0], sizeof(chan->remote_pwd), (unsigned char*)&ack_hdr.encrypt_key[0], sizeof(ack_hdr.encrypt_key));
  716. if (check_hash(chan->remote_pwd, sizeof(chan->remote_pwd), ack_hdr.encrypt_keyhash)) {
  717. chan->remote_video_rtp_port = ack_hdr.rtp_port;
  718. chan->remote_video_desc = ack_hdr.media_desc;
  719. memcpy(chan->remote_client_id, ack_hdr.client_id, sizeof(chan->remote_client_id));
  720. chan->connected = TRUE;
  721. break;
  722. } else {
  723. error = -1;
  724. }
  725. }
  726. }
  727. }
  728. while (t > 0 && error == 0);
  729. if (t <= 0 && (WSAGetLastError() != WSAEWOULDBLOCK))
  730. error = -1;
  731. }
  732. }
  733. }
  734. if (error != 0)
  735. goto on_error;
  736. }
  737. } else {
  738. goto on_error;
  739. }
  740. }
  741. }
  742. assert(conn != INVALID_SOCKET);
  743. if (chan->connected)
  744. {
  745. invoke_on_connect(chan, 0);
  746. chan->recv_info.offset = 0;
  747. rc = on_recv(chan, conn);
  748. if (rc)
  749. {
  750. on_close(chan);
  751. goto on_error;
  752. }
  753. while (!chan->stop_flag)
  754. {
  755. DWORD dwRet = WaitForMultipleObjects(2, evts, FALSE, 1000);
  756. if (dwRet == WAIT_OBJECT_0)
  757. {
  758. WSAResetEvent(chan->evt);
  759. rc = on_send(chan, conn);
  760. if (rc != 0)
  761. break;
  762. } else if (dwRet == WAIT_OBJECT_0+1)
  763. {
  764. WSANETWORKEVENTS netevents;
  765. if (WSAEnumNetworkEvents(conn, evts[1], &netevents) != SOCKET_ERROR)
  766. {
  767. if (netevents.lNetworkEvents & FD_READ)
  768. {
  769. rc = on_recv(chan, conn);
  770. }
  771. if (netevents.lNetworkEvents & FD_WRITE) {
  772. rc = on_send(chan, conn);
  773. }
  774. if (rc || (netevents.lNetworkEvents & FD_CLOSE)) {
  775. break;
  776. }
  777. }
  778. } else if (dwRet == WAIT_TIMEOUT) {
  779. LARGE_INTEGER now;
  780. GetTick(&chan->last_remote_active_time, &now);
  781. if (now.QuadPart - chan->last_remote_active_time.QuadPart >= PING_INTERVAL) {
  782. GetTick(&chan->last_local_active_time, &now);
  783. if (now.QuadPart - chan->last_local_active_time.QuadPart >= PING_INTERVAL) {
  784. bizchan_post_pkt(chan, ACM_TYPE_PING, 0, 0, 0, 0, NULL, 0);
  785. }
  786. }
  787. } else {
  788. goto on_error;
  789. }
  790. }
  791. on_close(chan);
  792. }
  793. on_error:
  794. OutputDebugStringA("on_error");
  795. if (!chan->connected) {
  796. invoke_on_connect(chan, -1); // connect failed!
  797. } else {
  798. //....
  799. OutputDebugStringA("connected, and error!");
  800. }
  801. if (conn != INVALID_SOCKET)
  802. closesocket(conn);
  803. WSACloseEvent(evts[1]);
  804. }
  805. #else
  806. static void do_shake_send(bizchan_t* chan, SOCKET conn, int* error) {
  807. proxy_hdr hdr = { 0 };
  808. proxy_ack_hdr ack_hdr;
  809. int ack_hdr_recv_bytes = 0;
  810. lpfn_cryptionfun encodefun = encodestring;
  811. lpfn_cryptionfun decodefun = decodestring;
  812. if (1 == chan->config.crypt_type) {
  813. encodefun = encodestring_mobile;
  814. decodefun = decodestring_mobile;
  815. }
  816. hdr.tag[0] = 'A';
  817. hdr.tag[1] = 'C';
  818. hdr.tag[2] = 'M';
  819. hdr.version = ACM_PROTOCOL_VERSION;
  820. generate_rand_key(chan->local_pwd, sizeof(chan->local_pwd));
  821. encodefun((unsigned char*)&hdr.encrypt_key[0], sizeof(hdr.encrypt_key), (unsigned char*)&chan->local_pwd[0], sizeof(chan->local_pwd));
  822. hdr.encrypt_keyhash = hash_key(chan->local_pwd, sizeof(chan->local_pwd));
  823. memset(hdr.callee_id, ' ', sizeof(hdr.callee_id));
  824. memset(hdr.caller_id, ' ', sizeof(hdr.caller_id));
  825. memset(hdr.client_id, ' ', sizeof(hdr.client_id));
  826. strncpy(&hdr.callee_id[0], chan->config.agent_id, sizeof(hdr.callee_id) - 1);
  827. strncpy(&hdr.caller_id[0], chan->config.session_id, sizeof(hdr.caller_id) - 1);
  828. if (chan->config.client_id) {
  829. strncpy(&hdr.client_id[0], chan->config.client_id, sizeof(hdr.client_id) - 1);
  830. }
  831. else {
  832. hdr.client_id[0] = 0;
  833. }
  834. hdr.rtp_port = chan->config.video.rtp_port;
  835. hdr.media_desc = chan->config.video.desc;
  836. if (send(conn, (char*)&hdr, sizeof(hdr), 0) != sizeof(hdr)) {
  837. *error = -1;
  838. }
  839. }
  840. static void do_shake_recv(bizchan_t* chan, SOCKET conn, int* error) {
  841. int t;
  842. proxy_ack_hdr ack_hdr;
  843. int ack_hdr_recv_bytes = 0;
  844. lpfn_cryptionfun encodefun = encodestring;
  845. lpfn_cryptionfun decodefun = decodestring;
  846. char buffer[128];
  847. if (1 == chan->config.crypt_type) {
  848. encodefun = encodestring_mobile;
  849. decodefun = decodestring_mobile;
  850. }
  851. do
  852. {
  853. t = recv(conn, (char*)&ack_hdr + ack_hdr_recv_bytes, sizeof(ack_hdr) - ack_hdr_recv_bytes, 0);
  854. if (t > 0) {
  855. snprintf(buffer, 128, "do_shake_recv t:%d, ack_hdr_recv_bytes:%d, sizeof(ack_hdr):%d", t, ack_hdr_recv_bytes, sizeof(ack_hdr));
  856. chan->cb.dbg(chan, 0, buffer, chan->cb.user_data);
  857. ack_hdr_recv_bytes += t;
  858. if (ack_hdr_recv_bytes == sizeof(ack_hdr)) {
  859. decodefun((unsigned char*)&chan->remote_pwd[0], sizeof(chan->remote_pwd), (unsigned char*)&ack_hdr.encrypt_key[0], sizeof(ack_hdr.encrypt_key));
  860. chan->cb.dbg(chan, 0, "do_shake_recv remote_pwd:", chan->cb.user_data);
  861. chan->cb.dbg(chan, 0, chan->remote_pwd, chan->cb.user_data);
  862. if (check_hash(chan->remote_pwd, sizeof(chan->remote_pwd), ack_hdr.encrypt_keyhash)) {
  863. chan->remote_video_rtp_port = ack_hdr.rtp_port;
  864. chan->remote_video_desc = ack_hdr.media_desc;
  865. memcpy(chan->remote_client_id, ack_hdr.client_id, sizeof(chan->remote_client_id));
  866. chan->connected = TRUE;
  867. break;
  868. }
  869. else {
  870. *error = -1;
  871. chan->cb.dbg(chan, 0, "do_shake_recv check_hash failed", chan->cb.user_data);
  872. }
  873. }
  874. }
  875. } while (t > 0 && *error == 0);
  876. #ifdef RVC_OS_WIN
  877. if (t <= 0 && (WSAGetLastError() != WSAEWOULDBLOCK))
  878. *error = -1;
  879. #else
  880. if (t <= 0 && ((errno != EWOULDBLOCK) || (errno != EINTR) || (errno != EAGAIN))) {
  881. *error = -1;
  882. snprintf(buffer, 128, "do_shake_recv errno:%d", errno);
  883. chan->cb.dbg(chan, 0, buffer, chan->cb.user_data);
  884. chan->cb.dbg(chan, 0, "do_shake_recv t<=0 failed", chan->cb.user_data);
  885. }
  886. #endif
  887. }
  888. static void build_fd_sets(SOCKET conn, int evt, fd_set* read_fds, fd_set* write_fds, fd_set* except_fds)
  889. {
  890. FD_ZERO(read_fds);
  891. FD_SET(conn, read_fds);
  892. FD_SET(evt, read_fds);
  893. FD_ZERO(write_fds);
  894. //FD_SET(conn, write_fds);
  895. FD_ZERO(except_fds);
  896. FD_SET(conn, except_fds);
  897. FD_SET(evt, except_fds);
  898. }
  899. static int set_socket_noblock(SOCKET socket) {
  900. #ifdef RVC_OS_WIN
  901. {
  902. //windows将socket设置成非阻塞的方式
  903. unsigned long on = 1;
  904. if (ioctlsocket(socket, FIONBIO, &on) < 0) {
  905. return -1;
  906. }
  907. }
  908. #else
  909. {
  910. //linux将socket设置成非阻塞的方式
  911. //将新socket设置为non-blocking
  912. int oldflag = fcntl(socket, F_GETFL, 0);
  913. int newflag = oldflag | O_NONBLOCK;
  914. if (fcntl(socket, F_SETFL, newflag) == -1) {
  915. return -1;
  916. }
  917. }
  918. #endif
  919. return 0;
  920. }
  921. SOCKET connect_server(bizchan_t* chan, char* server, int port, int timeout)
  922. {
  923. SOCKET m_hSocket;
  924. struct sockaddr_in addrSrv = { 0 };
  925. //fd_set writeset;
  926. m_hSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  927. if (m_hSocket == INVALID_SOCKET)
  928. return INVALID_SOCKET;
  929. addrSrv.sin_family = AF_INET;
  930. addrSrv.sin_addr.s_addr = inet_addr(server);
  931. addrSrv.sin_port = htons(port);
  932. int ret = connect(m_hSocket, (struct sockaddr*)&addrSrv, sizeof(addrSrv));
  933. if (ret == 0) {
  934. return m_hSocket;
  935. }
  936. else {
  937. chan->cb.dbg(chan, 0, "connect_server failed", chan->cb.user_data);
  938. #ifdef RVC_OS_WIN
  939. closesocket(m_hSocket);
  940. #else
  941. close(m_hSocket);
  942. #endif
  943. return INVALID_SOCKET;
  944. }
  945. /*
  946. #ifdef RVC_OS_WIN
  947. //windows下检测WSAEWOULDBLOCK
  948. if (ret < 0 && WSAGetLastError() != WSAEWOULDBLOCK) {
  949. closesocket(m_hSocket);
  950. return INVALID_SOCKET;
  951. }
  952. #else
  953. //linux下需要检测EINPROGRESS和EINTR
  954. if (ret < 0 && (errno != EINPROGRESS || errno != EINTR)) {
  955. close(m_hSocket);
  956. return INVALID_SOCKET;
  957. }
  958. #endif
  959. chan->cb.dbg(chan, 0, "processLinux connect_server 5", chan->cb.user_data);
  960. FD_ZERO(&writeset);
  961. FD_SET(m_hSocket, &writeset);
  962. struct timeval tv;
  963. tv.tv_sec = timeout;
  964. //可以利用tv_usec做更小精度的超时设置
  965. tv.tv_usec = 0;
  966. if (select(m_hSocket + 1, NULL, &writeset, NULL, &tv) != 1) {
  967. #ifdef RVC_OS_WIN
  968. closesocket(m_hSocket);
  969. #else
  970. close(m_hSocket);
  971. #endif
  972. chan->cb.dbg(chan, 0, "processLinux connect_server 6", chan->cb.user_data);
  973. return INVALID_SOCKET;
  974. }
  975. return m_hSocket;*/
  976. }
  977. static void process(bizchan_t* chan) {
  978. int error = 0;
  979. SOCKET conn = connect_server(chan, chan->config.proxy_server, chan->config.proxy_server_port, 5);
  980. if (conn == INVALID_SOCKET && chan->config.bak_proxy_server && strlen(chan->config.bak_proxy_server)) { // try connect to back proxy server
  981. conn = connect_server(chan, chan->config.bak_proxy_server, chan->config.bak_proxy_server_port, 5);
  982. }
  983. if (conn == INVALID_SOCKET) {
  984. error = -1;
  985. }
  986. else {
  987. do_shake_send(chan, conn, &error);
  988. if (error != 0) {
  989. chan->cb.dbg(chan, 0, "do_shake_send failed", chan->cb.user_data);
  990. goto on_error;
  991. }
  992. do_shake_recv(chan, conn, &error);
  993. if (error != 0) {
  994. chan->cb.dbg(chan, 0, "do_shake_recv failed", chan->cb.user_data);
  995. goto on_error;
  996. }
  997. }
  998. if (chan->connected)
  999. {
  1000. char buffer[128];
  1001. int rc;
  1002. fd_set read_fds;
  1003. fd_set write_fds;
  1004. fd_set except_fds;
  1005. int maxfd = max(conn, chan->evt[0]);
  1006. rc = set_socket_noblock(conn);
  1007. if (rc)
  1008. {
  1009. on_close(chan);
  1010. chan->cb.dbg(chan, 2, "set_socket_noblock failed", chan->cb.user_data);
  1011. goto on_error;
  1012. }
  1013. invoke_on_connect(chan, 0);
  1014. chan->recv_info.offset = 0;
  1015. /*rc = on_recv(chan, conn);
  1016. if (rc)
  1017. {
  1018. on_close(chan);
  1019. chan->cb.dbg(chan, 2, "on_recv failed", chan->cb.user_data);
  1020. goto on_error;
  1021. }*/
  1022. while (!chan->stop_flag)
  1023. {
  1024. fd_set read_fds;
  1025. fd_set write_fds;
  1026. fd_set except_fds;
  1027. int maxfd = max(conn, chan->evt[0]);
  1028. struct timeval tv;
  1029. tv.tv_sec = 1;
  1030. //可以利用tv_usec做更小精度的超时设置
  1031. tv.tv_usec = 0;
  1032. // Select() updates fd_set's, so we need to build fd_set's before each select()call.
  1033. build_fd_sets(conn, chan->evt[0], &read_fds, &write_fds, &except_fds);
  1034. int activity = select(maxfd + 1, &read_fds, &write_fds, &except_fds, &tv);
  1035. if (activity == -1) {
  1036. perror("select()");
  1037. chan->cb.dbg(chan, 0, "chan select() error.", chan->cb.user_data);
  1038. goto on_error;
  1039. }
  1040. else if (activity == 0) {
  1041. // timeout
  1042. printf("select() returns 0, timeout.\n");
  1043. //chan->cb.dbg(chan, 0, "select() returns 0, timeout.", chan->cb.user_data);
  1044. LARGE_INTEGER now;
  1045. GetTick(&chan->last_remote_active_time, &now);
  1046. if (now.QuadPart - chan->last_remote_active_time.QuadPart >= PING_INTERVAL) {
  1047. GetTick(&chan->last_local_active_time, &now);
  1048. if (now.QuadPart - chan->last_local_active_time.QuadPart >= PING_INTERVAL) {
  1049. bizchan_post_pkt(chan, ACM_TYPE_PING, 0, 0, 0, 0, NULL, 0);
  1050. }
  1051. }
  1052. }else {
  1053. /* All fd_set's should be checked. */
  1054. if (FD_ISSET(conn, &read_fds)) {
  1055. rc = on_recv(chan, conn);
  1056. if (rc != 0)
  1057. break;
  1058. }
  1059. if (FD_ISSET(chan->evt[0], &read_fds)) {
  1060. //读取管道数据并丢弃,管道主要用于唤醒select
  1061. char pipe_read_buffer[4];
  1062. int n = read(chan->evt[0], pipe_read_buffer, 4);
  1063. if (n >= 0) {
  1064. rc = on_send(chan, conn);
  1065. if (rc != 0)
  1066. break;
  1067. }
  1068. else {
  1069. chan->cb.dbg(chan, 0, "pipe read < 0", chan->cb.user_data);
  1070. }
  1071. }
  1072. if (FD_ISSET(conn, &except_fds)) {
  1073. chan->cb.dbg(chan, 0, "select() except_fds set.", chan->cb.user_data);
  1074. error = -1;
  1075. }
  1076. if (error != 0) {
  1077. goto on_error;
  1078. }
  1079. }
  1080. }
  1081. on_close(chan);
  1082. }
  1083. on_error:
  1084. OutputDebugStringA("on_error");
  1085. chan->cb.dbg(chan, 0, "on_error", chan->cb.user_data);
  1086. if (!chan->connected) {
  1087. invoke_on_connect(chan, -1); // connect failed!
  1088. }
  1089. else {
  1090. //....
  1091. OutputDebugStringA("connected, and error!");
  1092. chan->cb.dbg(chan, 0, "connected, and error!", chan->cb.user_data);
  1093. }
  1094. if (conn != INVALID_SOCKET) {
  1095. #ifdef RVC_OS_WIN
  1096. closesocket(conn);
  1097. #else
  1098. close(conn);
  1099. #endif
  1100. }
  1101. }
  1102. #endif
  1103. #ifdef RVC_OS_WIN
  1104. static unsigned int __stdcall work_proc(LPVOID arg)
  1105. #else
  1106. static void* work_proc(void* arg)
  1107. #endif
  1108. {
  1109. bizchan_t *chan = (bizchan_t *)arg;
  1110. #ifdef RVC_OS_WIN
  1111. __try {
  1112. process(chan);
  1113. } __except(dump_exception(GetExceptionInformation()), EXCEPTION_EXECUTE_HANDLER) {
  1114. //....
  1115. }
  1116. #else
  1117. chan->cb.dbg(chan, 0, "work_proc start", chan->cb.user_data);
  1118. process(chan);
  1119. #endif
  1120. return 0;
  1121. }
  1122. #ifdef RVC_OS_WIN
  1123. static int init_decode_func(const bizchan_config_t* config, const bizchan_callback_t* cb)
  1124. {
  1125. int ret = -1;
  1126. if (!encodestring) {
  1127. HMODULE hInst = LoadLibraryA("acmstrenc.dll");
  1128. if (hInst) {
  1129. encodestring = (lpfn_cryptionfun)GetProcAddress(hInst, "encodestring");
  1130. }
  1131. if (!encodestring)
  1132. return ret;
  1133. }
  1134. if (!decodestring) {
  1135. HMODULE hInst = LoadLibraryA("acmstrdec.dll");
  1136. if (hInst) {
  1137. decodestring = (lpfn_cryptionfun)GetProcAddress(hInst, "decodestring");
  1138. }
  1139. if (!decodestring)
  1140. return ret;
  1141. }
  1142. if (!encodestring_mobile) {
  1143. HMODULE hInst = LoadLibraryA("acmstrenc_mobile.dll");
  1144. if (hInst) {
  1145. encodestring_mobile = (lpfn_cryptionfun)GetProcAddress(hInst, "encodestring");
  1146. }
  1147. if (!encodestring_mobile)
  1148. return ret;
  1149. }
  1150. if (!decodestring_mobile) {
  1151. HMODULE hInst = LoadLibraryA("acmstrdec_mobile.dll");
  1152. if (hInst) {
  1153. decodestring_mobile = (lpfn_cryptionfun)GetProcAddress(hInst, "decodestring");
  1154. }
  1155. if (!decodestring_mobile)
  1156. return ret;
  1157. }
  1158. if (encodestring && decodestring && encodestring_mobile && decodestring_mobile){
  1159. ret = 0;
  1160. }
  1161. return ret;
  1162. }
  1163. #else
  1164. static int init_decode_func(const bizchan_config_t* config, const bizchan_callback_t* cb)
  1165. {
  1166. int ret = -1;
  1167. if (!encodestring) {
  1168. HMODULE hInst = LoadLibraryA("libacmstrenc.so");
  1169. if (hInst) {
  1170. encodestring = (lpfn_cryptionfun)GetProcAddress(hInst, "encodestring");
  1171. }
  1172. else {
  1173. cb->dbg(NULL, 2, "libacmstrenc LoadLibraryA failed", cb->user_data);
  1174. }
  1175. if (!encodestring) {
  1176. cb->dbg(NULL, 2, "libacmstrenc GetProcAddress failed", cb->user_data);
  1177. return ret;
  1178. }
  1179. }
  1180. if (!decodestring) {
  1181. HMODULE hInst = LoadLibraryA("libacmstrdec.so");
  1182. if (hInst) {
  1183. decodestring = (lpfn_cryptionfun)GetProcAddress(hInst, "decodestring");
  1184. }
  1185. if (!decodestring)
  1186. return ret;
  1187. }
  1188. if (1 == config->crypt_type) {
  1189. if (!encodestring_mobile) {
  1190. HMODULE hInst = LoadLibraryA("libacmstrenc_mobile.so");
  1191. if (hInst) {
  1192. encodestring_mobile = (lpfn_cryptionfun)GetProcAddress(hInst, "encodestring");
  1193. }
  1194. if (!encodestring_mobile)
  1195. return ret;
  1196. }
  1197. if (!decodestring_mobile) {
  1198. HMODULE hInst = LoadLibraryA("libacmstrdec_mobile.so");
  1199. if (hInst) {
  1200. decodestring_mobile = (lpfn_cryptionfun)GetProcAddress(hInst, "decodestring");
  1201. }
  1202. if (!decodestring_mobile)
  1203. return ret;
  1204. }
  1205. }
  1206. if (encodestring && decodestring) {
  1207. ret = 0;
  1208. }
  1209. return ret;
  1210. }
  1211. #endif
  1212. BIZCHAN_API(int) bizchan_create(const bizchan_config_t *config, const bizchan_callback_t *cb, bizchan_t **p_chan)
  1213. {
  1214. bizchan_t *chan = NULL;
  1215. if (-1 == init_decode_func(config, cb)){
  1216. cb->dbg(chan, 2, "init_decode_func failed", cb->user_data);
  1217. return -1;
  1218. }
  1219. if (!config || !p_chan) {
  1220. cb->dbg(chan, 2, "config or chan is null, failed", cb->user_data);
  1221. return -1;
  1222. }
  1223. if (config_check(config) != 0) {
  1224. cb->dbg(chan, 2, "config_check failed", cb->user_data);
  1225. return -1;
  1226. }
  1227. if (callback_check(cb) != 0) {
  1228. cb->dbg(chan, 2, "callback_check failed", cb->user_data);
  1229. return -1;
  1230. }
  1231. chan = (bizchan_t*)malloc(sizeof(bizchan_t));
  1232. if (!chan)
  1233. goto on_error;
  1234. memset(chan, 0, sizeof(bizchan_t));
  1235. chan->recv_info.buf_len = DEFAULT_RX_BUF_SIZE;
  1236. chan->recv_info.buf = (char*)malloc(chan->recv_info.buf_len);
  1237. if (!chan->recv_info.buf)
  1238. goto on_error;
  1239. memcpy(&chan->cb, cb, sizeof(bizchan_callback_t));
  1240. if (config_copy(chan, config, &chan->config) != 0)
  1241. goto on_error;
  1242. ListEntry_InitHead(&chan->send_info.send_list);
  1243. #ifdef RVC_OS_WIN
  1244. InitializeCriticalSection(&chan->send_info.lock);
  1245. #else
  1246. pthread_mutexattr_init(&chan->send_info.attr);
  1247. pthread_mutexattr_settype(&chan->send_info.attr, PTHREAD_MUTEX_RECURSIVE);
  1248. pthread_mutex_init(&chan->send_info.lock, &chan->send_info.attr);
  1249. #endif
  1250. screen_decoder_session_create(&chan->dec_session);
  1251. #ifdef RVC_OS_WIN
  1252. chan->evt = WSACreateEvent();
  1253. if (!chan->evt)
  1254. goto on_error;
  1255. #else
  1256. pipe(chan->evt);
  1257. #endif
  1258. *p_chan = chan;
  1259. cb->dbg(chan, 0, "bizchan_create success", cb->user_data);
  1260. return 0;
  1261. on_error:
  1262. cb->dbg(chan, 2, "bizchan_create on_error", cb->user_data);
  1263. bizchan_destroy(chan);
  1264. return -1;
  1265. }
  1266. BIZCHAN_API(int) bizchan_winsync_set_cb(bizchan_t *chan, OnRecvPacket pkt_cb, OnMode mode_cb, void *user_data)
  1267. {
  1268. chan->winsync_on_recv_cb = pkt_cb;
  1269. chan->mode_cb = mode_cb;
  1270. chan->winsync_user_data = user_data;
  1271. return 0;
  1272. }
  1273. BIZCHAN_API(void) bizchan_destroy(bizchan_t *chan)
  1274. {
  1275. if (chan) {
  1276. #ifdef RVC_OS_WIN
  1277. if (chan->evt) {
  1278. WSACloseEvent(chan->evt);
  1279. chan->evt = NULL;
  1280. }
  1281. #else
  1282. close(chan->evt[0]);
  1283. close(chan->evt[1]);
  1284. #endif
  1285. invoke_on_destroy(chan);
  1286. #ifdef RVC_OS_WIN
  1287. assert(chan->work_thread == NULL);
  1288. #endif
  1289. config_free(&chan->config);
  1290. if (chan->recv_info.buf)
  1291. free(chan->recv_info.buf);
  1292. if (chan->recv_info.unzip_buf)
  1293. free(chan->recv_info.unzip_buf);
  1294. while (!ListEntry_IsEmpty(&chan->send_info.send_list)) {
  1295. send_buf_node *p = CONTAINING_RECORD(ListEntry_RemoveListHead(&chan->send_info.send_list), send_buf_node, entry);
  1296. free(p->buf);
  1297. free(p);
  1298. }
  1299. #ifdef RVC_OS_WIN
  1300. DeleteCriticalSection(&chan->send_info.lock);
  1301. #else
  1302. pthread_mutex_destroy(&chan->send_info.lock);
  1303. #endif
  1304. if (chan->dec_session) {
  1305. screen_decoder_session_destroy(chan->dec_session);
  1306. chan->dec_session = NULL;
  1307. }
  1308. free(chan);
  1309. }
  1310. }
  1311. BIZCHAN_API(void) bizchan_set_tag(bizchan_t *chan, void *tag)
  1312. {
  1313. chan->tag = tag;
  1314. }
  1315. BIZCHAN_API(void*) bizchan_get_tag(bizchan_t *chan)
  1316. {
  1317. return chan->tag;
  1318. }
  1319. BIZCHAN_API(int) bizchan_start_connect(bizchan_t *chan)
  1320. {
  1321. DWORD threadId;
  1322. if (!chan) {
  1323. return -1;
  1324. }
  1325. if (chan->work_thread) {
  1326. chan->cb.dbg(chan, 2, "work_thread exsit!!!", chan->cb.user_data);
  1327. return -1;
  1328. }
  1329. #ifdef RVC_OS_WIN
  1330. WSAResetEvent(chan->evt);
  1331. #endif
  1332. chan->cb.dbg(chan, 0, "bizchan_start_connect 2", chan->cb.user_data);
  1333. chan->stop_flag = 0;
  1334. chan->connected = 0;
  1335. #ifdef RVC_OS_WIN
  1336. chan->work_thread = (HANDLE)_beginthreadex(NULL, 0, &work_proc, (LPVOID)chan, 0, (unsigned int*)&threadId);
  1337. if (!chan->work_thread) {
  1338. return -1;
  1339. }
  1340. #else
  1341. int err = pthread_create(&chan->work_thread, NULL, work_proc, chan);
  1342. if (0 == err) {
  1343. Dbg("create work thread success, %lu.", chan->work_thread);
  1344. }
  1345. else {
  1346. Dbg("create work thread failed.");
  1347. }
  1348. #endif // RVC_OS_WIN
  1349. // we now return, when connected, on_connect will invoked in work_proc thread
  1350. chan->cb.dbg(chan, 0, "bizchan_start_connect success", chan->cb.user_data);
  1351. return 0;
  1352. }
  1353. BIZCHAN_API(int) bizchan_start_close(bizchan_t *chan)
  1354. {
  1355. chan->cb.dbg(chan, 2, "bizchan_start_close", chan->cb.user_data);
  1356. chan->stop_flag = 1;
  1357. #ifdef RVC_OS_WIN
  1358. WSASetEvent(chan->evt);
  1359. #else
  1360. write(chan->evt[1], "stop", sizeof("stop"));
  1361. #endif
  1362. return 0;
  1363. }
  1364. BIZCHAN_API(int) bizchan_close(bizchan_t *chan)
  1365. {
  1366. #ifdef RVC_OS_WIN
  1367. if (chan->work_thread) {
  1368. WaitForSingleObject(chan->work_thread, INFINITE);
  1369. CloseHandle(chan->work_thread);
  1370. chan->work_thread = NULL;
  1371. }
  1372. #else
  1373. pthread_join(chan->work_thread, NULL);
  1374. chan->work_thread = 0;
  1375. #endif // RVC_OS_WIN
  1376. return 0;
  1377. }
  1378. //static FILE *tx_log_fp = NULL;
  1379. BIZCHAN_API(int) bizchan_post_pkt(bizchan_t *chan, int type, int compress, int encrypt, int sub_type, int id, const char *pkt, int pkt_size)
  1380. {
  1381. send_buf_node *t;
  1382. if (!chan->connected)
  1383. return -1;
  1384. t = (send_buf_node *)malloc(sizeof(send_buf_node));
  1385. //if (!tx_log_fp) {
  1386. // tx_log_fp = fopen("c:\\txlog.txt", "wt");
  1387. // fprintf(tx_log_fp, "===================\n");
  1388. //}
  1389. //{
  1390. // SYSTEMTIME st;
  1391. // GetLocalTime(&st);
  1392. // fprintf(tx_log_fp, "[%02d:%02d:%02d.%03d] type = %d, compress = %d, sub_type = %d, id = %d, pkt_size = %d, hash = %d\n",
  1393. // st.wHour, st.wMinute, st.wSecond, st.wMilliseconds,
  1394. // type, compress, sub_type, id, pkt_size, hash32_buf(pkt, pkt_size, 0));
  1395. // fflush(tx_log_fp);
  1396. //}
  1397. if (type == ACM_TYPE_SRN) {
  1398. int cat = ACM_SRN_CAT(sub_type);
  1399. if (cat == ACM_SRN_REQ) {
  1400. chan->screen_img_id = id;
  1401. }
  1402. } else if (type == ACM_TYPE_PHT) {
  1403. int cat = ACM_PHT_CAT(sub_type);
  1404. if (cat == ACM_PHT_REQ) {
  1405. chan->photo_img_id = id;
  1406. }
  1407. }
  1408. if (!compress || pkt_size == 0) {
  1409. acm_hdr *hdr;
  1410. t->buf = (char*)malloc(pkt_size + sizeof(acm_hdr));
  1411. t->left = pkt_size + sizeof(acm_hdr);
  1412. t->sended = 0;
  1413. hdr = (acm_hdr*)&t->buf[0];
  1414. hdr->compress = 0;
  1415. hdr->length = pkt_size;
  1416. hdr->sub_type = sub_type;
  1417. hdr->type = type;
  1418. hdr->id = id;
  1419. hdr->encrypt = !!encrypt;
  1420. hdr->hash = hash_key(pkt, pkt_size);
  1421. memcpy(&hdr->data[0], pkt, pkt_size);
  1422. t->need_encrypt = encrypt;
  1423. } else {
  1424. qlz_state_compress state_compress;
  1425. acm_hdr *hdr;
  1426. int new_pkt_size;
  1427. t->buf = (char*)malloc(2*pkt_size + sizeof(acm_hdr) + 16);
  1428. hdr = (acm_hdr *)&t->buf[0];
  1429. new_pkt_size = (int)qlz_compress(pkt, (char*)&hdr->data[0], pkt_size, &state_compress);
  1430. if (new_pkt_size < pkt_size) {
  1431. hdr->compress = 1;
  1432. hdr->length = new_pkt_size;
  1433. t->left = new_pkt_size + sizeof(acm_hdr);
  1434. } else {
  1435. hdr->compress = 0;
  1436. hdr->length = pkt_size;
  1437. memcpy(&hdr->data[0], pkt, pkt_size);
  1438. t->left = pkt_size + sizeof(acm_hdr);
  1439. }
  1440. hdr->type = type;
  1441. hdr->id = id;
  1442. hdr->sub_type = sub_type;
  1443. hdr->encrypt = !!encrypt;
  1444. hdr->hash = hash_key(pkt, pkt_size);
  1445. t->sended = 0;
  1446. t->need_encrypt = encrypt;
  1447. }
  1448. #ifdef RVC_OS_WIN
  1449. EnterCriticalSection(&chan->send_info.lock);
  1450. ListEntry_AddTail(&chan->send_info.send_list, &t->entry);
  1451. LeaveCriticalSection(&chan->send_info.lock);
  1452. WSASetEvent(chan->evt);
  1453. #else
  1454. pthread_mutex_lock(&chan->send_info.lock);
  1455. ListEntry_AddTail(&chan->send_info.send_list, &t->entry);
  1456. pthread_mutex_unlock(&chan->send_info.lock);
  1457. write(chan->evt[1], "post", sizeof("post"));
  1458. #endif
  1459. return 0;
  1460. }
  1461. BIZCHAN_API(int) bizchan_winsync_send(bizchan_t *chan, int sub_type, const void *buf, int size)
  1462. {
  1463. if (chan && chan->work_thread) {
  1464. return bizchan_post_pkt(chan, ACM_TYPE_SYNC, 1, sub_type, 1, 0, (const char*)buf, size);
  1465. } else {
  1466. return -1;
  1467. }
  1468. }