CWebsocketServer.cpp 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932
  1. #include "stdafx.h"
  2. #if (defined _WIN32 || defined _WIN64)
  3. #include "../mod_browser/IEBrowser_client_g.h"
  4. #else
  5. #include <future>
  6. #include <thread>
  7. #endif
  8. #include "CWebsocketServer.h"
  9. #include "baseEx.h"
  10. #include "CModTools.h"
  11. #include <vector>
  12. #include "MessageType.h"
  13. #include <boost/thread/thread.hpp>
  14. #include <boost/thread/lock_guard.hpp>
  15. #include <iostream>
  16. #include <locale>
  17. #include <codecvt>
  18. #define DEFAULT_SERVER_PORT 9002
  19. namespace Chromium {
// File-scope websocketpp server object used by the CWebsocketServer methods in this file:
// init_websocket() configures it and starts listening, and the send paths call
// m_wsserver.send() on it.
// NOTE(review): despite the m_ prefix this is a namespace-level global, not a class member,
// so it would be shared by every CWebsocketServer instance — presumably only one exists; confirm.
server m_wsserver;
  21. CWebsocketServer::CWebsocketServer(const char* strPath, CEntityBase* pEntity) : m_ios(), m_serializer(NULL), m_socket(NULL), m_pEntity(pEntity), m_initSuccess(false)
  22. {
  23. DbgEx("CWebsocketServer constructor");
  24. // Initialize socket client
  25. DbgEx("Initialize socket client");
  26. this->m_pEntity = pEntity;
  27. m_socket = new CSocketClient(m_ios, "127.0.0.1", "4504", pEntity, 0);
  28. DbgEx("Set socket client MessageHandler -- message_handler");
  29. m_socket->SetMessageHandler(this);
  30. // Initialize serializer
  31. DbgEx("Initialize serializer");
  32. this->m_serializer = CWSCodec::getInstance();
  33. this->m_serializer->setEntityHandler(pEntity->GetFunction());
  34. this->m_serializer->init(strPath);
  35. while (Error_Succeed != m_socket->Connect())
  36. Sleep(100);
  37. DbgEx("init Entity Session Manager");
  38. myTest();
  39. m_esm = new EntitySessionManager();
  40. init_websocket();
  41. init_entity_sessions();
  42. }
  43. void CWebsocketServer::updateMsgPool(std::string entityName, std::string& payload, websocketpp::connection_hdl hdl)
  44. {
  45. //存储的消息应具备时效性,保证一个hdl只有一个记录的消息
  46. auto it = m_msg_pool.find(entityName);
  47. if (it != m_msg_pool.end())
  48. {
  49. auto& curArr = it->second;
  50. for (auto i = curArr.begin(); i != curArr.end(); i++)
  51. {
  52. if (&hdl == &i->first)
  53. {
  54. curArr.erase(i);
  55. break;
  56. }
  57. }
  58. it->second.emplace_back(std::make_pair(hdl, payload));
  59. }
  60. else
  61. {
  62. std::vector<std::pair<websocketpp::connection_hdl, std::string>> t_saveMsgs;
  63. t_saveMsgs.emplace_back(std::make_pair(hdl, payload));
  64. m_msg_pool.insert(std::make_pair(entityName, t_saveMsgs));
  65. }
  66. }
  67. void CWebsocketServer::updateNotifyPool(unsigned hdl, unsigned transId)
  68. {
  69. auto it = m_notifyPool.find(hdl);
  70. if (it != m_notifyPool.end())
  71. {
  72. if (transId == 0)
  73. m_notifyPool.erase(it);
  74. else
  75. it->second = transId;
  76. }
  77. else if (transId != 0)
  78. m_notifyPool.insert(std::make_pair(hdl, transId));
  79. }
  80. void CWebsocketServer::storeEntityWithCLass(std::string entityName, std::string entityClass)
  81. {
  82. if (m_entityAndClass.end() == m_entityAndClass.find(entityName)) {
  83. m_entityAndClass.insert(std::make_pair(entityName, entityClass)); //not exist
  84. //DbgEx("storeEntityWithCLass, %s:%s", entityName.c_str(), entityClass.c_str());
  85. }
  86. }
  87. std::pair<bool, std::string> CWebsocketServer::getEntityClass(std::string entityName)
  88. {
  89. auto it = m_entityAndClass.find(entityName);
  90. if (m_entityAndClass.end() == it)
  91. return std::make_pair(false, "");
  92. else
  93. return std::make_pair(true, it->second);
  94. }
  95. void CWebsocketServer::do_sendJsonStartSession(std::string entityName, std::string entityClass)
  96. {
  97. auto startSessionReq = m_esm->GetStartSessionRequest(entityName, entityClass);
  98. auto sessionBuf = this->m_serializer->JsonToBuffer(startSessionReq.second).second;
  99. if (nullptr == sessionBuf)
  100. {
  101. DbgEx("JsonToBuffer return NULL !");
  102. return;
  103. }
  104. m_esm->StoreSessionReq(startSessionReq.first, entityName);
  105. m_esm->MakeNewTransID(sessionBuf, 0);
  106. //保存msg,先建立session
  107. if (m_esm->checkBeginSession(entityName)) {
  108. m_esm->updateBeginSessionTime(entityName);
  109. WriteToFramework(sessionBuf);
  110. }
  111. else
  112. DbgEx("Already begin session in 5s, %s", entityName.c_str());
  113. }
  114. void CWebsocketServer::WriteToFramework(CMessage* msg)
  115. {
  116. m_socket->Write(msg);
  117. }
  118. void CWebsocketServer::deal_webchromium_msg(std::string& payload, websocketpp::connection_hdl hdl, int messageType)
  119. {
  120. DbgEx("deal_webchromium_msg :%d, %s", payload.length(), payload.length() > 800 ? payload.substr(0, 800).append("...").c_str() : payload.c_str());
  121. switch (messageType)
  122. {
  123. case RegisterNotify:
  124. {
  125. std::shared_ptr<cJSON> pJson(cJSON_Parse(payload.c_str()), [](cJSON* p) {
  126. if (nullptr != p) {
  127. cJSON_Delete(p);
  128. //cJSON_free(p);
  129. }
  130. });
  131. auto transIdJson = cJSON_GetObjectItem(pJson.get(), "transId");
  132. if (transIdJson == nullptr)
  133. {
  134. DbgEx("deal_webchromium_msg err, transIdJson == null");
  135. return;
  136. }
  137. int transid = transIdJson->valueint;
  138. updateNotifyPool((long)hdl.lock().get(), transid);
  139. }
  140. break;
  141. case UnRegisterNotify:
  142. updateNotifyPool((long)hdl.lock().get(), 0);
  143. break;
  144. default:
  145. break;
  146. }
  147. }
  148. void CWebsocketServer::deal_logMsg(std::string& payload, websocketpp::connection_hdl hdl, int messageType)
  149. {
  150. std::shared_ptr<cJSON> pJson(cJSON_Parse(payload.c_str()), [](cJSON* p) {
  151. if (nullptr != p) {
  152. cJSON_Delete(p);
  153. //cJSON_free(p);
  154. }
  155. });
  156. if (nullptr == pJson.get()) {
  157. DbgEx("CWebsocketServer -> deal_logMsg, jsonErr:%s", payload.c_str());
  158. return;
  159. }
  160. std::string msg = cJSON_GetObjectItem(pJson.get(), "msg")->valuestring;
  161. auto changeMessageTypeToLogLevel = [](int messageType) ->LOG_LEVEL_E {
  162. switch (messageType)
  163. {
  164. case METHOD_SYSTEM_LOG_DEBUG:
  165. return LOG_LEVEL_DEBUG;
  166. case METHOD_SYSTEM_LOG_INFO:
  167. return LOG_LEVEL_INFO;
  168. case METHOD_SYSTEM_LOG_WARN:
  169. return LOG_LEVEL_WARN;
  170. case METHOD_SYSTEM_LOG_ERROR:
  171. return LOG_LEVEL_ERROR;
  172. default:
  173. return LOG_LEVEL_DEBUG;
  174. }
  175. };
  176. DbgWithLink(changeMessageTypeToLogLevel(messageType), LOG_TYPE_BUSINESS_SYSTEM).withLogProducer(logProducer).setAPI("logMsg").withExtendLog(false).setResultMsg(msg.c_str())();
  177. }
  178. void CWebsocketServer::deal_sessionBreakMsg(std::string& payload, websocketpp::connection_hdl hdl)
  179. {
  180. std::shared_ptr<cJSON> pJson(cJSON_Parse(payload.c_str()), [](cJSON* p) {
  181. if (nullptr != p) {
  182. cJSON_Delete(p);
  183. //cJSON_free(p);
  184. }
  185. });
  186. if (nullptr == pJson.get()) {
  187. DbgEx("CWebsocketServer -> deal_sessionBreakMsg, jsonErr:%s", payload.c_str());
  188. return;
  189. }
  190. int transidNum = cJSON_GetObjectItem(pJson.get(), "transId")->valueint;
  191. int messageTypeNum = cJSON_GetObjectItem(pJson.get(), "messageType")->valueint;
  192. std::string sessionId = std::to_string((LONGLONG)-1);
  193. std::string transid = std::to_string((LONGLONG)transidNum);
  194. std::string js = "{\"messageType\":5,\"errorCode\":\"1537\",\"errorMsg\":\"session break\",\"sessionID\":";
  195. js.append(sessionId.c_str());
  196. js.append(",\"transID\":");
  197. js.append(transid.c_str());
  198. js.append("}");
  199. js = restroreTransId(js);
  200. DbgEx("deal_sessionBreakMsg, len:%d, srcPayLoad:%s, ret:%s", payload.length()
  201. , payload.length() > 800 ? payload.substr(0, 800).append("...").c_str() : payload.c_str(), js.c_str());
  202. m_wsserver.send(hdl, js, websocketpp::frame::opcode::TEXT);
  203. }
  204. void CWebsocketServer::deal_msg(std::string& payload, websocketpp::connection_hdl hdl)
  205. {
  206. boost::lock_guard<boost::mutex> lock(m_dealMsgLock); //在buffer和json处理时,deal_msg会调用多次,导致transId存在重复可能
  207. DbgEx("deal_msg :%d, %s", payload.length(), payload.length() > 800 ? payload.substr(0, 800).append("...").c_str() : payload.c_str());
  208. auto ret = this->m_serializer->JsonToBuffer(payload);
  209. CMessage* p = ret.second;
  210. if (ret.first == Broadcast && nullptr == p)
  211. {
  212. DbgEx("JsonToBuffer return NULL, perhaps an event happened");
  213. do_sendJsonBroadcast(payload);
  214. return;
  215. }
  216. else if(ret.first == Request || ret.first == Info)
  217. {
  218. if(nullptr == p)
  219. {
  220. //can not find the method
  221. std::shared_ptr<cJSON> pJson(cJSON_Parse(payload.c_str()), [](cJSON* p) {
  222. if (nullptr != p) {
  223. cJSON_Delete(p);
  224. //cJSON_free(p);
  225. }
  226. });
  227. auto transIdJson = cJSON_GetObjectItem(pJson.get(), "transId");
  228. if(transIdJson != nullptr)
  229. {
  230. std::string errRetJs = "{\"messageType\":4,\"transID\":" + std::to_string((LONGLONG)transIdJson->valueint) + ",\"errorCode\":103,\"errorMsg\":\"can not find entity or class!\"}";
  231. auto js = restroreTransId(errRetJs);
  232. DbgEx("deal_msg %s get null msg, ret:%s", ret.first == Request ? "Request" : "Info", errRetJs.c_str());
  233. m_wsserver.send(hdl, js, websocketpp::frame::opcode::TEXT);
  234. }
  235. return;
  236. }
  237. }
  238. else if (ret.first == GetSession)
  239. {
  240. std::shared_ptr<cJSON> pJson(cJSON_Parse(payload.c_str()), [](cJSON* p) {
  241. if (nullptr != p) {
  242. cJSON_Delete(p);
  243. //cJSON_free(p);
  244. }
  245. });
  246. auto transIdJson = cJSON_GetObjectItem(pJson.get(), "transId");
  247. if (transIdJson != nullptr)
  248. {
  249. int transid = transIdJson->valueint;
  250. auto sessionRet = m_esm->GetAllSessionRequest(transid);
  251. DbgEx("sessionJson:%s", sessionRet.second.c_str());
  252. if (sessionRet.first)
  253. {
  254. auto js = restroreTransId(sessionRet.second);
  255. m_wsserver.send(hdl, js, websocketpp::frame::opcode::TEXT);
  256. }
  257. }
  258. return;
  259. }
  260. else if (ret.first == BeginSession)
  261. storeEntityWithCLass(m_serializer->GetEntityName(payload), m_serializer->GetClassName(payload));
  262. // Try to manage entity session
  263. RequestProcessType processType = this->m_esm->RequestProcess(p, m_serializer->GetEntityName(payload), (long)hdl.lock().get());
  264. switch (processType)
  265. {
  266. case Chromium::PROCESS_NOTHING:
  267. DbgEx("do PROCESS_NOTHING");
  268. return;
  269. case Chromium::PROCESS_SEND:
  270. //DbgEx("do PROCESS_SEND");
  271. if (nullptr != p)
  272. WriteToFramework(p);
  273. break;
  274. case Chromium::PROCESS_STARTSESSION:
  275. {
  276. DbgEx("do PROCESS_STARTSESSION");
  277. auto entityName = m_serializer->GetEntityName(payload);
  278. if (entityName == "Chromium")
  279. {
  280. DbgEx("discard, don't make session with chromium");
  281. break;
  282. }
  283. updateMsgPool(entityName, payload, hdl);
  284. do_sendJsonStartSession(entityName, m_serializer->GetClassName(payload));
  285. }
  286. break;
  287. case Chromium::PROCESS_FINDSESSION:
  288. {
  289. DbgEx("do PROCESS_FINDSESSION");
  290. std::string js = m_esm->GetStartSessionAck(p, m_serializer->GetEntityName(payload));
  291. js = restroreTransId(js);
  292. m_wsserver.send(hdl, js, websocketpp::frame::opcode::TEXT);
  293. }
  294. break;
  295. case Chromium::PROCESS_RECORDMSG:
  296. {
  297. DbgEx("do PROCESS_RECORDMSG");
  298. auto entityName = m_serializer->GetEntityName(payload);
  299. updateMsgPool(entityName, payload, hdl);
  300. }
  301. break;
  302. default:
  303. break;
  304. }
  305. }
  306. // websocket message handler
  307. void CWebsocketServer::message_handler(websocketpp::connection_hdl hdl, server::message_ptr msg) {
  308. static int pos = 0;
  309. static std::map<int, int> t_hdlArr;
  310. int hdlPos = (long)hdl.lock().get();
  311. if (t_hdlArr.end() == t_hdlArr.find(hdlPos))
  312. t_hdlArr[hdlPos] = pos++;
  313. auto msgHandleFun = [&]() {
  314. //DbgEx("CWebsocketServer -> message_handler");
  315. std::string payload = msg->get_payload();
  316. // proto convert here
  317. #if (defined _WIN32 || defined _WIN64)
  318. payload = utf8_to_string(payload);//windows web utf8, terminal gbk
  319. #endif
  320. std::shared_ptr<cJSON> pJson(cJSON_Parse(payload.c_str()), [](cJSON* p) {
  321. if (nullptr != p) {
  322. cJSON_Delete(p);
  323. //cJSON_free(p);
  324. }
  325. });
  326. if (nullptr == pJson.get()) {
  327. DbgEx("CWebsocketServer -> message_handler, jsonErr:%s", payload.c_str());
  328. return;
  329. }
  330. auto transIdJson = cJSON_GetObjectItem(pJson.get(), "transId");
  331. if (transIdJson != nullptr)
  332. {
  333. int transid = transIdJson->valueint;
  334. int modifyT = t_hdlArr[hdlPos] << 24;
  335. int dstTransId = transid ^ modifyT;
  336. cJSON_SetIntValue(transIdJson, dstTransId);
  337. }
  338. auto messageTypeJson = cJSON_GetObjectItem(pJson.get(), "messageType");
  339. if (messageTypeJson == nullptr) {
  340. DbgEx("CWebsocketServer -> messageTypeJson == null");
  341. return;
  342. }
  343. char *unformateStr = cJSON_PrintUnformatted(pJson.get());
  344. std::string dstPayLoad = unformateStr;
  345. delete[]unformateStr;
  346. auto messageType = messageTypeJson->valueint;
  347. if (messageType < WEB_CHROMIUM_MSG_BEGIN)
  348. deal_msg(dstPayLoad, hdl);
  349. else if (messageType > WEB_CHROMIUM_MSG_BEGIN && messageType < WEB_CHROMIUM_MSG_END)
  350. deal_webchromium_msg(dstPayLoad, hdl, messageType);
  351. else if (messageType > METHOD_SYSTEM_START && messageType < METHOD_SYSTEM_END)
  352. deal_logMsg(dstPayLoad, hdl, messageType);
  353. };
  354. boost::thread dealMsgThread(msgHandleFun);
  355. dealMsgThread.join();
  356. }
  357. std::string CWebsocketServer::restroreTransId(std::string payLoad)
  358. {
  359. std::shared_ptr<cJSON> pJson(cJSON_Parse(payLoad.c_str()), [](cJSON* p) {
  360. if (nullptr != p) {
  361. cJSON_Delete(p);
  362. //cJSON_free(p);
  363. }
  364. });
  365. auto transIdJson = cJSON_GetObjectItem(pJson.get(), "transId");
  366. if (transIdJson != nullptr)
  367. {
  368. int transid = transIdJson->valueint;
  369. int dstTransId = transid & 0x00FFFFFF;
  370. cJSON_SetIntValue(transIdJson, dstTransId);
  371. }
  372. char* unformateStr = cJSON_PrintUnformatted(pJson.get());
  373. std::string dstPayLoad = unformateStr;
  374. delete[]unformateStr;
  375. return dstPayLoad;
  376. }
  377. void CWebsocketServer::open_handler(websocketpp::connection_hdl hdl) {
  378. // hand shake here
  379. DbgEx("new connection to ws server : %u", hdl.lock().get());
  380. m_connection_hdls.insert(std::pair<unsigned int, websocketpp::connection_hdl>((long)hdl.lock().get(), hdl));
  381. }
  382. void CWebsocketServer::close_handler(websocketpp::connection_hdl hdl) {
  383. // hand shake here
  384. auto connectionIter = m_connection_hdls.find((long)hdl.lock().get());
  385. if (m_connection_hdls.end() != connectionIter)
  386. {
  387. DbgEx("connection with ws server closed : %u", hdl.lock().get());
  388. m_connection_hdls.erase(connectionIter);
  389. }
  390. else
  391. DbgEx("connection close erase failed : %u", hdl.lock().get());
  392. }
  393. void CWebsocketServer::do_run() {
  394. DbgEx("CWebsocketServer -> do_run");
  395. // Start the Asio io_service run loop
  396. DbgEx("Start the Asio io_service run loop");
  397. while (true)
  398. {
  399. try
  400. {
  401. m_ios.poll();
  402. Sleep(2);
  403. }
  404. catch (...)
  405. {
  406. DbgEx("other exception in ios poll");
  407. }
  408. }
  409. }
  410. void CWebsocketServer::do_relink()
  411. {
  412. DbgEx("do_relink Enter");
  413. auto checkEntityIsNoStart = [&](std::string entityName) -> bool
  414. {
  415. CSmartPointer<IEntityFunction> spFunc = m_pEntity->GetFunction();
  416. LOG_ASSERT(spFunc != NULL);
  417. CEntityStaticInfo StaticInfo;
  418. CEntityRunInfo RunInfo;
  419. do {
  420. if (Error_Succeed != spFunc->GetEntityStaticInfo(entityName.c_str(), StaticInfo))
  421. break;
  422. if (Error_Succeed != spFunc->GetEntityRunInfo(entityName.c_str(), RunInfo))
  423. break;
  424. return RunInfo.eState == EntityState_NoStart;
  425. } while (false);
  426. return true;
  427. };
  428. while (true)
  429. {
  430. try
  431. {
  432. boost::this_thread::sleep_for(boost::chrono::seconds(10));
  433. auto unlinkArr = m_esm->queryUnLinkSession();
  434. boost::lock_guard<boost::mutex> lock(m_dealMsgLock);
  435. std::string breakEntityStr = "";
  436. std::string noStartEntity = "";
  437. std::string noServerEntity = "";
  438. #if (defined _WIN32 || defined _WIN64)
  439. for each (auto it in unlinkArr)
  440. #else
  441. for (auto it : unlinkArr)
  442. #endif
  443. {
  444. if (checkEntityIsNoStart(it))
  445. {
  446. if (noStartEntity.empty())
  447. noStartEntity.append(it);
  448. else
  449. noStartEntity.append("|").append(it);
  450. continue;
  451. }
  452. if (!CWSCodec::getInstance()->checkEntityHasService(it))
  453. {
  454. if (noServerEntity.empty())
  455. noServerEntity.append(it);
  456. else
  457. noServerEntity.append("|").append(it);
  458. continue;
  459. }
  460. auto ret = getEntityClass(it);
  461. if (ret.first)
  462. {
  463. if (breakEntityStr.empty())
  464. breakEntityStr.append(it);
  465. else
  466. breakEntityStr.append("|").append(it);
  467. do_sendJsonStartSession(it, ret.second);
  468. }
  469. else
  470. DbgEx("unable find class of entity %s", it.c_str());
  471. }
  472. if (!breakEntityStr.empty())
  473. DbgEx("try to relink entity : %s, noStartEntity: %s, noServerEntity: %s", breakEntityStr.c_str(), noStartEntity.c_str(), noServerEntity.c_str());
  474. }
  475. catch (...)
  476. {
  477. DbgEx("exception in do_relink");
  478. }
  479. }
  480. }
  481. void CWebsocketServer::run() {
  482. DbgEx("CWebsocketServer -> run");
  483. boost::thread thread1(boost::bind(&CWebsocketServer::do_run, this));
  484. boost::thread thread2(boost::bind(&CWebsocketServer::do_relink, this));
  485. }
  486. void CWebsocketServer::do_sendJsonBroadcast(std::string js)
  487. {
  488. js = restroreTransId(js);
  489. DbgEx("message broadcast : json = %s", js.c_str());
  490. if (js.empty())
  491. {
  492. DbgEx("string empty");
  493. return;
  494. }
  495. if (m_connection_hdls.empty())
  496. {
  497. DbgEx("message_from_socket : no websocket client connection");
  498. }
  499. else
  500. {//broadcast不进行转换
  501. if (js.empty())
  502. {
  503. DbgEx("string empty 2");
  504. return;
  505. }
  506. //DbgEx("do_sendJsonBroadcast Enter...");
  507. for (auto it = m_connection_hdls.begin(); it != m_connection_hdls.end(); it++)
  508. {
  509. try
  510. {
  511. websocketpp::connection_hdl hdl = it->second;
  512. m_wsserver.send(hdl, js, websocketpp::frame::opcode::TEXT);
  513. }
  514. catch (const websocketpp::lib::error_code& e)
  515. {
  516. DbgEx("m_wsserver send crash : error message=%s", e.message());
  517. }
  518. catch (const std::exception& e) {
  519. DbgEx("std exception : %s", e.what());
  520. }
  521. catch (...) {
  522. DbgEx("other exception");
  523. }
  524. }
  525. //DbgEx("do_sendJsonBroadcast End...");
  526. }
  527. }
  528. void CWebsocketServer::do_send_notifyMsg(unsigned hdlID, unsigned transId, const std::string& reason, const std::string& errmsg, const std::string& rebootTime, const DWORD& dwSysError, const DWORD& dwUserCode) {
  529. #if (defined _WIN32 || defined _WIN64)
  530. auto notifyMsg = CSimpleStringA::Format("{\"messageType\":%d,\"transID:%d\", \"reason\":%s, \"errmsg\":%s, \"rebootTime\":%s}", RegisterNotify, transId, reason, errmsg, rebootTime);
  531. #else
  532. auto notifyMsg = CSimpleStringA::Format(R"({"messageType":%d,"transID:%d", "reason":%s, "errmsg":%s, "rebootTime":%s})", RegisterNotify, transId, reason, errmsg, rebootTime);
  533. #endif
  534. try
  535. {
  536. //DbgEx("do_send_notifyMsg Enter...");
  537. std::map<unsigned int, websocketpp::connection_hdl>::iterator it = m_connection_hdls.find(hdlID);
  538. if (m_connection_hdls.end() != it)
  539. {
  540. //DbgEx("Send....");
  541. websocketpp::connection_hdl hdl = it->second;
  542. m_wsserver.send(hdl, notifyMsg.GetData(), websocketpp::frame::opcode::TEXT);
  543. }
  544. //DbgEx("do_send_notifyMsg End...");
  545. }
  546. catch (const websocketpp::lib::error_code& e)
  547. {
  548. DbgEx("m_wsserver send crash : error message=%s", e.message());
  549. }
  550. catch (const std::exception& e) {
  551. DbgEx("std exception : %s", e.what());
  552. }
  553. catch (...) {
  554. DbgEx("other exception");
  555. }
  556. }
  557. void CWebsocketServer::do_sendJson(std::string js, int hdlID, unsigned int id)
  558. {
  559. js = restroreTransId(js);
  560. DbgEx("WebSocket Search message_from_socket : json = %s", js.c_str());
  561. if (js.empty())
  562. {
  563. DbgEx("string empty");
  564. return;
  565. }
  566. if (m_connection_hdls.empty())
  567. DbgEx("message_from_socket : no websocket client connection");
  568. else {
  569. #if(defined _WIN32 || defined _WIN64)
  570. js = string_to_utf8(js);
  571. #endif
  572. if (js.empty())
  573. {
  574. DbgEx("string empty 2");
  575. return;
  576. }
  577. try
  578. {
  579. std::map<unsigned int, websocketpp::connection_hdl>::iterator it = m_connection_hdls.find(hdlID);
  580. if (m_connection_hdls.end() != it)
  581. {
  582. //DbgEx("Send....");
  583. websocketpp::connection_hdl hdl = it->second;
  584. m_wsserver.send(hdl, js, websocketpp::frame::opcode::TEXT);
  585. }
  586. else
  587. DbgEx("ws connection handler not found! id = %u", id);
  588. }
  589. catch (const websocketpp::lib::error_code& e)
  590. {
  591. DbgEx("m_wsserver send crash : error message=%s", e.message());
  592. }
  593. catch (const std::exception& e) {
  594. DbgEx("std exception : %s", e.what());
  595. }
  596. catch (...) {
  597. DbgEx("other exception");
  598. }
  599. }
  600. }
  601. // socket message handler
  602. void CWebsocketServer::message_from_socket(CMessage& msg, unsigned int id) {
  603. // get message from socket and deserialize
  604. // then send back to the web client
  605. auto bufferLength = msg.getBufferLength();
  606. if (bufferLength > MAX_TRANSFER_LEN)
  607. DbgEx("WebSocket Search message_from_socket : buffer len = %d, buffer pre50:%s", msg.getBufferLength(), msg.printfHEX(50).c_str());
  608. //else
  609. // DbgEx("WebSocket Search message_from_socket : buffer len = %d", msg.getBufferLength());
  610. int replaceTransId = 0;
  611. if (msg.getLength() > 16 && m_esm != NULL)
  612. {//this is return buffer, it would not send out again, so I exchange the transId and sessionId place
  613. msg.exchangeSessionIdAndTransId();
  614. auto tmpReplace = m_esm->getSrcTransID(msg.getTransID());
  615. replaceTransId = tmpReplace.first == true ? tmpReplace.second : 0;
  616. }
  617. // 处理session ack,获取hdlID
  618. unsigned int hdlID = 0;
  619. std::vector<std::pair<int, int>> sendArr;
  620. if (8 == msg.getMessageType())
  621. {
  622. m_esm->AskProcessEvent(&msg, sendArr);
  623. if (sendArr.size() == 0)
  624. m_esm->AskProcessEvent(&msg, sendArr);
  625. for (auto i = sendArr.begin(); i != sendArr.end(); i++)
  626. {
  627. msg.setTransID(i->first);
  628. std::string js = this->m_serializer->BufferToJson(msg);
  629. do_sendJson(js, i->second, id);
  630. }
  631. }
  632. else if (2 == msg.getMessageType()) //session end
  633. {
  634. int sessionId = msg.getTransID();
  635. auto ret = m_esm->DoSessionRemove(sessionId);
  636. if (ret.first)
  637. DbgEx("detect session %s:%d lost!remove success", ret.second.c_str(), sessionId);
  638. else
  639. DbgEx("detect session %d lost!remove failed", sessionId);
  640. }
  641. else if (5 == msg.getMessageType())
  642. {//sessionAck
  643. auto ret = m_esm->AskProcessSession(&msg, hdlID);
  644. auto sessionId = msg.getSessionID();
  645. if (ACKPROCESS_NOTHING == ret.first)
  646. {
  647. DbgEx("can not find save session, process noting, %s", ret.second.c_str());//error
  648. return;
  649. }
  650. if (0 != hdlID)
  651. {//hdlId为0时,说明本地发起的session
  652. std::string js = this->m_serializer->BufferToJson(msg, replaceTransId);
  653. do_sendJson(js, hdlID, id); //发给首记录的CMessage
  654. }
  655. for (auto cur = m_msg_pool.begin(); cur != m_msg_pool.end(); cur++)
  656. {//sessionId为-1时需处理,否则会引发消息风暴
  657. if (cur->first == ret.second)
  658. {
  659. std::vector<std::pair<websocketpp::connection_hdl, std::string>> msgArr(cur->second);
  660. m_msg_pool.erase(cur); //为防止处理消息时引发消息风暴,拷贝并删除原msg_pool
  661. if (-1 != sessionId) {
  662. DbgEx("Session with %s Make, deal with record Msg:%d", ret.second.c_str(), msgArr.size());
  663. for (auto msg = msgArr.begin(); msg != msgArr.end(); msg++)
  664. deal_msg(msg->second, msg->first);
  665. break;
  666. }
  667. else
  668. {
  669. DbgEx("Session with %s Make Failed, deal with record Msg:%d", ret.second.c_str(), msgArr.size());
  670. #if (defined _WIN32 || defined _WIN64)
  671. for each (auto msg in msgArr)
  672. #else
  673. for (auto msg : msgArr)
  674. #endif
  675. deal_sessionBreakMsg(msg.second, msg.first);
  676. break;
  677. }
  678. }
  679. }
  680. }
  681. else
  682. {
  683. m_esm->AckProcess(&msg, hdlID);
  684. std::string js = this->m_serializer->BufferToJson(msg, replaceTransId);
  685. do_sendJson(js, hdlID, id);
  686. }
  687. }
  688. void CWebsocketServer::init_websocket() {
  689. auto checkPortExist = []() -> bool {
  690. boost::asio::io_service ioService;
  691. boost::asio::ip::tcp::socket* pSockTcp = NULL;
  692. bool bSockUseError = false;
  693. try {
  694. auto tcpEndpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), DEFAULT_SERVER_PORT);
  695. pSockTcp = new boost::asio::ip::tcp::socket(ioService, tcpEndpoint);
  696. bSockUseError = false;
  697. }
  698. catch (...)
  699. {
  700. bSockUseError = true;
  701. }
  702. //释放
  703. if (!bSockUseError && pSockTcp != NULL)
  704. {
  705. delete pSockTcp;
  706. pSockTcp = NULL;
  707. }
  708. ioService.stop();
  709. return bSockUseError;
  710. };
  711. /*
  712. while (true) {
  713. if (!checkPortExist())
  714. break;
  715. else
  716. DbgEx("checkPortExist failed");
  717. std::this_thread::sleep_for(std::chrono::seconds(5));
  718. }
  719. */
  720. try
  721. {
  722. // Set logging settings
  723. DbgEx("Set logging settings");
  724. m_wsserver.set_error_channels(websocketpp::log::elevel::all);
  725. m_wsserver.set_access_channels(websocketpp::log::alevel::all ^ websocketpp::log::alevel::frame_payload);
  726. DbgEx("m_wsserver.get_max_message_size = %d", m_wsserver.get_max_message_size());
  727. m_wsserver.set_max_message_size(MAX_TRANSFER_LEN);
  728. m_wsserver.set_close_handshake_timeout(3000);
  729. m_wsserver.set_pong_timeout(3000);
  730. // Initialize Asio
  731. DbgEx("Initialize Asio");
  732. m_wsserver.init_asio(&m_ios);
  733. // Set the default message handler to the echo handler
  734. DbgEx("Set the default message handler to the echo handler");
  735. m_wsserver.set_message_handler(websocketpp::lib::bind(&CWebsocketServer::message_handler, this, websocketpp::lib::placeholders::_1, websocketpp::lib::placeholders::_2));
  736. DbgEx("Set set_open_handler");
  737. m_wsserver.set_open_handler(websocketpp::lib::bind(&CWebsocketServer::open_handler, this, websocketpp::lib::placeholders::_1));
  738. m_wsserver.set_close_handler(websocketpp::lib::bind(&CWebsocketServer::close_handler, this, websocketpp::lib::placeholders::_1));
  739. // Listen on port 9002, add m_acceptor->set_option(lib::asio::socket_base::reuse_address(true), bec);
  740. m_wsserver.listen(DEFAULT_SERVER_PORT);
  741. // Queues a connection accept operation
  742. DbgEx("Queues a connection accept operation");
  743. m_wsserver.start_accept();
  744. m_initSuccess = true;
  745. DbgEx("do_run end");
  746. }
  747. catch (websocketpp::exception const& e) {
  748. DbgEx("websocketpp exception %s ", e.what());
  749. m_initSuccess = false;
  750. }
  751. catch (...) {
  752. DbgEx("other exception");
  753. m_initSuccess = false;
  754. }
  755. }
  756. void CWebsocketServer::myTest()
  757. {
  758. }
  759. #if(defined _WIN32 || defined _WIN64)
  760. std::string CWebsocketServer::utf8_to_string(const std::string& str) {
  761. int len = MultiByteToWideChar(CP_UTF8, 0, str.c_str(), -1, NULL, 0);
  762. wchar_t* wszGBK = new wchar_t[len + 1];
  763. wmemset(wszGBK, 0, len + 1);
  764. MultiByteToWideChar(CP_UTF8, 0, (LPCTSTR)str.c_str(), -1, wszGBK, len);
  765. len = WideCharToMultiByte(CP_ACP, 0, wszGBK, -1, NULL, 0, NULL, NULL);
  766. char* szGBK = new char[len + 1];
  767. memset(szGBK, 0, len + 1);
  768. WideCharToMultiByte(CP_ACP, 0, wszGBK, -1, szGBK, len, NULL, NULL);
  769. std::string strTemp(szGBK);
  770. delete[] szGBK;
  771. delete[] wszGBK;
  772. return strTemp;
  773. }
  774. std::string CWebsocketServer::string_to_utf8(const std::string& str) {
  775. int wcLen = MultiByteToWideChar(CP_ACP, 0, str.c_str(), -1, NULL, 0);
  776. if (wcLen > 0) {
  777. wchar_t* pwBuf = new wchar_t[wcLen + 1];
  778. if (pwBuf == NULL) {
  779. return std::string();
  780. }
  781. memset(pwBuf, 0, sizeof(wchar_t) * (wcLen + 1));
  782. wcLen = MultiByteToWideChar(CP_ACP, 0, str.c_str(), -1, pwBuf, wcLen);
  783. if (wcLen <= 0) {
  784. delete[] pwBuf;
  785. return std::string();
  786. }
  787. int ucLen = WideCharToMultiByte(CP_UTF8, 0, pwBuf, -1, NULL, 0, NULL, NULL);
  788. //DbgEx("ucLen = %d", ucLen);
  789. if (ucLen < 0) {
  790. delete[] pwBuf;
  791. return std::string();
  792. }
  793. char* pBuf = new char[ucLen + 1];
  794. if (pBuf == NULL) {
  795. delete pwBuf;
  796. return std::string();
  797. }
  798. memset(pBuf, 0, sizeof(char) * (ucLen + 1));
  799. ucLen = WideCharToMultiByte(CP_UTF8, 0, pwBuf, -1, pBuf, ucLen, NULL, NULL);
  800. if (ucLen <= 0) {
  801. delete[] pwBuf;
  802. delete[] pBuf;
  803. return std::string();
  804. }
  805. std::string retStr(pBuf);
  806. //DbgEx("string_to_utf8 return: %s", retStr.c_str());
  807. if (pwBuf) {
  808. delete[] pwBuf;
  809. pwBuf = NULL;
  810. }
  811. if (pBuf) {
  812. delete[] pBuf;
  813. pBuf = NULL;
  814. }
  815. return retStr;
  816. }
  817. return std::string();
  818. }
  819. #endif
  820. void CWebsocketServer::init_entity_sessions() {
  821. return;
  822. }
  823. }