#ifndef RVC_MOD_UPLOAD_UPLOADFSM_H_ #define RVC_MOD_UPLOAD_UPLOADFSM_H_ #include "SpBase.h" #include "SpFSM.h" #include "SpSecureClient.h" #include "SpUtility.h" #include "upload.h" #include #define USER_EVT_JMP_DISABLE EVT_USER+1 #define USER_EVT_JMP_ENABLE EVT_USER+2 #define USER_EVT_JMP_CONNECT EVT_USER+3 #define USER_EVT_DISCONNECT EVT_USER+4 #define USER_EVT_UPLOAD_ANS EVT_USER+5 #define USER_EVT_BLOCK_ANS EVT_USER+6 #define USER_EVT_JMP_UPLOAD EVT_USER+7 using namespace std; #pragma pack(1) // [StructName("UPREQ")] struct UploadReq { char TerminalNo[16]; char FileName[64]; char FileType[16]; int FileLength; int FileCreatDate; }; // [StructName("UPANS")] struct UploadReply { int ResultCode; char UploadID[16]; int BeginBlock; }; //[StructName("BLKREQ")] struct BlockReq { char TerminalNo[16]; char FileName[64]; char UploadID[16]; int BlockNo; char Data[0]; }; //[StructName("BLKANS")] struct BlockReply { char UploadID[16]; int ResultCode; }; #pragma pack() enum UploadCtlCode { Finish = 0, //完成(BLOCK) Begin = 1, //开始上传(UPLOAD) Continua = 2, //文件已经存在,断点续传,按返回块开始传(UPLOAD)(BLOCK) ErrorBlock = 3, //上块错误,重新传(BLOCK) ErrorType = 4, //错误的类型(UPLOAD) Timeout = 5, //过期(UPLOAD) ErrorUploadId = 6, //错误的上传id(BLOCK) }; //上传请求回应 struct UploadAnsEvent : public FSMEvent { UploadAnsEvent(BYTE *pBuf, int nLen) : FSMEvent(USER_EVT_UPLOAD_ANS) { memcpy(&m_reply, pBuf, sizeof(UploadReply)); } virtual ~UploadAnsEvent() {} UploadReply m_reply; }; //上传块请求回应 struct BlockAnsEvent : public FSMEvent { BlockAnsEvent(BYTE *pBuf, int nLen) : FSMEvent(USER_EVT_BLOCK_ANS) { memcpy(&m_reply, pBuf, sizeof(BlockReply)); } virtual ~BlockAnsEvent() {} BlockReply m_reply; }; struct UploadProgress { int uploadState; int uploadNumber; CSimpleStringA elapseTime; }; class UploadConnection; class UploadFSM : public FSMImpl, public IFSMStateHooker, public ISysVarListener { public: enum {s0,s1,s2,s3,s4,s5,s6}; BEGIN_FSM_STATE(UploadFSM) FSM_STATE_ENTRY(s0,"Starting",s0_on_entry,s0_on_exit,s0_on_event) FSM_STATE_ENTRY(s1, "Disable",s1_on_entry,s1_on_exit,s1_on_event) FSM_STATE_ENTRY(s2, "Enable", s2_on_entry, s2_on_exit, s2_on_event) FSM_STATE_ENTRY(s3, "Connect", s3_on_entry, s3_on_exit, s3_on_event) FSM_STATE_ENTRY(s4, "Upload", s4_on_entry, s4_on_exit, s4_on_event) FSM_STATE_ENTRY(s5, "Release", s5_on_entry, s5_on_exit, s5_on_event) FSM_STATE_ENTRY(s6, "DisConnect", s6_on_entry, s6_on_exit, s6_on_event)//添加断连等待时间状态 END_FSM_STATE() BEGIN_FSM_RULE(UploadFSM,s0) FSM_RULE_ENTRY_ANY(s0, s1, USER_EVT_JMP_DISABLE) FSM_RULE_ENTRY_ANY(s0, s2, USER_EVT_JMP_ENABLE) FSM_RULE_ENTRY_ANY(s1, s2, USER_EVT_JMP_ENABLE) FSM_RULE_ENTRY_ANY(s2, s3, USER_EVT_JMP_CONNECT) FSM_RULE_ENTRY_ANY(s2, s1, USER_EVT_JMP_DISABLE) FSM_RULE_ENTRY_ANY(s3, s4, USER_EVT_JMP_UPLOAD) FSM_RULE_ENTRY_ANY(s3, s1, USER_EVT_JMP_DISABLE) FSM_RULE_ENTRY(s4, s5, USER_EVT_UPLOAD_ANS, 1) FSM_RULE_ENTRY(s4, s6, USER_EVT_DISCONNECT, 1)//断连时跳转到S6 FSM_RULE_ENTRY(s4, s5, USER_EVT_BLOCK_ANS, 1) FSM_RULE_ENTRY_ANY(s4, s5, EVT_TIMER) FSM_RULE_ENTRY_ANY(s4, s1, USER_EVT_JMP_DISABLE) FSM_RULE_ENTRY_ANY(s5, s2, EVT_TIMER) FSM_RULE_ENTRY_ANY(s5, s2, USER_EVT_DISCONNECT) FSM_RULE_ENTRY_ANY(s5, s1, USER_EVT_JMP_DISABLE) FSM_RULE_ENTRY_ANY(s6, s2, EVT_TIMER)//等待一段时间进入S2 FSM_RULE_ENTRY_ANY(s6, s1, USER_EVT_JMP_DISABLE) END_FSM_RULE() UploadFSM(); ~UploadFSM(); virtual void OnStateTrans(int iSrcState, int iDstState); virtual void OnSysVarEvent(const char *pszKey, const char *pszValue,const char *pszOldValue,const char *pszEntityName); virtual ErrorCodeEnum OnInit(); virtual ErrorCodeEnum OnExit(); void s0_on_entry(); void s0_on_exit(); unsigned int s0_on_event(FSMEvent* event); void s1_on_entry(); void s1_on_exit(); unsigned int s1_on_event(FSMEvent* event); void s2_on_entry(); void s2_on_exit(); unsigned int s2_on_event(FSMEvent* event); void s3_on_entry(); void s3_on_exit(); unsigned int s3_on_event(FSMEvent* event); void s4_on_entry(); void s4_on_exit(); unsigned int s4_on_event(FSMEvent* event); void s5_on_entry(); void s5_on_exit(); unsigned int s5_on_event(FSMEvent* event); void s6_on_entry(); void s6_on_exit(); unsigned int s6_on_event(FSMEvent* event); private: ErrorCodeEnum LoadServerConfigFromCenterSetting(); file_t *find_first_upload_file(); void post_process(); int getUploadFileNum(int &fileSumlen); public: int getCheckDirFile(int silentTime);//检查文件夹文件个数 void getUploadProgress(UploadProgress &progress);//获取上传文件状态 bool clearUploadDate();//清空上传日期 ErrorCodeEnum SaveUploadDate();//保存上传日期 ErrorCodeEnum getUploadDate(CAutoArray &strList);//获取上传日期 ErrorCodeEnum insertUploadDate();//插入上传日期 bool uploadDate_exist(CSimpleStringA uploadDate); private: UploadConnection *m_pConnection; file_t *m_uploading_file; //当前正在上传的文件 int m_uploading_block_id; //正在上传的块号 #ifdef RVC_OS_WIN HANDLE m_uploading_handle; #else() FILE* m_uploading_handle; #endif // RVC_OS_WIN char m_upload_id[16]; CSimpleStringA m_terminalNo; CSimpleStringA m_server1; int m_server1_port; CSimpleStringA m_server2; int m_server2_port; struct list_head m_updir_list;//文件夹配置列表 //添加lwt list* m_check_dir;//重点文件夹的列表 DWORD m_dBeginTime;//统计上传速度开始时间 DWORD m_dEndTime;//统计上传速度结束时间 int m_iSpeed;//单位k/ms CRITICAL_SECTION m_cs;//临界区变量 list* m_uploadDateList;//上传日期文件列表 }; class UploadConnection : public SpSecureClient { public: UploadConnection(CEntityBase *pEntity, UploadFSM *pFSM) : SpSecureClient(pEntity), m_pFSM(pFSM) {} void SendUpReq(file_t *file) { UploadReq req = {0}; CSystemStaticInfo si; { m_pEntity->GetFunction()->GetSystemStaticInfo(si); } strcpy(&req.TerminalNo[0], si.strTerminalID); strcpy(&req.FileType[0], file->owner->name); #ifdef RVC_OS_WIN strcpy(&req.FileName[0], file->name); #else string fileName = file->name; std::string result = SP::Utility::UTF8ToGBK(fileName); strcpy(&req.FileName[0], result.c_str()); #endif // RVC_OS_WIN req.FileLength = file->length; req.FileCreatDate = file->create_time; CSmartPointer pkt = CreateNewPackage("UPREQ"); pkt->AddStruct("UPREQ", false, false, (LPBYTE)&req, sizeof(UploadReq)); SendPackage(pkt); } #ifdef RVC_OS_WIN bool SendBlockReq(file_t* file, HANDLE hFile, const char* upload_id, int block_id) { bool ret = true; int nLen = sizeof(BlockReq) + 0x8000; BlockReq* req = (BlockReq*)new BYTE[nLen]; memset(req, 0, nLen); CSystemStaticInfo si; { m_pEntity->GetFunction()->GetSystemStaticInfo(si); } req->BlockNo = block_id; strcpy(&req->TerminalNo[0], si.strTerminalID); strcpy(&req->FileName[0], file->name); memcpy(req->UploadID, upload_id, sizeof(req->UploadID)); DWORD dwOffset = block_id << 15; // 32k once DWORD dwLength = min(file->length - dwOffset, 1 << 15); SetFilePointer(hFile, dwOffset, NULL, FILE_BEGIN); BOOL bRet = ReadFile(hFile, &req->Data[0], dwLength, &dwLength, NULL); if (bRet) { //Dbg("block len:%d", dwLength); CSmartPointerpkt = CreateNewPackage("BLKREQ"); pkt->AddStruct("BLKREQ", false, false, (LPBYTE)req, sizeof(BlockReq) + dwLength); SendPackage(pkt); ret = true; } else { //增加跳出分支,防止状态机跳不出 DWORD err = GetLastError(); Dbg("SendBlockReq is error file name [%s] , block_id [%d] , GetLastError = %d", file->name, block_id, err); ret = false; } delete req; return ret; } #else bool SendBlockReq(file_t* file, FILE* hFile, const char* upload_id, int block_id) { bool ret = true; int nLen = sizeof(BlockReq) + 0x8000; BlockReq* req = (BlockReq*)new BYTE[nLen]; memset(req, 0, nLen); CSystemStaticInfo si; { m_pEntity->GetFunction()->GetSystemStaticInfo(si); } req->BlockNo = block_id; strcpy(&req->TerminalNo[0], si.strTerminalID); string fileName = file->name; std::string result = SP::Utility::UTF8ToGBK(fileName); strcpy(&req->FileName[0], fileName.c_str()); memcpy(req->UploadID, upload_id, sizeof(req->UploadID)); DWORD dwOffset = block_id << 15; // 32k once //DWORD dwLength = min(file->length - dwOffset, 1 << 15); DWORD dwLength = ((file->length - dwOffset) < (1 << 15)) ? (file->length - dwOffset) : (1 << 15); if (fseek(hFile, dwOffset, SEEK_SET)==0) { int bRet = fread(&req->Data[0], dwLength, 1, hFile); //dwLength=0时,表示文件长度是0,也需要发送到分行服务 if (bRet == 1||dwLength==0) { CSmartPointerpkt = CreateNewPackage("BLKREQ"); pkt->AddStruct("BLKREQ", false, false, (LPBYTE)req, sizeof(BlockReq) + dwLength); SendPackage(pkt); ret = true; } else { //增加跳出分支,防止状态机跳不出 Dbg("SendBlockReq fread fail, is error file name [%s] , block_id [%d] ", file->name, block_id); ret = false; } } else { //增加跳出分支,防止状态机跳不出 Dbg("SendBlockReq fseek fail, is error file name [%s] , block_id [%d]", file->name, block_id); ret = false; } delete req; return ret; } #endif // RVC_OS_WIN protected: virtual ~UploadConnection() {} virtual void OnDisconnect() { m_pFSM->PostEventFIFO(new FSMEvent(USER_EVT_DISCONNECT)); } virtual void OnPkgAnswer(const CSmartPointer &pRecvPkg) { string serviceCode = pRecvPkg->GetServiceCode(); if (serviceCode == "UPREQ") { DWORD dwSysCode, dwUserCode; string strErrMsg; ErrorCodeEnum rc = Error_Succeed; if (pRecvPkg->GetErrMsg(dwSysCode, dwUserCode, strErrMsg)) { rc = (ErrorCodeEnum)dwSysCode; LogError(Severity_Middle, rc, dwUserCode, CSimpleStringA::Format("create up packet Fail!, %s", strErrMsg.c_str())); //Sleep(3000); OnDisconnect(); } else { int nLen = pRecvPkg->GetStructLen("UPANS"); if (nLen > 0) { BYTE *pBuf = new BYTE[nLen]; memset(pBuf, 0, nLen); int nArrayNum = 0; if (pRecvPkg->GetStructData("UPANS", pBuf, &nLen, &nArrayNum)) { FSMEvent *evt = new UploadAnsEvent(pBuf, nLen); m_pFSM->PostEventFIFO(evt); } else { Dbg("create invalid upans packet!"); OnDisconnect(); } delete pBuf; }else{ //nlen增加跳出分支 Dbg("upans packet len is error len=%d",nLen); //Sleep(1000); OnDisconnect(); } } } else if (serviceCode == "BLKREQ") { DWORD dwSysCode, dwUserCode; string strErrMsg; ErrorCodeEnum rc = Error_Succeed; if (pRecvPkg->GetErrMsg(dwSysCode, dwUserCode, strErrMsg)) { rc = (ErrorCodeEnum)dwSysCode; LogError(Severity_Middle, rc, dwUserCode, CSimpleStringA::Format("create up blk Fail!, %s", strErrMsg.c_str())); //Sleep(3000); OnDisconnect(); } else { int nLen = pRecvPkg->GetStructLen("BLKANS"); if (nLen > 0) { BYTE *pBuf = new BYTE[nLen]; memset(pBuf, 0, nLen); int nArrayNum = 0; if (pRecvPkg->GetStructData("BLKANS", pBuf, &nLen, &nArrayNum)) { FSMEvent *evt = new BlockAnsEvent(pBuf, nLen); m_pFSM->PostEventFIFO(evt); } else { Dbg("create invalid blkans packet!"); OnDisconnect(); } delete pBuf; }else{ //nlen增加跳出分支 Dbg("blkans packet len is error len=%d",nLen); //Sleep(1000); OnDisconnect(); } } } else { Dbg("unknown service code! code= %s",serviceCode.c_str()); //Sleep(2000); OnDisconnect(); } } private: UploadFSM *m_pFSM; }; #endif // RVC_MOD_UPLOAD_UPLOADFSM_H_