| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932 |
- #include "stdafx.h"
- #if (defined _WIN32 || defined _WIN64)
- #include "../mod_browser/IEBrowser_client_g.h"
- #else
- #include <future>
- #include <thread>
- #endif
- #include "CWebsocketServer.h"
- #include "baseEx.h"
- #include "CModTools.h"
- #include <vector>
- #include "MessageType.h"
- #include <boost/thread/thread.hpp>
- #include <boost/thread/lock_guard.hpp>
- #include <iostream>
- #include <locale>
- #include <codecvt>
- #define DEFAULT_SERVER_PORT 9002
- namespace Chromium {
- server m_wsserver;
- CWebsocketServer::CWebsocketServer(const char* strPath, CEntityBase* pEntity) : m_ios(), m_serializer(NULL), m_socket(NULL), m_pEntity(pEntity), m_initSuccess(false)
- {
- DbgEx("CWebsocketServer constructor");
- // Initialize socket client
- DbgEx("Initialize socket client");
- this->m_pEntity = pEntity;
- m_socket = new CSocketClient(m_ios, "127.0.0.1", "4504", pEntity, 0);
- DbgEx("Set socket client MessageHandler -- message_handler");
- m_socket->SetMessageHandler(this);
- // Initialize serializer
- DbgEx("Initialize serializer");
- this->m_serializer = CWSCodec::getInstance();
- this->m_serializer->setEntityHandler(pEntity->GetFunction());
- this->m_serializer->init(strPath);
- while (Error_Succeed != m_socket->Connect())
- Sleep(100);
- DbgEx("init Entity Session Manager");
- myTest();
- m_esm = new EntitySessionManager();
- init_websocket();
- init_entity_sessions();
- }
- void CWebsocketServer::updateMsgPool(std::string entityName, std::string& payload, websocketpp::connection_hdl hdl)
- {
- //存储的消息应具备时效性,保证一个hdl只有一个记录的消息
- auto it = m_msg_pool.find(entityName);
- if (it != m_msg_pool.end())
- {
- auto& curArr = it->second;
- for (auto i = curArr.begin(); i != curArr.end(); i++)
- {
- if (&hdl == &i->first)
- {
- curArr.erase(i);
- break;
- }
- }
- it->second.emplace_back(std::make_pair(hdl, payload));
- }
- else
- {
- std::vector<std::pair<websocketpp::connection_hdl, std::string>> t_saveMsgs;
- t_saveMsgs.emplace_back(std::make_pair(hdl, payload));
- m_msg_pool.insert(std::make_pair(entityName, t_saveMsgs));
- }
- }
- void CWebsocketServer::updateNotifyPool(unsigned hdl, unsigned transId)
- {
- auto it = m_notifyPool.find(hdl);
- if (it != m_notifyPool.end())
- {
- if (transId == 0)
- m_notifyPool.erase(it);
- else
- it->second = transId;
- }
- else if (transId != 0)
- m_notifyPool.insert(std::make_pair(hdl, transId));
- }
- void CWebsocketServer::storeEntityWithCLass(std::string entityName, std::string entityClass)
- {
- if (m_entityAndClass.end() == m_entityAndClass.find(entityName)) {
- m_entityAndClass.insert(std::make_pair(entityName, entityClass)); //not exist
- //DbgEx("storeEntityWithCLass, %s:%s", entityName.c_str(), entityClass.c_str());
- }
- }
- std::pair<bool, std::string> CWebsocketServer::getEntityClass(std::string entityName)
- {
- auto it = m_entityAndClass.find(entityName);
- if (m_entityAndClass.end() == it)
- return std::make_pair(false, "");
- else
- return std::make_pair(true, it->second);
- }
- void CWebsocketServer::do_sendJsonStartSession(std::string entityName, std::string entityClass)
- {
- auto startSessionReq = m_esm->GetStartSessionRequest(entityName, entityClass);
- auto sessionBuf = this->m_serializer->JsonToBuffer(startSessionReq.second).second;
- if (nullptr == sessionBuf)
- {
- DbgEx("JsonToBuffer return NULL !");
- return;
- }
- m_esm->StoreSessionReq(startSessionReq.first, entityName);
- m_esm->MakeNewTransID(sessionBuf, 0);
- //保存msg,先建立session
- if (m_esm->checkBeginSession(entityName)) {
- m_esm->updateBeginSessionTime(entityName);
- WriteToFramework(sessionBuf);
- }
- else
- DbgEx("Already begin session in 5s, %s", entityName.c_str());
- }
- void CWebsocketServer::WriteToFramework(CMessage* msg)
- {
- m_socket->Write(msg);
- }
- void CWebsocketServer::deal_webchromium_msg(std::string& payload, websocketpp::connection_hdl hdl, int messageType)
- {
- DbgEx("deal_webchromium_msg :%d, %s", payload.length(), payload.length() > 800 ? payload.substr(0, 800).append("...").c_str() : payload.c_str());
- switch (messageType)
- {
- case RegisterNotify:
- {
- std::shared_ptr<cJSON> pJson(cJSON_Parse(payload.c_str()), [](cJSON* p) {
- if (nullptr != p) {
- cJSON_Delete(p);
- //cJSON_free(p);
- }
- });
- auto transIdJson = cJSON_GetObjectItem(pJson.get(), "transId");
- if (transIdJson == nullptr)
- {
- DbgEx("deal_webchromium_msg err, transIdJson == null");
- return;
- }
- int transid = transIdJson->valueint;
- updateNotifyPool((long)hdl.lock().get(), transid);
- }
- break;
- case UnRegisterNotify:
- updateNotifyPool((long)hdl.lock().get(), 0);
- break;
- default:
- break;
- }
- }
- void CWebsocketServer::deal_logMsg(std::string& payload, websocketpp::connection_hdl hdl, int messageType)
- {
- std::shared_ptr<cJSON> pJson(cJSON_Parse(payload.c_str()), [](cJSON* p) {
- if (nullptr != p) {
- cJSON_Delete(p);
- //cJSON_free(p);
- }
- });
- if (nullptr == pJson.get()) {
- DbgEx("CWebsocketServer -> deal_logMsg, jsonErr:%s", payload.c_str());
- return;
- }
- std::string msg = cJSON_GetObjectItem(pJson.get(), "msg")->valuestring;
- auto changeMessageTypeToLogLevel = [](int messageType) ->LOG_LEVEL_E {
- switch (messageType)
- {
- case METHOD_SYSTEM_LOG_DEBUG:
- return LOG_LEVEL_DEBUG;
- case METHOD_SYSTEM_LOG_INFO:
- return LOG_LEVEL_INFO;
- case METHOD_SYSTEM_LOG_WARN:
- return LOG_LEVEL_WARN;
- case METHOD_SYSTEM_LOG_ERROR:
- return LOG_LEVEL_ERROR;
- default:
- return LOG_LEVEL_DEBUG;
- }
- };
- DbgWithLink(changeMessageTypeToLogLevel(messageType), LOG_TYPE_BUSINESS_SYSTEM).withLogProducer(logProducer).setAPI("logMsg").withExtendLog(false).setResultMsg(msg.c_str())();
- }
- void CWebsocketServer::deal_sessionBreakMsg(std::string& payload, websocketpp::connection_hdl hdl)
- {
- std::shared_ptr<cJSON> pJson(cJSON_Parse(payload.c_str()), [](cJSON* p) {
- if (nullptr != p) {
- cJSON_Delete(p);
- //cJSON_free(p);
- }
- });
- if (nullptr == pJson.get()) {
- DbgEx("CWebsocketServer -> deal_sessionBreakMsg, jsonErr:%s", payload.c_str());
- return;
- }
- int transidNum = cJSON_GetObjectItem(pJson.get(), "transId")->valueint;
- int messageTypeNum = cJSON_GetObjectItem(pJson.get(), "messageType")->valueint;
- std::string sessionId = std::to_string((LONGLONG)-1);
- std::string transid = std::to_string((LONGLONG)transidNum);
- std::string js = "{\"messageType\":5,\"errorCode\":\"1537\",\"errorMsg\":\"session break\",\"sessionID\":";
- js.append(sessionId.c_str());
- js.append(",\"transID\":");
- js.append(transid.c_str());
- js.append("}");
- js = restroreTransId(js);
- DbgEx("deal_sessionBreakMsg, len:%d, srcPayLoad:%s, ret:%s", payload.length()
- , payload.length() > 800 ? payload.substr(0, 800).append("...").c_str() : payload.c_str(), js.c_str());
- m_wsserver.send(hdl, js, websocketpp::frame::opcode::TEXT);
- }
- void CWebsocketServer::deal_msg(std::string& payload, websocketpp::connection_hdl hdl)
- {
- boost::lock_guard<boost::mutex> lock(m_dealMsgLock); //在buffer和json处理时,deal_msg会调用多次,导致transId存在重复可能
- DbgEx("deal_msg :%d, %s", payload.length(), payload.length() > 800 ? payload.substr(0, 800).append("...").c_str() : payload.c_str());
- auto ret = this->m_serializer->JsonToBuffer(payload);
- CMessage* p = ret.second;
- if (ret.first == Broadcast && nullptr == p)
- {
- DbgEx("JsonToBuffer return NULL, perhaps an event happened");
- do_sendJsonBroadcast(payload);
- return;
- }
- else if(ret.first == Request || ret.first == Info)
- {
- if(nullptr == p)
- {
- //can not find the method
- std::shared_ptr<cJSON> pJson(cJSON_Parse(payload.c_str()), [](cJSON* p) {
- if (nullptr != p) {
- cJSON_Delete(p);
- //cJSON_free(p);
- }
- });
- auto transIdJson = cJSON_GetObjectItem(pJson.get(), "transId");
- if(transIdJson != nullptr)
- {
- std::string errRetJs = "{\"messageType\":4,\"transID\":" + std::to_string((LONGLONG)transIdJson->valueint) + ",\"errorCode\":103,\"errorMsg\":\"can not find entity or class!\"}";
- auto js = restroreTransId(errRetJs);
- DbgEx("deal_msg %s get null msg, ret:%s", ret.first == Request ? "Request" : "Info", errRetJs.c_str());
- m_wsserver.send(hdl, js, websocketpp::frame::opcode::TEXT);
- }
- return;
- }
- }
- else if (ret.first == GetSession)
- {
- std::shared_ptr<cJSON> pJson(cJSON_Parse(payload.c_str()), [](cJSON* p) {
- if (nullptr != p) {
- cJSON_Delete(p);
- //cJSON_free(p);
- }
- });
- auto transIdJson = cJSON_GetObjectItem(pJson.get(), "transId");
- if (transIdJson != nullptr)
- {
- int transid = transIdJson->valueint;
- auto sessionRet = m_esm->GetAllSessionRequest(transid);
- DbgEx("sessionJson:%s", sessionRet.second.c_str());
- if (sessionRet.first)
- {
- auto js = restroreTransId(sessionRet.second);
- m_wsserver.send(hdl, js, websocketpp::frame::opcode::TEXT);
- }
- }
- return;
- }
- else if (ret.first == BeginSession)
- storeEntityWithCLass(m_serializer->GetEntityName(payload), m_serializer->GetClassName(payload));
- // Try to manage entity session
- RequestProcessType processType = this->m_esm->RequestProcess(p, m_serializer->GetEntityName(payload), (long)hdl.lock().get());
- switch (processType)
- {
- case Chromium::PROCESS_NOTHING:
- DbgEx("do PROCESS_NOTHING");
- return;
- case Chromium::PROCESS_SEND:
- //DbgEx("do PROCESS_SEND");
- if (nullptr != p)
- WriteToFramework(p);
- break;
- case Chromium::PROCESS_STARTSESSION:
- {
- DbgEx("do PROCESS_STARTSESSION");
- auto entityName = m_serializer->GetEntityName(payload);
- if (entityName == "Chromium")
- {
- DbgEx("discard, don't make session with chromium");
- break;
- }
- updateMsgPool(entityName, payload, hdl);
- do_sendJsonStartSession(entityName, m_serializer->GetClassName(payload));
- }
- break;
- case Chromium::PROCESS_FINDSESSION:
- {
- DbgEx("do PROCESS_FINDSESSION");
- std::string js = m_esm->GetStartSessionAck(p, m_serializer->GetEntityName(payload));
- js = restroreTransId(js);
- m_wsserver.send(hdl, js, websocketpp::frame::opcode::TEXT);
- }
- break;
- case Chromium::PROCESS_RECORDMSG:
- {
- DbgEx("do PROCESS_RECORDMSG");
- auto entityName = m_serializer->GetEntityName(payload);
- updateMsgPool(entityName, payload, hdl);
- }
- break;
- default:
- break;
- }
- }
- // websocket message handler
- void CWebsocketServer::message_handler(websocketpp::connection_hdl hdl, server::message_ptr msg) {
- static int pos = 0;
- static std::map<int, int> t_hdlArr;
- int hdlPos = (long)hdl.lock().get();
- if (t_hdlArr.end() == t_hdlArr.find(hdlPos))
- t_hdlArr[hdlPos] = pos++;
- auto msgHandleFun = [&]() {
- //DbgEx("CWebsocketServer -> message_handler");
- std::string payload = msg->get_payload();
- // proto convert here
- #if (defined _WIN32 || defined _WIN64)
- payload = utf8_to_string(payload);//windows web utf8, terminal gbk
- #endif
- std::shared_ptr<cJSON> pJson(cJSON_Parse(payload.c_str()), [](cJSON* p) {
- if (nullptr != p) {
- cJSON_Delete(p);
- //cJSON_free(p);
- }
- });
- if (nullptr == pJson.get()) {
- DbgEx("CWebsocketServer -> message_handler, jsonErr:%s", payload.c_str());
- return;
- }
- auto transIdJson = cJSON_GetObjectItem(pJson.get(), "transId");
- if (transIdJson != nullptr)
- {
- int transid = transIdJson->valueint;
- int modifyT = t_hdlArr[hdlPos] << 24;
- int dstTransId = transid ^ modifyT;
- cJSON_SetIntValue(transIdJson, dstTransId);
- }
- auto messageTypeJson = cJSON_GetObjectItem(pJson.get(), "messageType");
- if (messageTypeJson == nullptr) {
- DbgEx("CWebsocketServer -> messageTypeJson == null");
- return;
- }
- char *unformateStr = cJSON_PrintUnformatted(pJson.get());
- std::string dstPayLoad = unformateStr;
- delete[]unformateStr;
- auto messageType = messageTypeJson->valueint;
- if (messageType < WEB_CHROMIUM_MSG_BEGIN)
- deal_msg(dstPayLoad, hdl);
- else if (messageType > WEB_CHROMIUM_MSG_BEGIN && messageType < WEB_CHROMIUM_MSG_END)
- deal_webchromium_msg(dstPayLoad, hdl, messageType);
- else if (messageType > METHOD_SYSTEM_START && messageType < METHOD_SYSTEM_END)
- deal_logMsg(dstPayLoad, hdl, messageType);
- };
- boost::thread dealMsgThread(msgHandleFun);
- dealMsgThread.join();
- }
- std::string CWebsocketServer::restroreTransId(std::string payLoad)
- {
- std::shared_ptr<cJSON> pJson(cJSON_Parse(payLoad.c_str()), [](cJSON* p) {
- if (nullptr != p) {
- cJSON_Delete(p);
- //cJSON_free(p);
- }
- });
- auto transIdJson = cJSON_GetObjectItem(pJson.get(), "transId");
- if (transIdJson != nullptr)
- {
- int transid = transIdJson->valueint;
- int dstTransId = transid & 0x00FFFFFF;
- cJSON_SetIntValue(transIdJson, dstTransId);
- }
- char* unformateStr = cJSON_PrintUnformatted(pJson.get());
- std::string dstPayLoad = unformateStr;
- delete[]unformateStr;
- return dstPayLoad;
- }
- void CWebsocketServer::open_handler(websocketpp::connection_hdl hdl) {
- // hand shake here
- DbgEx("new connection to ws server : %u", hdl.lock().get());
- m_connection_hdls.insert(std::pair<unsigned int, websocketpp::connection_hdl>((long)hdl.lock().get(), hdl));
- }
- void CWebsocketServer::close_handler(websocketpp::connection_hdl hdl) {
- // hand shake here
- auto connectionIter = m_connection_hdls.find((long)hdl.lock().get());
- if (m_connection_hdls.end() != connectionIter)
- {
- DbgEx("connection with ws server closed : %u", hdl.lock().get());
- m_connection_hdls.erase(connectionIter);
- }
- else
- DbgEx("connection close erase failed : %u", hdl.lock().get());
- }
- void CWebsocketServer::do_run() {
- DbgEx("CWebsocketServer -> do_run");
- // Start the Asio io_service run loop
- DbgEx("Start the Asio io_service run loop");
- while (true)
- {
- try
- {
- m_ios.poll();
- Sleep(2);
- }
- catch (...)
- {
- DbgEx("other exception in ios poll");
- }
- }
- }
- void CWebsocketServer::do_relink()
- {
- DbgEx("do_relink Enter");
- auto checkEntityIsNoStart = [&](std::string entityName) -> bool
- {
- CSmartPointer<IEntityFunction> spFunc = m_pEntity->GetFunction();
- LOG_ASSERT(spFunc != NULL);
- CEntityStaticInfo StaticInfo;
- CEntityRunInfo RunInfo;
- do {
- if (Error_Succeed != spFunc->GetEntityStaticInfo(entityName.c_str(), StaticInfo))
- break;
- if (Error_Succeed != spFunc->GetEntityRunInfo(entityName.c_str(), RunInfo))
- break;
- return RunInfo.eState == EntityState_NoStart;
- } while (false);
- return true;
- };
- while (true)
- {
- try
- {
- boost::this_thread::sleep_for(boost::chrono::seconds(10));
- auto unlinkArr = m_esm->queryUnLinkSession();
- boost::lock_guard<boost::mutex> lock(m_dealMsgLock);
- std::string breakEntityStr = "";
- std::string noStartEntity = "";
- std::string noServerEntity = "";
- #if (defined _WIN32 || defined _WIN64)
- for each (auto it in unlinkArr)
- #else
- for (auto it : unlinkArr)
- #endif
- {
- if (checkEntityIsNoStart(it))
- {
- if (noStartEntity.empty())
- noStartEntity.append(it);
- else
- noStartEntity.append("|").append(it);
- continue;
- }
- if (!CWSCodec::getInstance()->checkEntityHasService(it))
- {
- if (noServerEntity.empty())
- noServerEntity.append(it);
- else
- noServerEntity.append("|").append(it);
- continue;
- }
-
-
- auto ret = getEntityClass(it);
- if (ret.first)
- {
- if (breakEntityStr.empty())
- breakEntityStr.append(it);
- else
- breakEntityStr.append("|").append(it);
- do_sendJsonStartSession(it, ret.second);
- }
- else
- DbgEx("unable find class of entity %s", it.c_str());
- }
- if (!breakEntityStr.empty())
- DbgEx("try to relink entity : %s, noStartEntity: %s, noServerEntity: %s", breakEntityStr.c_str(), noStartEntity.c_str(), noServerEntity.c_str());
- }
- catch (...)
- {
- DbgEx("exception in do_relink");
- }
- }
- }
- void CWebsocketServer::run() {
- DbgEx("CWebsocketServer -> run");
- boost::thread thread1(boost::bind(&CWebsocketServer::do_run, this));
- boost::thread thread2(boost::bind(&CWebsocketServer::do_relink, this));
- }
- void CWebsocketServer::do_sendJsonBroadcast(std::string js)
- {
- js = restroreTransId(js);
- DbgEx("message broadcast : json = %s", js.c_str());
- if (js.empty())
- {
- DbgEx("string empty");
- return;
- }
- if (m_connection_hdls.empty())
- {
- DbgEx("message_from_socket : no websocket client connection");
- }
- else
- {//broadcast不进行转换
- if (js.empty())
- {
- DbgEx("string empty 2");
- return;
- }
- //DbgEx("do_sendJsonBroadcast Enter...");
- for (auto it = m_connection_hdls.begin(); it != m_connection_hdls.end(); it++)
- {
- try
- {
- websocketpp::connection_hdl hdl = it->second;
- m_wsserver.send(hdl, js, websocketpp::frame::opcode::TEXT);
- }
- catch (const websocketpp::lib::error_code& e)
- {
- DbgEx("m_wsserver send crash : error message=%s", e.message());
- }
- catch (const std::exception& e) {
- DbgEx("std exception : %s", e.what());
- }
- catch (...) {
- DbgEx("other exception");
- }
- }
- //DbgEx("do_sendJsonBroadcast End...");
- }
- }
- void CWebsocketServer::do_send_notifyMsg(unsigned hdlID, unsigned transId, const std::string& reason, const std::string& errmsg, const std::string& rebootTime, const DWORD& dwSysError, const DWORD& dwUserCode) {
- #if (defined _WIN32 || defined _WIN64)
- auto notifyMsg = CSimpleStringA::Format("{\"messageType\":%d,\"transID:%d\", \"reason\":%s, \"errmsg\":%s, \"rebootTime\":%s}", RegisterNotify, transId, reason, errmsg, rebootTime);
- #else
- auto notifyMsg = CSimpleStringA::Format(R"({"messageType":%d,"transID:%d", "reason":%s, "errmsg":%s, "rebootTime":%s})", RegisterNotify, transId, reason, errmsg, rebootTime);
- #endif
- try
- {
- //DbgEx("do_send_notifyMsg Enter...");
- std::map<unsigned int, websocketpp::connection_hdl>::iterator it = m_connection_hdls.find(hdlID);
- if (m_connection_hdls.end() != it)
- {
- //DbgEx("Send....");
- websocketpp::connection_hdl hdl = it->second;
- m_wsserver.send(hdl, notifyMsg.GetData(), websocketpp::frame::opcode::TEXT);
- }
- //DbgEx("do_send_notifyMsg End...");
- }
- catch (const websocketpp::lib::error_code& e)
- {
- DbgEx("m_wsserver send crash : error message=%s", e.message());
- }
- catch (const std::exception& e) {
- DbgEx("std exception : %s", e.what());
- }
- catch (...) {
- DbgEx("other exception");
- }
- }
- void CWebsocketServer::do_sendJson(std::string js, int hdlID, unsigned int id)
- {
- js = restroreTransId(js);
- DbgEx("WebSocket Search message_from_socket : json = %s", js.c_str());
- if (js.empty())
- {
- DbgEx("string empty");
- return;
- }
- if (m_connection_hdls.empty())
- DbgEx("message_from_socket : no websocket client connection");
- else {
- #if(defined _WIN32 || defined _WIN64)
- js = string_to_utf8(js);
- #endif
- if (js.empty())
- {
- DbgEx("string empty 2");
- return;
- }
- try
- {
- std::map<unsigned int, websocketpp::connection_hdl>::iterator it = m_connection_hdls.find(hdlID);
- if (m_connection_hdls.end() != it)
- {
- //DbgEx("Send....");
- websocketpp::connection_hdl hdl = it->second;
- m_wsserver.send(hdl, js, websocketpp::frame::opcode::TEXT);
- }
- else
- DbgEx("ws connection handler not found! id = %u", id);
- }
- catch (const websocketpp::lib::error_code& e)
- {
- DbgEx("m_wsserver send crash : error message=%s", e.message());
- }
- catch (const std::exception& e) {
- DbgEx("std exception : %s", e.what());
- }
- catch (...) {
- DbgEx("other exception");
- }
- }
- }
- // socket message handler
- void CWebsocketServer::message_from_socket(CMessage& msg, unsigned int id) {
- // get message from socket and deserialize
- // then send back to the web client
- auto bufferLength = msg.getBufferLength();
- if (bufferLength > MAX_TRANSFER_LEN)
- DbgEx("WebSocket Search message_from_socket : buffer len = %d, buffer pre50:%s", msg.getBufferLength(), msg.printfHEX(50).c_str());
- //else
- // DbgEx("WebSocket Search message_from_socket : buffer len = %d", msg.getBufferLength());
- int replaceTransId = 0;
- if (msg.getLength() > 16 && m_esm != NULL)
- {//this is return buffer, it would not send out again, so I exchange the transId and sessionId place
- msg.exchangeSessionIdAndTransId();
- auto tmpReplace = m_esm->getSrcTransID(msg.getTransID());
- replaceTransId = tmpReplace.first == true ? tmpReplace.second : 0;
- }
- // 处理session ack,获取hdlID
- unsigned int hdlID = 0;
- std::vector<std::pair<int, int>> sendArr;
- if (8 == msg.getMessageType())
- {
- m_esm->AskProcessEvent(&msg, sendArr);
- if (sendArr.size() == 0)
- m_esm->AskProcessEvent(&msg, sendArr);
- for (auto i = sendArr.begin(); i != sendArr.end(); i++)
- {
- msg.setTransID(i->first);
- std::string js = this->m_serializer->BufferToJson(msg);
- do_sendJson(js, i->second, id);
- }
- }
- else if (2 == msg.getMessageType()) //session end
- {
- int sessionId = msg.getTransID();
- auto ret = m_esm->DoSessionRemove(sessionId);
- if (ret.first)
- DbgEx("detect session %s:%d lost!remove success", ret.second.c_str(), sessionId);
- else
- DbgEx("detect session %d lost!remove failed", sessionId);
- }
- else if (5 == msg.getMessageType())
- {//sessionAck
- auto ret = m_esm->AskProcessSession(&msg, hdlID);
- auto sessionId = msg.getSessionID();
- if (ACKPROCESS_NOTHING == ret.first)
- {
- DbgEx("can not find save session, process noting, %s", ret.second.c_str());//error
- return;
- }
- if (0 != hdlID)
- {//hdlId为0时,说明本地发起的session
- std::string js = this->m_serializer->BufferToJson(msg, replaceTransId);
- do_sendJson(js, hdlID, id); //发给首记录的CMessage
- }
- for (auto cur = m_msg_pool.begin(); cur != m_msg_pool.end(); cur++)
- {//sessionId为-1时需处理,否则会引发消息风暴
- if (cur->first == ret.second)
- {
- std::vector<std::pair<websocketpp::connection_hdl, std::string>> msgArr(cur->second);
- m_msg_pool.erase(cur); //为防止处理消息时引发消息风暴,拷贝并删除原msg_pool
- if (-1 != sessionId) {
- DbgEx("Session with %s Make, deal with record Msg:%d", ret.second.c_str(), msgArr.size());
- for (auto msg = msgArr.begin(); msg != msgArr.end(); msg++)
- deal_msg(msg->second, msg->first);
- break;
- }
- else
- {
- DbgEx("Session with %s Make Failed, deal with record Msg:%d", ret.second.c_str(), msgArr.size());
- #if (defined _WIN32 || defined _WIN64)
- for each (auto msg in msgArr)
- #else
- for (auto msg : msgArr)
- #endif
- deal_sessionBreakMsg(msg.second, msg.first);
- break;
- }
- }
- }
- }
- else
- {
- m_esm->AckProcess(&msg, hdlID);
- std::string js = this->m_serializer->BufferToJson(msg, replaceTransId);
- do_sendJson(js, hdlID, id);
- }
- }
- void CWebsocketServer::init_websocket() {
- auto checkPortExist = []() -> bool {
- boost::asio::io_service ioService;
- boost::asio::ip::tcp::socket* pSockTcp = NULL;
- bool bSockUseError = false;
- try {
- auto tcpEndpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), DEFAULT_SERVER_PORT);
- pSockTcp = new boost::asio::ip::tcp::socket(ioService, tcpEndpoint);
- bSockUseError = false;
- }
- catch (...)
- {
- bSockUseError = true;
- }
- //释放
- if (!bSockUseError && pSockTcp != NULL)
- {
- delete pSockTcp;
- pSockTcp = NULL;
- }
- ioService.stop();
- return bSockUseError;
- };
- /*
- while (true) {
- if (!checkPortExist())
- break;
- else
- DbgEx("checkPortExist failed");
- std::this_thread::sleep_for(std::chrono::seconds(5));
- }
- */
- try
- {
- // Set logging settings
- DbgEx("Set logging settings");
- m_wsserver.set_error_channels(websocketpp::log::elevel::all);
- m_wsserver.set_access_channels(websocketpp::log::alevel::all ^ websocketpp::log::alevel::frame_payload);
- DbgEx("m_wsserver.get_max_message_size = %d", m_wsserver.get_max_message_size());
- m_wsserver.set_max_message_size(MAX_TRANSFER_LEN);
- m_wsserver.set_close_handshake_timeout(3000);
- m_wsserver.set_pong_timeout(3000);
- // Initialize Asio
- DbgEx("Initialize Asio");
- m_wsserver.init_asio(&m_ios);
- // Set the default message handler to the echo handler
- DbgEx("Set the default message handler to the echo handler");
- m_wsserver.set_message_handler(websocketpp::lib::bind(&CWebsocketServer::message_handler, this, websocketpp::lib::placeholders::_1, websocketpp::lib::placeholders::_2));
- DbgEx("Set set_open_handler");
- m_wsserver.set_open_handler(websocketpp::lib::bind(&CWebsocketServer::open_handler, this, websocketpp::lib::placeholders::_1));
- m_wsserver.set_close_handler(websocketpp::lib::bind(&CWebsocketServer::close_handler, this, websocketpp::lib::placeholders::_1));
- // Listen on port 9002, add m_acceptor->set_option(lib::asio::socket_base::reuse_address(true), bec);
- m_wsserver.listen(DEFAULT_SERVER_PORT);
- // Queues a connection accept operation
- DbgEx("Queues a connection accept operation");
- m_wsserver.start_accept();
- m_initSuccess = true;
- DbgEx("do_run end");
- }
- catch (websocketpp::exception const& e) {
- DbgEx("websocketpp exception %s ", e.what());
- m_initSuccess = false;
- }
- catch (...) {
- DbgEx("other exception");
- m_initSuccess = false;
- }
- }
- void CWebsocketServer::myTest()
- {
- }
- #if(defined _WIN32 || defined _WIN64)
- std::string CWebsocketServer::utf8_to_string(const std::string& str) {
- int len = MultiByteToWideChar(CP_UTF8, 0, str.c_str(), -1, NULL, 0);
- wchar_t* wszGBK = new wchar_t[len + 1];
- wmemset(wszGBK, 0, len + 1);
- MultiByteToWideChar(CP_UTF8, 0, (LPCTSTR)str.c_str(), -1, wszGBK, len);
- len = WideCharToMultiByte(CP_ACP, 0, wszGBK, -1, NULL, 0, NULL, NULL);
- char* szGBK = new char[len + 1];
- memset(szGBK, 0, len + 1);
- WideCharToMultiByte(CP_ACP, 0, wszGBK, -1, szGBK, len, NULL, NULL);
- std::string strTemp(szGBK);
- delete[] szGBK;
- delete[] wszGBK;
- return strTemp;
- }
- std::string CWebsocketServer::string_to_utf8(const std::string& str) {
- int wcLen = MultiByteToWideChar(CP_ACP, 0, str.c_str(), -1, NULL, 0);
- if (wcLen > 0) {
- wchar_t* pwBuf = new wchar_t[wcLen + 1];
- if (pwBuf == NULL) {
- return std::string();
- }
- memset(pwBuf, 0, sizeof(wchar_t) * (wcLen + 1));
- wcLen = MultiByteToWideChar(CP_ACP, 0, str.c_str(), -1, pwBuf, wcLen);
- if (wcLen <= 0) {
- delete[] pwBuf;
- return std::string();
- }
- int ucLen = WideCharToMultiByte(CP_UTF8, 0, pwBuf, -1, NULL, 0, NULL, NULL);
- //DbgEx("ucLen = %d", ucLen);
- if (ucLen < 0) {
- delete[] pwBuf;
- return std::string();
- }
- char* pBuf = new char[ucLen + 1];
- if (pBuf == NULL) {
- delete pwBuf;
- return std::string();
- }
- memset(pBuf, 0, sizeof(char) * (ucLen + 1));
- ucLen = WideCharToMultiByte(CP_UTF8, 0, pwBuf, -1, pBuf, ucLen, NULL, NULL);
- if (ucLen <= 0) {
- delete[] pwBuf;
- delete[] pBuf;
- return std::string();
- }
- std::string retStr(pBuf);
- //DbgEx("string_to_utf8 return: %s", retStr.c_str());
- if (pwBuf) {
- delete[] pwBuf;
- pwBuf = NULL;
- }
- if (pBuf) {
- delete[] pBuf;
- pBuf = NULL;
- }
- return retStr;
- }
- return std::string();
- }
- #endif
- void CWebsocketServer::init_entity_sessions() {
- return;
- }
- }
|