| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681 |
- #if (defined _WIN32 || defined _WIN64)
- #include "stdafx.h"
- #include "../mod_browser/IEBrowser_client_g.h"
- #endif
- #include "CWebsocketServer.h"
- #include "baseEx.h"
- #include "CModTools.h"
- #include <vector>
- #include "MessageType.h"
- #include <future>
- #include <boost/thread/thread.hpp>
- #include <boost/thread/lock_guard.hpp>
- #include <iostream>
- #include <thread>
- #include "guitask/guitask.h"
- #define DEFAULT_SERVER_PORT 9002
- namespace Chromium{
- CWebsocketServer::CWebsocketServer(const char* strPath, CEntityBase* pEntity):
- m_wsserver(), m_ios(),m_serializer(NULL),m_socket(NULL),m_pEntity(pEntity),m_initSuccess(false)
- {
- DbgEx("CWebsocketServer constructor");
- // Initialize socket client
- DbgEx("Initialize socket client");
- this->m_pEntity = pEntity;
- m_socket = new CSocketClient(m_ios, "127.0.0.1", "4504", pEntity, 0);
- DbgEx("Set socket client MessageHandler -- message_handler");
- m_socket->SetMessageHandler(this);
- // Initialize serializer
- DbgEx("Initialize serializer");
- this->m_serializer = CWSCodec::getInstance();
- this->m_serializer->setEntityHandler(pEntity->GetFunction());
- this->m_serializer->init(strPath);
- while (Error_Succeed != m_socket->Connect())
- Sleep(100);
- DbgEx("init Entity Session Manager");
- myTest();
- m_esm = new EntitySessionManager();
-
- init_websocket();
- init_entity_sessions();
- }
- void CWebsocketServer::updateMsgPool(std::string entityName, std::string &payload, websocketpp::connection_hdl hdl)
- {
- //存储的消息应具备时效性,保证一个hdl只有一个记录的消息
- auto it = m_msg_pool.find(entityName);
- if (it != m_msg_pool.end())
- {
- auto &curArr = it->second;
- for (auto i = curArr.begin(); i != curArr.end(); i++)
- {
- if (&hdl == &i->first)
- {
- curArr.erase(i);
- break;
- }
-
- }
- it->second.push_back(std::make_pair(hdl, payload));
- }
-
- else
- {
- std::vector<std::pair<websocketpp::connection_hdl, std::string>> t_saveMsgs;
- t_saveMsgs.push_back(std::make_pair(hdl, payload));
- m_msg_pool.insert(std::make_pair(entityName, t_saveMsgs));
- }
- }
- void CWebsocketServer::storeEntityWithCLass(std::string entityName, std::string entityClass)
- {
- if (m_entityAndClass.end() == m_entityAndClass.find(entityName))
- m_entityAndClass.insert(std::make_pair(entityName, entityClass)); //not exist
- }
- std::pair<bool, std::string> CWebsocketServer::getEntityClass(std::string entityName)
- {
- auto it = m_entityAndClass.find(entityName);
- if (m_entityAndClass.end() == it)
- return std::make_pair(false, "");
- else
- return std::make_pair(true, it->second);
- }
- void CWebsocketServer::do_sendJsonStartSession(std::string entityName, std::string entityClass)
- {
- auto startSessionReq = m_esm->GetStartSessionRequest(entityName, entityClass);
- auto sessionBuf = this->m_serializer->JsonToBuffer(startSessionReq.second).second;
- if (nullptr == sessionBuf)
- {
- DbgEx("JsonToBuffer return NULL !");
- return;
- }
- m_esm->StoreSessionReq(startSessionReq.first, entityName);
- m_esm->MakeNewTransID(sessionBuf, 0);
- //保存msg,先建立session
- if (m_esm->checkBeginSession(entityName)) {
- m_esm->updateBeginSessionTime(entityName);
- WriteToFramework(sessionBuf);
- }
- else
- DbgEx("Already begin session in 5s, %s", entityName.c_str());
- }
- void CWebsocketServer::WriteToFramework(CMessage* msg)
- {
- m_socket->Write(msg);
- if (chromiumRpcTask::get_mutable_instance().isChromiumRpcWork()) chromiumRpcTask::get_mutable_instance().publishMsg({ generateTimeStr(), "WriteToFramework---------" + msg->printfHEX() });
- }
- void CWebsocketServer::deal_msg(std::string &payload, websocketpp::connection_hdl hdl)
- {
- boost::lock_guard<boost::mutex> lock(m_dealMsgLock); //在buffer和json处理时,deal_msg会调用多次,导致transId存在重复可能
- ::DbgEx("deal_msg :%d, %s", payload.length(), payload.length() > 800 ? payload.substr(0, 800).append("...").c_str() : payload.c_str());
- if (chromiumRpcTask::get_mutable_instance().isChromiumRpcWork()) chromiumRpcTask::get_mutable_instance().publishMsg({ generateTimeStr(), payload.c_str() });
- auto ret = this->m_serializer->JsonToBuffer(payload);
- CMessage *p = ret.second;
- if (ret.first == Broadcast && nullptr == p)
- {
- DbgEx("JsonToBuffer return NULL, perhaps an event happened");
- do_sendJsonBroadcast(payload);
- return;
- }
- else if (ret.first == GetSession)
- {
- cJSON* pJson = cJSON_Parse(payload.c_str());
- auto transIdJson = cJSON_GetObjectItem(pJson, "transId");
- if (transIdJson != nullptr)
- {
- int transid = transIdJson->valueint;
- auto sessionRet = m_esm->GetAllSessionRequest(transid);
- DbgEx("sessionJson:%s", sessionRet.second.c_str());
- if (sessionRet.first)
- {
- auto js = restroreTransId(sessionRet.second);
- m_wsserver.send(hdl, js, websocketpp::frame::opcode::TEXT);
- }
- }
- return;
- }
- else if (ret.first == BeginSession)
- storeEntityWithCLass(m_serializer->GetEntityName(payload), m_serializer->GetClassName(payload));
- // Try to manage entity session
- RequestProcessType processType = this->m_esm->RequestProcess(p, m_serializer->GetEntityName(payload), (long)hdl.lock().get());
- switch (processType)
- {
- case Chromium::PROCESS_NOTHING:
- DbgEx("do PROCESS_NOTHING");
- return;
- case Chromium::PROCESS_SEND:
- DbgEx("do PROCESS_SEND");
- if (nullptr != p)
- WriteToFramework(p);
- break;
- case Chromium::PROCESS_STARTSESSION:
- {
- DbgEx("do PROCESS_STARTSESSION");
- auto entityName = m_serializer->GetEntityName(payload);
- if (entityName == "Chromium")
- {
- DbgEx("discard, don't make session with chromium");
- break;
- }
- updateMsgPool(entityName, payload, hdl);
- do_sendJsonStartSession(entityName, m_serializer->GetClassName(payload));
- }
- break;
- case Chromium::PROCESS_FINDSESSION:
- {
- DbgEx("do PROCESS_FINDSESSION");
- std::string js = m_esm->GetStartSessionAck(p, m_serializer->GetEntityName(payload));
- js = restroreTransId(js);
- m_wsserver.send(hdl, js, websocketpp::frame::opcode::TEXT);
- }
- break;
- case Chromium::PROCESS_RECORDMSG:
- {
- DbgEx("do PROCESS_RECORDMSG");
- auto entityName = m_serializer->GetEntityName(payload);
- updateMsgPool(entityName, payload, hdl);
- }
-
- break;
- default:
- break;
- }
- }
- // websocket message handler
- void CWebsocketServer::message_handler(websocketpp::connection_hdl hdl, server::message_ptr msg){
- /*
- static bool isTest = true;
- if (isTest)
- {
- msg->set_payload("{\"messageType\":13,\"transID\" : 11111,\"name\" : \"UIState\",\"value\" : \"M\"}");
- isTest = false;
- }
- else
- return;
- */
- static int pos = 0;
- static std::map<int, int> t_hdlArr;
- int hdlPos = (long)hdl.lock().get();
- if (t_hdlArr.end() == t_hdlArr.find(hdlPos))
- t_hdlArr[hdlPos] = pos++;
- auto msgHandleFun = [&]() {
- DbgEx("CWebsocketServer -> message_handler");
- std::string payload = msg->get_payload();
- // proto convert here
- cJSON* pJson = cJSON_Parse(payload.c_str());
- if (nullptr == pJson) {
- DbgEx("CWebsocketServer -> message_handler, jsonErr:%s", payload.c_str());
- return;
- }
- auto transIdJson = cJSON_GetObjectItem(pJson, "transId");
- if(transIdJson != nullptr)
- {
- int transid = transIdJson->valueint;
- int modifyT = t_hdlArr[hdlPos] << 24;
- int dstTransId = transid ^ modifyT;
- cJSON_SetIntValue(transIdJson, dstTransId);
- }
- std::string dstPayLoad = cJSON_PrintUnformatted(pJson);
- deal_msg(dstPayLoad, hdl);
- DbgEx("Leave CWebsocketServer -> message_handler");
- };
- boost::thread dealMsgThread(msgHandleFun);
- dealMsgThread.join();
- }
- std::string CWebsocketServer::restroreTransId(std::string payLoad)
- {
- cJSON* pJson = cJSON_Parse(payLoad.c_str());
- auto transIdJson = cJSON_GetObjectItem(pJson, "transId");
- if (transIdJson != nullptr)
- {
- int transid = transIdJson->valueint;
- int dstTransId = transid & 0x00FFFFFF;
- cJSON_SetIntValue(transIdJson, dstTransId);
- }
- return cJSON_PrintUnformatted(pJson);
- }
- void CWebsocketServer::open_handler(websocketpp::connection_hdl hdl){
- // hand shake here
- DbgEx("new connection to ws server : %u", hdl.lock().get());
-
- m_connection_hdls.insert(std::pair<unsigned int, websocketpp::connection_hdl>((long)hdl.lock().get(), hdl));
- }
- void CWebsocketServer::close_handler(websocketpp::connection_hdl hdl){
- // hand shake here
-
- auto connectionIter = m_connection_hdls.find((long)hdl.lock().get());
- if (m_connection_hdls.end() != connectionIter)
- {
- DbgEx("connection with ws server closed : %u", hdl.lock().get());
- m_connection_hdls.erase(connectionIter);
- }
- else
- DbgEx("connection close erase failed : %u", hdl.lock().get());
-
- }
- void CWebsocketServer::do_run(){
- DbgEx("CWebsocketServer -> do_run");
- // Start the Asio io_service run loop
- DbgEx("Start the Asio io_service run loop");
- while (true)
- {
- try
- {
- m_ios.poll();
- Sleep(2);
- }
- catch (...)
- {
- DbgEx("other exception in ios poll");
- }
- }
- }
- void CWebsocketServer::do_relink()
- {
- DbgEx("do_relink Enter");
- while (true)
- {
- try
- {
- Sleep(10000);
- DbgEx("try do_relink");
- auto unlinkArr = m_esm->queryUnLinkSession();
- boost::lock_guard<boost::mutex> lock(m_dealMsgLock);
- for (auto it : unlinkArr)
- {
- auto ret = getEntityClass(it);
- if (ret.first)
- {
- DbgEx("try to relink %s, %s", it.c_str(), ret.second.c_str());
- do_sendJsonStartSession(it, ret.second);
- }
- else
- DbgEx("unable find class of entity %s", it.c_str());
-
- }
- }
- catch (...)
- {
- DbgEx("exception in do_relink");
- }
- }
-
- }
- void CWebsocketServer::run(){
- DbgEx("CWebsocketServer -> run");
-
- boost::thread thread1(boost::bind(&CWebsocketServer::do_run, this));
- boost::thread thread2(boost::bind(&CWebsocketServer::do_relink, this));
-
- }
- void CWebsocketServer::do_sendJsonBroadcast(std::string js)
- {
- js = restroreTransId(js);
- DbgEx("message broadcast : json = %s", js.c_str());
- if (js.empty())
- {
- DbgEx("string empty");
- return;
- }
- if (m_connection_hdls.empty())
- {
- DbgEx("message_from_socket : no websocket client connection");
- }
- else
- {//broadcast不进行转换
- if (js.empty())
- {
- DbgEx("string empty 2");
- return;
- }
- DbgEx("do_sendJsonBroadcast Enter...");
- for (auto it = m_connection_hdls.begin(); it != m_connection_hdls.end(); it++)
- {
- try
- {
- websocketpp::connection_hdl hdl = it->second;
- m_wsserver.send(hdl, js, websocketpp::frame::opcode::TEXT);
- }
- catch (const websocketpp::lib::error_code& e)
- {
- DbgEx("m_wsserver send crash : error message=%s", e.message());
- }
- catch (const std::exception &e) {
- DbgEx("std exception : %s", e.what());
- }
- catch (...) {
- DbgEx("other exception");
- }
- }
- DbgEx("do_sendJsonBroadcast End...");
- }
- }
- void CWebsocketServer::do_sendJson(std::string js, int hdlID, unsigned int id)
- {
- js = restroreTransId(js);
- DbgEx("WebSocket Search message_from_socket : json = %s", js.c_str());
- if (chromiumRpcTask::get_mutable_instance().isChromiumRpcWork()) chromiumRpcTask::get_mutable_instance().publishMsg({ generateTimeStr(), "do_sendJson---------" + js });
- if (js.empty())
- {
- DbgEx("string empty");
- return;
- }
- if (m_connection_hdls.empty())
- DbgEx("message_from_socket : no websocket client connection");
- else {
- #if(defined _WIN32 || defined _WIN64)
- js = string_to_utf8(js);
- #endif
- if (js.empty())
- {
- DbgEx("string empty 2");
- return;
- }
- try
- {
- DbgEx("do_sendJson Enter...");
- std::map<unsigned int, websocketpp::connection_hdl>::iterator it = m_connection_hdls.find(hdlID);
- if (m_connection_hdls.end() != it)
- {
- DbgEx("Send....");
- websocketpp::connection_hdl hdl = it->second;
- m_wsserver.send(hdl, js, websocketpp::frame::opcode::TEXT);
- }
- else {
- DbgEx("ws connection handler not found! id = %u", id);
- }
- DbgEx("do_sendJson End...");
- }
- catch (const websocketpp::lib::error_code& e)
- {
- DbgEx("m_wsserver send crash : error message=%s", e.message());
- }
- catch (const std::exception &e) {
- DbgEx("std exception : %s", e.what());
- FILE* fp = NULL;
- fp = fopen("chromiumTmp", "a+");
- if (fp == NULL)
- return;
- int res = fprintf(fp, "%s\n", js.c_str());
- fclose(fp);
- }
- catch (...) {
- DbgEx("other exception");
- }
- }
- }
- // socket message handler
- void CWebsocketServer::message_from_socket(CMessage& msg, unsigned int id){
- // get message from socket and deserialize
- // then send back to the web client
- auto bufferLength = msg.getBufferLength();
- if(bufferLength > MAX_TRANSFER_LEN)
- DbgEx("WebSocket Search message_from_socket : buffer len = %d, buffer pre50:%s", msg.getBufferLength(), msg.printfHEX(50).c_str());
- else
- DbgEx("WebSocket Search message_from_socket : buffer len = %d", msg.getBufferLength());
- if (chromiumRpcTask::get_mutable_instance().isChromiumRpcWork()) chromiumRpcTask::get_mutable_instance().publishMsg({ generateTimeStr(), "message_from_socket---------" + msg.printfHEX() });
- int replaceTransId = 0;
- if (msg.getLength() > 16 && m_esm != NULL)
- {//this is return buffer, it would not send out again, so I exchange the transId and sessionId place
- msg.exchangeSessionIdAndTransId();
- auto tmpReplace = m_esm->getSrcTransID(msg.getTransID());
- replaceTransId = tmpReplace.first == true ? tmpReplace.second : 0;
- }
- // 处理session ack,获取hdlID
- unsigned int hdlID = 0;
- std::vector<std::pair<int, int>> sendArr;
- if (8 == msg.getMessageType())
- {
- m_esm->AskProcessEvent(&msg, sendArr);
- if (sendArr.size() == 0)
- m_esm->AskProcessEvent(&msg, sendArr);
- for (auto i = sendArr.begin(); i != sendArr.end(); i++)
- {
- msg.setTransID(i->first);
- std::string js = this->m_serializer->BufferToJson(msg);
- do_sendJson(js, i->second, id);
- }
- }
- else if (2 == msg.getMessageType()) //session end
- {
- int sessionId = msg.getTransID();
-
- auto ret = m_esm->DoSessionRemove(sessionId);
- if (ret.first)
- DbgEx("detect session %s:%d lost!remove success", ret.second.c_str(), sessionId);
- else
- DbgEx("detect session %d lost!remove failed", sessionId);
-
- }
- else if (5 == msg.getMessageType())
- {//sessionAck
- auto ret = m_esm->AskProcessSession(&msg, hdlID);
- auto sessionId = msg.getSessionID();
- if (ACKPROCESS_NOTHING == ret.first)
- {
- DbgEx("can not find save session, process noting, %s", ret.second.c_str());//error
- return;
- }
- if (0 != hdlID)
- {//hdlId为0时,说明本地发起的session
- std::string js = this->m_serializer->BufferToJson(msg, replaceTransId);
- do_sendJson(js, hdlID, id); //发给首记录的CMessage
- }
- for (auto cur = m_msg_pool.begin(); cur != m_msg_pool.end() && -1 != sessionId;cur++)
- {//sessionId为-1时需处理,否则会引发消息风暴
- if (cur->first == ret.second)
- {
- std::vector<std::pair<websocketpp::connection_hdl, std::string>> msgArr(cur->second);
- m_msg_pool.erase(cur); //为防止处理消息时引发消息风暴,拷贝并删除原msg_pool
- DbgEx("Session with %s Make, deal with record Msg:%d", ret.second.c_str(), msgArr.size());
- for (auto msg = msgArr.begin(); msg != msgArr.end(); msg++)
- deal_msg(msg->second, msg->first);
- break;
- }
- }
-
- }
- else
- {
- m_esm->AckProcess(&msg, hdlID);
- std::string js = this->m_serializer->BufferToJson(msg, replaceTransId);
- do_sendJson(js, hdlID, id);
- }
-
- }
- void CWebsocketServer::init_websocket(){
- auto checkPortExist = []() -> bool {
- boost::asio::io_service ioService;
- boost::asio::ip::tcp::socket* pSockTcp = NULL;
- bool bSockUseError = false;
- try {
- auto tcpEndpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), DEFAULT_SERVER_PORT);
- pSockTcp = new boost::asio::ip::tcp::socket(ioService, tcpEndpoint);
- bSockUseError = false;
- }
- catch (...)
- {
- bSockUseError = true;
- }
- //释放
- if (!bSockUseError && pSockTcp != NULL)
- {
- delete pSockTcp;
- pSockTcp = NULL;
- }
- ioService.stop();
- return bSockUseError;
- };
- /*
- while (true) {
- if (!checkPortExist())
- break;
- else
- DbgEx("checkPortExist failed");
- std::this_thread::sleep_for(std::chrono::seconds(5));
- }
- */
- try
- {
- // Set logging settings
- DbgEx("Set logging settings");
- m_wsserver.set_error_channels(websocketpp::log::elevel::all);
- m_wsserver.set_access_channels(websocketpp::log::alevel::all ^ websocketpp::log::alevel::frame_payload);
- DbgEx("m_wsserver.get_max_message_size = %d", m_wsserver.get_max_message_size());
- m_wsserver.set_max_message_size(MAX_TRANSFER_LEN);
- m_wsserver.set_close_handshake_timeout(3000);
- m_wsserver.set_pong_timeout(3000);
- // Initialize Asio
- DbgEx("Initialize Asio");
- m_wsserver.init_asio(&m_ios);
- // Set the default message handler to the echo handler
- DbgEx("Set the default message handler to the echo handler");
- m_wsserver.set_message_handler(websocketpp::lib::bind(&CWebsocketServer::message_handler, this, websocketpp::lib::placeholders::_1, websocketpp::lib::placeholders::_2));
- DbgEx("Set set_open_handler");
- m_wsserver.set_open_handler(websocketpp::lib::bind(&CWebsocketServer::open_handler, this, websocketpp::lib::placeholders::_1));
- m_wsserver.set_close_handler(websocketpp::lib::bind(&CWebsocketServer::close_handler, this, websocketpp::lib::placeholders::_1));
- // Listen on port 9002
- m_wsserver.listen(DEFAULT_SERVER_PORT);
-
- // Queues a connection accept operation
- DbgEx("Queues a connection accept operation");
- m_wsserver.start_accept();
- m_initSuccess = true;
- DbgEx("do_run end");
- } catch (websocketpp::exception const & e) {
- DbgEx("websocketpp exception %s ", e.what());
- m_initSuccess = false;
- } catch (...) {
- DbgEx("other exception");
- m_initSuccess = false;
- }
- }
- void CWebsocketServer::myTest()
- {
- // using namespace IEBrowser;
- // IEBrowserSrv_ClientBase *pClient = new IEBrowserSrv_ClientBase(this->m_pEntity);
- // auto rc = pClient->Connect();
- // if (Error_Succeed != rc)
- // DbgEx("connect to IEBrowser entity fail: %d", rc);
- // else
- // DbgEx("connect to IEBrowser entity success");
- }
- #if(defined _WIN32 || defined _WIN64)
- std::string CWebsocketServer::string_to_utf8(const std::string & str) {
- LOG_FUNCTION();
- DbgEx("start string_to_utf8...");
- int wcLen = MultiByteToWideChar(CP_ACP, 0, str.c_str(), -1, NULL, 0);
- DbgEx("wcLen = %d", wcLen);
- if(wcLen > 0) {
- WCHAR* pwBuf = new WCHAR[wcLen + 1];
- if(pwBuf == NULL) {
- return std::string();
- }
- memset(pwBuf, 0, sizeof(WCHAR)*(wcLen + 1));
- wcLen = MultiByteToWideChar(CP_ACP, 0, str.c_str(), -1, pwBuf, wcLen);
- if(wcLen <= 0) {
- delete[] pwBuf;
- return std::string();
- }
- int ucLen = WideCharToMultiByte(CP_UTF8, 0, pwBuf, -1, NULL, 0, NULL, NULL);
- DbgEx("ucLen = %d", ucLen);
- if(ucLen < 0) {
- delete[] pwBuf;
- return std::string();
- }
- char* pBuf = new char[ucLen + 1];
- if(pBuf == NULL) {
- delete pwBuf;
- return std::string();
- }
- memset(pBuf, 0, sizeof(char)*(ucLen + 1));
- ucLen = WideCharToMultiByte(CP_UTF8, 0, pwBuf, -1, pBuf, ucLen, NULL, NULL);
- if (ucLen <= 0) {
- delete[] pwBuf;
- delete[] pBuf;
- return std::string();
- }
- std::string retStr(pBuf);
- DbgEx("string_to_utf8 return: %s",retStr.c_str());
- if(pwBuf) {
- delete[] pwBuf;
- pwBuf = NULL;
- }
- if(pBuf) {
- delete[] pBuf;
- pBuf = NULL;
- }
- return retStr;
- }
- return std::string();
- }
- #endif
- void CWebsocketServer::init_entity_sessions(){
- return;
- }
- }
|