ioqueue-unix.c

  1. #include "precompile.h"
  2. #include "ioqueue.h"
  3. #include "timerqueue.h"
  4. #include "memutil.h"
  5. #include "refcnt.h"
  6. #include "strutil.h"
  7. #include "sockutil.h"
  8. #include "bus.h"
  9. #include "array.h"
  10. #include "evtpoll.h"
  11. #include <winpr/file.h>
  12. #include <winpr/handle.h>
  13. #include <winpr/synch.h>
  14. #include <winpr/pipe.h>
  15. #include <winpr/string.h>
  16. #include <winpr/wlog.h>
  17. #define TAG TOOLKIT_TAG("ioqueue_unix")
  18. #ifndef SO_UPDATE_CONNECT_CONTEXT
  19. #define SO_UPDATE_CONNECT_CONTEXT 0x7010
  20. #endif
  21. #ifndef WSAID_CONNECTEX
  22. #define WSAID_CONNECTEX \
  23. {0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}}
  24. #endif
25. #define MT_TTL (10*60*1000) /* maintenance TTL: 10 minutes */
26. #define MT_INTERVAL (30*1000) /* maintenance interval: 30 seconds */
27. /* The address buffer passed to AcceptEx() must be 16 bytes larger than
28. * sizeof(SOCKADDR) (source: MSDN).
29. */
  30. #define ACCEPT_ADDR_LEN (16+sizeof(SOCKADDR))
  31. struct ioqueue_t {
32. HANDLE iocp; /* temporary, kept only so the build succeeds; TODO: delete it */
  33. evtpoll_t* ep;
  34. void *user_data;
  35. /* timer */
  36. spinlock_t tm_queue_lock;
  37. timer_queue_t *tm_queue;
  38. /* msg handler */
  39. ioqueue_on_msg_callback msg_handlers[MAX_MSG][MAX_MSG_PRIORITY];
  40. LONG msg_cnt;
  41. /* connect */
  42. spinlock_t connect_list_lock;
  43. struct list_head connect_list;
  44. spinlock_t handler_list_lock;
  45. struct list_head handler_list;
  46. LONG stop;
  47. };
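/* ioqueue_t bundles everything one I/O queue needs on the Unix build: the
 * evtpoll_t epoll wrapper that stands in for the Windows IOCP handle, a
 * spinlock-protected timer queue, the msg_handlers dispatch table indexed by
 * message type and priority, the connect_list used only for the legacy
 * (pre-XP) connect path, the handler_list of all live handle contexts, and
 * the stop flag that ioqueue_poll() uses to tear handles down. */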
  48. typedef struct ioqueue_msg {
  49. int msg_type;
  50. int param1;
  51. int param2;
  52. HANDLE evt; /* for send message */
  53. }ioqueue_msg;
  54. #define HANDLE_TYPE_ACCEPTOR 0x01
  55. #define HANDLE_TYPE_TCPSOCK 0x02
  56. #define HANDLE_TYPE_UDPSOCK 0x03
  57. #define HANDLE_TYPE_FILE 0x04
  58. #define HANDLE_TYPE_PIPEACCEPTOR 0x05
  59. #define OV_ACCEPT 0x01
  60. #define OV_CONNECT 0x02
  61. #define OV_SENDSOME 0x03
  62. #define OV_SENDN 0x04
  63. #define OV_RECVSOME 0x05
  64. #define OV_RECVN 0x06
  65. #define OV_SENDTO 0x07
  66. #define OV_RECVFROM 0x08
  67. #define OV_READFILESOME 0x09
  68. #define OV_WRITEFILESOME 0x0a
  69. #define OV_READFILEN 0x0b
  70. #define OV_WRITEFILEN 0x0c
  71. #define OV_RECVUNTIL 0x0d
  72. #define OV_CONNECTPIPE 0x0e
  73. typedef struct ioqueue_base_overlapped_t {
  74. OVERLAPPED ov;
  75. int type;
  76. void *user_data;
  77. struct list_head pending_entry;
  78. ioqueue_handle_context *handle_ctx;
  79. }ioqueue_base_overlapped_t;
  80. typedef struct ioqueue_accept_overlapped_t {
  81. ioqueue_base_overlapped_t base;
  82. SOCKET client;
  83. ioqueue_on_accept_callback on_accept_callback;
  84. char accept_buf[2*ACCEPT_ADDR_LEN];
  85. }ioqueue_accept_overlapped_t;
  86. typedef struct ioqueue_connect_overlapped_t {
  87. ioqueue_base_overlapped_t base;
  88. ioqueue_on_connect_callback on_connect_callback;
  89. struct list_head node;
  90. HANDLE hevt;
  91. }ioqueue_connect_overlapped_t;
  92. typedef struct ioqueue_sendsome_overlapped_t {
  93. ioqueue_base_overlapped_t base;
  94. ioqueue_on_send_callback on_send_callback;
  95. WSABUF wsabuf;
  96. }ioqueue_sendsome_overlapped_t;
  97. typedef struct ioqueue_sendn_overlapped_t {
  98. ioqueue_base_overlapped_t base;
  99. ioqueue_on_send_callback on_send_callback;
  100. WSABUF wsabuf;
  101. char *original_buf;
  102. unsigned int sended_bytes;
  103. unsigned int total_bytes;
  104. }ioqueue_sendn_overlapped_t;
  105. typedef struct ioqueue_recvsome_overlapped_t {
  106. ioqueue_base_overlapped_t base;
  107. ioqueue_on_recv_callback on_recv_callback;
  108. WSABUF wsabuf;
  109. DWORD dwFlags;
  110. }ioqueue_recvsome_overlapped_t;
  111. typedef struct ioqueue_recvn_overlapped_t {
  112. ioqueue_base_overlapped_t base;
  113. ioqueue_on_recv_callback on_recv_callback;
  114. WSABUF wsabuf;
  115. char *original_buf;
  116. unsigned int recved_bytes;
  117. unsigned int total_bytes;
  118. DWORD dwFlags;
  119. }ioqueue_recvn_overlapped_t;
  120. typedef struct ioqueue_recvuntil_overlapped_t {
  121. ioqueue_base_overlapped_t base;
  122. ioqueue_on_recvuntil_callback on_recvuntil_callback;
  123. WSABUF wsabuf;
  124. char *original_buf;
  125. char *delimer;
  126. unsigned int recved_bytes;
  127. unsigned int total_bytes;
  128. DWORD dwFlags;
  129. }ioqueue_recvuntil_overlapped_t;
  130. typedef struct ioqueue_sendto_overlapped_t {
  131. ioqueue_base_overlapped_t base;
  132. ioqueue_on_sendto_callback on_sendto_callback;
  133. WSABUF wsabuf;
  134. }ioqueue_sendto_overlapped_t;
  135. typedef struct ioqueue_recvfrom_overlapped_t {
  136. ioqueue_base_overlapped_t base;
  137. ioqueue_on_recvfrom_callback on_recvfrom_callback;
  138. WSABUF wsabuf;
  139. struct sockaddr_in peer;
  140. int addrlen;
  141. DWORD dwFlags;
  142. }ioqueue_recvfrom_overlapped_t;
  143. typedef struct ioqueue_readfilesome_overlapped_t {
  144. ioqueue_base_overlapped_t base;
  145. ioqueue_on_read_callback on_read_callback;
  146. char *buf;
  147. HANDLE hevt;
  148. }ioqueue_readfilesome_overlapped_t;
  149. typedef struct ioqueue_readfilen_overlapped_t {
  150. ioqueue_base_overlapped_t base;
  151. ioqueue_on_read_callback on_read_callback;
  152. char *buf;
  153. HANDLE hevt;
  154. unsigned int recved_bytes;
  155. unsigned int total_bytes;
  156. }ioqueue_readfilen_overlapped_t;
  157. typedef struct ioqueue_writefilesome_overlapped_t {
  158. ioqueue_base_overlapped_t base;
  159. ioqueue_on_write_callback on_write_callback;
  160. HANDLE hevt;
  161. char *buf;
  162. }ioqueue_writefilesome_overlapped_t;
  163. typedef struct ioqueue_writefilen_overlapped_t {
  164. ioqueue_base_overlapped_t base;
  165. ioqueue_on_write_callback on_write_callback;
  166. char *buf;
  167. HANDLE hevt;
  168. unsigned int sended_bytes;
  169. unsigned int total_bytes;
  170. }ioqueue_writefilen_overlapped_t;
  171. typedef struct ioqueue_connectpipe_overlapped_t {
  172. ioqueue_base_overlapped_t base;
  173. HANDLE client;
  174. HANDLE hevt;
  175. ioqueue_on_pipe_accept_callback on_accept_callback;
  176. }ioqueue_connectpipe_overlapped_t;
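/* Every asynchronous operation gets one of the *_overlapped_t records above.
 * They all embed ioqueue_base_overlapped_t first, so a completed OVERLAPPED
 * pointer can be cast back to the base record and dispatched on base.type
 * (one of the OV_* codes) together with its handle_ctx and user_data. */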
177. static int is_os_gte_xp() /* is the OS version greater than or equal to XP? */
  178. {
  179. static int yes = -1;
  180. #ifdef _WIN32
  181. if (yes == -1) {
  182. OSVERSIONINFO ver;
  183. ver.dwOSVersionInfoSize = sizeof(OSVERSIONINFO);
  184. GetVersionEx(&ver);
  185. if (ver.dwMajorVersion > 5 || (ver.dwMajorVersion == 5 && ver.dwMinorVersion > 0)) {
  186. yes = 1;
  187. }
  188. }
  189. #endif//_WIN32
  190. return yes;
  191. }
  192. static __inline LONG inc_msg_cnt(ioqueue_t *ioq)
  193. {
  194. return InterlockedIncrement(&ioq->msg_cnt);
  195. }
  196. static __inline LONG dec_msg_cnt(ioqueue_t *ioq)
  197. {
  198. return InterlockedDecrement(&ioq->msg_cnt);
  199. }
  200. static __inline void add_handler_list(ioqueue_handle_context *handle_ctx, ioqueue_t *ioq)
  201. {
  202. spinlock_enter(&ioq->handler_list_lock, -1);
  203. list_add(&handle_ctx->node, &ioq->handler_list);
  204. spinlock_leave(&ioq->handler_list_lock);
  205. }
  206. static __inline void del_handler_list(ioqueue_handle_context *handle_ctx, ioqueue_t *ioq)
  207. {
  208. if (handle_ctx->node.next) {
  209. spinlock_enter(&ioq->handler_list_lock, -1);
  210. list_del(&handle_ctx->node);
  211. handle_ctx->node.next = handle_ctx->node.prev = NULL;
  212. spinlock_leave(&ioq->handler_list_lock);
  213. }
  214. }
  215. static void ioqueue_handle_context_free(ioqueue_handle_context *handle_ctx)
  216. {
  217. if (handle_ctx->type == HANDLE_TYPE_UDPSOCK
  218. || handle_ctx->type == HANDLE_TYPE_TCPSOCK
  219. || handle_ctx->type == HANDLE_TYPE_ACCEPTOR) {
  220. if (handle_ctx->u.sock != INVALID_SOCKET) {
  221. closesocket(handle_ctx->u.sock);
  222. handle_ctx->u.sock = INVALID_SOCKET;
  223. }
  224. } else if (handle_ctx->type == HANDLE_TYPE_FILE) {
  225. if (handle_ctx->u.file != INVALID_HANDLE_VALUE) {
  226. CloseHandle(handle_ctx->u.file);
  227. handle_ctx->u.file = INVALID_HANDLE_VALUE;
  228. }
  229. } else if (handle_ctx->type == HANDLE_TYPE_PIPEACCEPTOR) {
  230. if (handle_ctx->u.pipe_name) {
  231. free(handle_ctx->u.pipe_name);
  232. handle_ctx->u.pipe_name = NULL;
  233. }
  234. } else {
  235. assert(0);
  236. return;
  237. }
  238. del_handler_list(handle_ctx, handle_ctx->owner);
  239. }
  240. IMPLEMENT_REF_COUNT_MT(ioqueue_handle_context, ioqueue_handle_context, pending_ios, ioqueue_handle_context_free)
  241. static __inline LONG inc_pending_io(ioqueue_handle_context *handle_ctx)
  242. {
  243. return inc_ref(ioqueue_handle_context, handle_ctx);
  244. }
  245. static __inline LONG dec_pending_io(ioqueue_handle_context *handle_ctx)
  246. {
  247. return dec_ref(ioqueue_handle_context, handle_ctx);
  248. }
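/* Pending-I/O accounting doubles as reference counting: every in-flight
 * operation takes a reference on its ioqueue_handle_context, and when the
 * count drops to zero ioqueue_handle_context_free() closes the underlying
 * socket/file (or frees the pipe name) and unlinks the context from its
 * owner's handler_list. */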
  249. static SOCKET new_socket()
  250. {
  251. SOCKET sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP,
  252. NULL, 0, WSA_FLAG_OVERLAPPED);
  253. if (sock != INVALID_SOCKET) {
  254. reuse_addr(sock);
  255. }
  256. return sock;
  257. }
  258. static void delete_socket(SOCKET sock)
  259. {
  260. LINGER l;
  261. l.l_onoff = 1;
  262. l.l_linger = 0;
  263. setsockopt(sock, SOL_SOCKET, SO_LINGER, (char*)&l, sizeof(l));
  264. closesocket(sock);
  265. }
  266. TOOLKIT_API ioqueue_t *ioqueue_create()
  267. {
  268. ioqueue_t *ioq = ZALLOC_T(ioqueue_t);
  269. if (!ioq)
  270. return NULL;
  271. ioq->ep = evtpoll_create();
  272. if (!ioq->ep) {
  273. goto on_error_0;
  274. }
  275. if (timer_heap_create(&ioq->tm_queue) != 0) {
  276. goto on_error_3;
  277. }
  278. spinlock_init(&ioq->tm_queue_lock);
  279. spinlock_init(&ioq->connect_list_lock);
  280. INIT_LIST_HEAD(&ioq->connect_list);
  281. spinlock_init(&ioq->handler_list_lock);
  282. INIT_LIST_HEAD(&ioq->handler_list);
  283. return ioq;
  284. on_error_3:
  285. evtpoll_destroy(ioq->ep);
  286. on_error_0:
  287. free(ioq);
  288. return NULL;
  289. }
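/* A minimal lifecycle sketch (illustrative only, not part of this file).
 * 'running' and 'my_daemon' are hypothetical; user_data is set because
 * ioqueue_poll() below asserts that it holds a non-NULL bus_daemon_t. */
#if 0
ioqueue_t *ioq = ioqueue_create();
if (ioq) {
    ioqueue_set_user_data(ioq, my_daemon);   /* bus_daemon_t*, see ioqueue_poll() */
    while (running)
        ioqueue_poll(ioq, 100 /* ms */);     /* network, timers and messages */
    ioqueue_stop(ioq);                       /* the next poll closes all handles */
    while (!ioqueue_handler_empty(ioq) || !ioqueue_msg_empty(ioq))
        ioqueue_poll(ioq, 100);
    ioqueue_destroy(ioq);                    /* asserts both lists are empty */
}
#endif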
  290. TOOLKIT_API void ioqueue_destroy(ioqueue_t *ioq)
  291. {
  292. assert(ioq);
  293. assert(ioqueue_handler_empty(ioq));
  294. assert(ioqueue_msg_empty(ioq));
  295. timer_queue_destroy(ioq->tm_queue);
  296. evtpoll_destroy(ioq->ep);
  297. free(ioq);
  298. }
  299. TOOLKIT_API int ioqueue_handler_empty(ioqueue_t *ioq)
  300. {
  301. int ret;
  302. assert(ioq);
  303. spinlock_enter(&ioq->handler_list_lock, -1);
  304. ret = list_empty(&ioq->handler_list);
  305. spinlock_leave(&ioq->handler_list_lock);
  306. return ret;
  307. }
  308. TOOLKIT_API int ioqueue_msg_empty(ioqueue_t *ioq)
  309. {
  310. assert(ioq);
  311. return ioq->msg_cnt == 0;
  312. }
  313. TOOLKIT_API int ioqueue_msg_add_handler(ioqueue_t *ioq, int msg_type, int priority, ioqueue_on_msg_callback cb)
  314. {
  315. assert(ioq);
  316. assert(cb);
317. assert(msg_type >= 0 && msg_type < MAX_MSG);
318. assert(priority >= 0 && priority < MAX_MSG_PRIORITY);
  319. ioq->msg_handlers[msg_type][priority] = cb;
  320. return 0;
  321. }
  322. TOOLKIT_API int ioqueue_msg_remove_handler(ioqueue_t *ioq, int msg_type, int priority)
  323. {
  324. assert(ioq);
325. assert(msg_type >= 0 && msg_type < MAX_MSG);
326. assert(priority >= 0 && priority < MAX_MSG_PRIORITY);
  327. ioq->msg_handlers[msg_type][priority] = NULL;
  328. return 0;
  329. }
  330. static void dispatch_acceptor(int err,
  331. DWORD dwBytesTransfer,
  332. ioqueue_acceptor_t *acceptor,
  333. ioqueue_accept_overlapped_t *overlapped)
  334. {
  335. ioqueue_t *ioq = acceptor->owner;
  336. if (err == 0) {
  337. /* only valid for winxp or later, ignore return value */
  338. setsockopt(overlapped->client, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
  339. (char*)&acceptor->u.sock, sizeof(SOCKET));
  340. } else {
  341. delete_socket(overlapped->client);
  342. overlapped->client = INVALID_SOCKET;
  343. }
  344. {
  345. SOCKET s = overlapped->client;
  346. int accepted = overlapped->on_accept_callback(acceptor, (ioqueue_overlapped_t*)overlapped, s,
  347. overlapped->base.user_data, err);
  348. if (!err && !accepted && s != INVALID_SOCKET)
  349. delete_socket(s);
  350. }
  351. }
  352. static void dispatch_pipe_acceptor(int err,
  353. DWORD dwBytesTransfer,
  354. ioqueue_pipe_acceptor_t *acceptor,
  355. ioqueue_connectpipe_overlapped_t *overlapped)
  356. {
  357. ioqueue_t *ioq = acceptor->owner;
  358. int accepted;
  359. CloseHandle(overlapped->hevt);
  360. overlapped->hevt = NULL;
  361. if (!err && overlapped->client == INVALID_HANDLE_VALUE)
  362. err = -1;
  363. if (err) {
  364. if (overlapped->client != INVALID_HANDLE_VALUE) {
  365. CloseHandle(overlapped->client);
  366. overlapped->client = INVALID_HANDLE_VALUE;
  367. }
  368. }
  369. accepted = overlapped->on_accept_callback(acceptor,
  370. (ioqueue_overlapped_t*)overlapped,
  371. overlapped->client,
  372. overlapped->base.user_data, err);
  373. if (!err && !accepted && overlapped->client != INVALID_HANDLE_VALUE) {
  374. CloseHandle(overlapped->client);
  375. }
  376. }
  377. static int pre_dispatch_network(BOOL* ret, DWORD* dwBytesTransfer, ioqueue_overlapped_t* io_ctx, int epfd)
  378. {
  379. int err = 0;
  380. ioqueue_base_overlapped_t* base_ov = (ioqueue_base_overlapped_t*)io_ctx;
  381. ioqueue_handle_context* handle_ctx = base_ov->handle_ctx;
  382. if (ret) *ret = FALSE;
  383. switch (handle_ctx->type) {
  384. case HANDLE_TYPE_ACCEPTOR:
  385. {
  386. SOCKET conn_socket;
  387. struct sockaddr_in addr;
  388. socklen_t addrlen;
  389. assert(((ioqueue_accept_overlapped_t*)io_ctx)->client == INVALID_SOCKET);
  390. addrlen = sizeof(addr);
  391. bzero(&addr, addrlen);
  392. conn_socket = accept(handle_ctx->u.sock, (struct sockaddr*) & addr, &addrlen);
  393. if (conn_socket == -1) {
  394. WLog_ERR(TAG, "accept connect socket failed: %d", errno);
  395. return -1;
  396. }
  397. WLog_INFO(TAG, "new connected socket fd(%d) arrived at listen socket: %d from %s:%d",
  398. conn_socket, handle_ctx->u.sock, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
  399. ((ioqueue_accept_overlapped_t*)io_ctx)->client = conn_socket;
  400. if (ret) *ret = TRUE;
  401. if (dwBytesTransfer) *dwBytesTransfer = 0;
  402. err = 1;
  403. }
  404. break;
  405. case HANDLE_TYPE_PIPEACCEPTOR:
  406. break;
  407. case HANDLE_TYPE_TCPSOCK:
  408. case HANDLE_TYPE_UDPSOCK:
  409. case HANDLE_TYPE_FILE:
  410. switch (base_ov->type) {
411. case OV_CONNECT: /* this event does not appear to be used at the moment */
  412. break;
  413. case OV_SENDSOME: {
  414. }
  415. break;
  416. case OV_SENDN: {
  417. int n, nwrite;
418. ioqueue_sendn_overlapped_t* overlapped = (ioqueue_sendn_overlapped_t*)io_ctx;
  419. ioqueue_tcpsock_t* tcpsock = overlapped->base.handle_ctx;
  420. n = 0;
  421. for (; n < overlapped->wsabuf.len; ) {
  422. nwrite = write(tcpsock->u.sock, overlapped->wsabuf.buf + n, overlapped->wsabuf.len - n);
  423. if (nwrite < 0) {
  424. if (errno == EAGAIN || errno == EINTR) {
  425. nwrite = 0;
  426. }
  427. else {
  428. WLog_ERR(TAG, "write error: %d", errno);
  429. break;
  430. }
  431. }
  432. else if (nwrite == 0) {
433. WLog_WARN(TAG, "write returned 0: %d", errno);
434. /* TODO: the peer may have already closed the connection */
  435. }
  436. else {
  437. n += nwrite;
  438. }
  439. }
  440. if (nwrite < 0) {
441. WLog_WARN(TAG, "write op hit an error");
  442. }
  443. WLog_INFO(TAG, "<OV_SENDN>: total_write: %d/%d, cur_send: %d", n, overlapped->wsabuf.len, nwrite);
  444. if (ret) *ret = nwrite < 0 ? FALSE : TRUE;
  445. if (dwBytesTransfer) *dwBytesTransfer = n;
  446. err = 1;
  447. }
  448. break;
  449. case OV_RECVSOME: {
  450. }
  451. break;
  452. case OV_RECVUNTIL: {
  453. }
  454. break;
  455. case OV_RECVN: {
  456. int n, nread;
  457. ioqueue_recvn_overlapped_t* overlapped = (ioqueue_recvn_overlapped_t*)io_ctx;
  458. ioqueue_tcpsock_t* tcpsock = overlapped->base.handle_ctx;
  459. n = 0;
  460. for (; n < overlapped->wsabuf.len;) {
  461. nread = read(tcpsock->u.sock, overlapped->wsabuf.buf + n, overlapped->wsabuf.len - n);
  462. if (nread < 0) {
  463. if (errno == EAGAIN) {
  464. /* that means we have read all data. So go back to the main loop. */
  465. nread = 0;
  466. }
  467. else if (errno == EINTR) {
  468. continue;
  469. }
  470. else {
  471. WLog_ERR(TAG, "read error: %d", errno);
  472. }
  473. break;
  474. }
  475. else if (nread == 0) {
  476. break;
  477. }
  478. else {
  479. n += nread;
  480. }
  481. }
  482. if (nread < 0) {
483. WLog_WARN(TAG, "read op hit an error");
  484. }
  485. WLog_INFO(TAG, "<OV_RECVN>: total_recv: %d/%d, cur_recv: %d", n, overlapped->wsabuf.len, nread);
  486. if (ret) *ret = nread < 0 ? FALSE : TRUE;
  487. if (dwBytesTransfer) *dwBytesTransfer = n;
  488. err = 1;
  489. }
  490. break;
  491. case OV_SENDTO: {
  492. }
  493. break;
  494. case OV_RECVFROM: {
  495. }
  496. break;
  497. case OV_READFILESOME: {
  498. }
  499. break;
  500. case OV_READFILEN: {
  501. }
  502. break;
  503. case OV_WRITEFILESOME: {
  504. }
  505. break;
  506. case OV_WRITEFILEN: {
  507. }
  508. break;
  509. }
  510. break;
  511. default:
  512. assert(0);
  513. break;
  514. }
  515. return err;
  516. }
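/* pre_dispatch_network() bridges epoll readiness to the IOCP-style completion
 * model used by dispatch_network(): for OV_ACCEPT it performs the actual
 * accept() and stores the new fd in the overlapped record, and for OV_SENDN /
 * OV_RECVN it loops write()/read() until the buffer is exhausted, EAGAIN, or
 * an error, reporting the byte count through *dwBytesTransfer and the success
 * flag through *ret. The other OV_* cases are currently left empty. */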
  517. static void dispatch_network(BOOL ret, DWORD dwBytesTransfer, ioqueue_overlapped_t *io_ctx)
  518. {
  519. int err = ret ? 0 : -1;
  520. ioqueue_base_overlapped_t *base_ov = (ioqueue_base_overlapped_t*)io_ctx;
  521. ioqueue_handle_context *handle_ctx = base_ov->handle_ctx;
  522. fastlock_enter(handle_ctx->ov_pending_list_lock);
  523. list_del(&base_ov->pending_entry);
  524. fastlock_leave(handle_ctx->ov_pending_list_lock);
  525. dec_pending_io(handle_ctx);
  526. switch (handle_ctx->type) {
  527. case HANDLE_TYPE_ACCEPTOR:
  528. dispatch_acceptor(err, dwBytesTransfer, handle_ctx, (ioqueue_accept_overlapped_t*)io_ctx);
  529. break;
  530. case HANDLE_TYPE_PIPEACCEPTOR:
  531. dispatch_pipe_acceptor(err, dwBytesTransfer, handle_ctx, (ioqueue_connectpipe_overlapped_t*)io_ctx);
  532. break;
  533. case HANDLE_TYPE_TCPSOCK:
  534. case HANDLE_TYPE_UDPSOCK:
  535. case HANDLE_TYPE_FILE:
  536. switch (base_ov->type) {
  537. case OV_CONNECT: {
  538. ioqueue_connect_overlapped_t *overlapped = (ioqueue_connect_overlapped_t*)io_ctx;
  539. if (err == 0) {
  540. setsockopt(handle_ctx->u.sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
  541. }
  542. overlapped->on_connect_callback(handle_ctx, io_ctx, base_ov->user_data, err);
  543. }
  544. break;
  545. case OV_SENDSOME: {
  546. ioqueue_sendsome_overlapped_t *overlapped = (ioqueue_sendsome_overlapped_t*)io_ctx;
  547. overlapped->on_send_callback(handle_ctx, io_ctx,
  548. overlapped->wsabuf.buf, dwBytesTransfer, base_ov->user_data, err);
  549. }
  550. break;
  551. case OV_SENDN: {
  552. ioqueue_sendn_overlapped_t *overlapped = (ioqueue_sendn_overlapped_t*)io_ctx;
  553. overlapped->sended_bytes += dwBytesTransfer;
  554. if (err == 0 && overlapped->sended_bytes < overlapped->total_bytes) {
  555. ioqueue_tcpsock_t* tcpsock = overlapped->base.handle_ctx;
  556. overlapped->wsabuf.buf += dwBytesTransfer;
  557. overlapped->wsabuf.len -= dwBytesTransfer;
  558. inc_pending_io(handle_ctx);
  559. overlapped->base.ov.Internal = 0;
  560. overlapped->base.ov.InternalHigh = 0;
  561. overlapped->base.ov.Offset = 0;
  562. overlapped->base.ov.OffsetHigh = 0;
563. WLog_WARN(TAG, "OV_SENDN: must handle a partial send, %d < %d", overlapped->sended_bytes, overlapped->total_bytes);
  564. evtpoll_subscribe(ioqueue_tcpsock_get_owned_ioqueue(tcpsock)->ep, EV_WRITE, tcpsock->u.sock, NULL, io_ctx);
  565. //if (rc != 0 && WSAGetLastError() != WSA_IO_PENDING) {
  566. // dec_pending_io(handle_ctx);
  567. // overlapped->on_send_callback(handle_ctx, io_ctx, overlapped->original_buf,
  568. // overlapped->sended_bytes, base_ov->user_data, -1);
  569. //}
  570. } else {
  571. ioqueue_tcpsock_t* tcpsock = overlapped->base.handle_ctx;
  572. evtpoll_unsubscribe(ioqueue_tcpsock_get_owned_ioqueue(tcpsock)->ep, EV_WRITE, tcpsock->u.sock, 0);
  573. overlapped->on_send_callback(handle_ctx, io_ctx, overlapped->original_buf,
  574. overlapped->sended_bytes, base_ov->user_data, err);
  575. }
  576. }
  577. break;
  578. case OV_RECVSOME: {
  579. ioqueue_recvsome_overlapped_t *overlapped = (ioqueue_recvsome_overlapped_t*)io_ctx;
  580. overlapped->on_recv_callback(handle_ctx, io_ctx, overlapped->wsabuf.buf,
  581. dwBytesTransfer, base_ov->user_data, err);
  582. }
  583. break;
  584. case OV_RECVUNTIL: {
  585. ioqueue_recvuntil_overlapped_t *overlapped = (ioqueue_recvuntil_overlapped_t*)io_ctx;
  586. if (err == 0 && dwBytesTransfer) {
  587. const char *pos;
  588. overlapped->recved_bytes += dwBytesTransfer;
  589. pos = memstr(overlapped->original_buf, overlapped->recved_bytes, overlapped->delimer);
590. if (pos) {
591. int delim_len = (int)strlen(overlapped->delimer); /* take the length before freeing the delimiter */
592. free(overlapped->delimer);
593. overlapped->on_recvuntil_callback(handle_ctx, io_ctx, overlapped->original_buf,
594. overlapped->recved_bytes, delim_len+(int)(pos-overlapped->original_buf), base_ov->user_data, err);
  595. } else if (overlapped->recved_bytes < overlapped->total_bytes) {
  596. DWORD bytesRead;
  597. int rc;
  598. overlapped->wsabuf.buf += dwBytesTransfer;
  599. overlapped->wsabuf.len -= dwBytesTransfer;
  600. inc_pending_io(handle_ctx);
  601. overlapped->base.ov.Internal = 0;
  602. overlapped->base.ov.InternalHigh = 0;
  603. overlapped->base.ov.Offset = 0;
  604. overlapped->base.ov.OffsetHigh = 0;
  605. overlapped->dwFlags = 0;
  606. rc = WSARecv(handle_ctx->u.sock, &overlapped->wsabuf, 1, &bytesRead, &overlapped->dwFlags,
  607. &overlapped->base.ov, NULL);
  608. if (rc != 0 && WSAGetLastError() != WSA_IO_PENDING) {
  609. dec_pending_io(handle_ctx);
  610. free(overlapped->delimer);
  611. overlapped->on_recvuntil_callback(handle_ctx, io_ctx, overlapped->original_buf,
  612. overlapped->recved_bytes, 0, base_ov->user_data, -1);
  613. }
  614. } else {
  615. free(overlapped->delimer);
  616. overlapped->on_recvuntil_callback(handle_ctx, io_ctx, overlapped->original_buf,
  617. overlapped->recved_bytes, 0, base_ov->user_data, -1);
  618. }
  619. } else {
  620. free(overlapped->delimer);
  621. overlapped->on_recvuntil_callback(handle_ctx, io_ctx, overlapped->original_buf,
  622. overlapped->recved_bytes, 0, base_ov->user_data, err);
  623. }
  624. }
  625. break;
  626. case OV_RECVN: {
  627. ioqueue_recvn_overlapped_t *overlapped = (ioqueue_recvn_overlapped_t*)io_ctx;
  628. overlapped->recved_bytes += dwBytesTransfer;
  629. if (err == 0 && overlapped->recved_bytes < overlapped->total_bytes) {
  630. ioqueue_tcpsock_t* tcpsock = overlapped->base.handle_ctx;
  631. overlapped->wsabuf.buf += dwBytesTransfer;
  632. overlapped->wsabuf.len -= dwBytesTransfer;
  633. inc_pending_io(handle_ctx);
  634. overlapped->base.ov.Internal = 0;
  635. overlapped->base.ov.InternalHigh = 0;
  636. overlapped->base.ov.Offset = 0;
  637. overlapped->base.ov.OffsetHigh = 0;
  638. overlapped->dwFlags = 0;
639. WLog_WARN(TAG, "OV_RECVN: must handle a partial receive, %d < %d", overlapped->recved_bytes, overlapped->total_bytes);
  640. evtpoll_subscribe(ioqueue_tcpsock_get_owned_ioqueue(tcpsock)->ep, EV_READ, tcpsock->u.sock, io_ctx, NULL);
  641. //if (rc != 0 && WSAGetLastError() != WSA_IO_PENDING) {
  642. // dec_pending_io(handle_ctx);
  643. // overlapped->on_recv_callback(handle_ctx, io_ctx, overlapped->original_buf,
  644. // overlapped->recved_bytes, base_ov->user_data, -1);
  645. //}
  646. } else {
  647. overlapped->on_recv_callback(handle_ctx, io_ctx, overlapped->original_buf,
  648. overlapped->recved_bytes, base_ov->user_data, err);
  649. }
  650. }
  651. break;
  652. case OV_SENDTO: {
  653. ioqueue_sendto_overlapped_t *overlapped = (ioqueue_sendto_overlapped_t*)io_ctx;
  654. overlapped->on_sendto_callback(handle_ctx, io_ctx, overlapped->wsabuf.buf,
  655. dwBytesTransfer, base_ov->user_data, err);
  656. }
  657. break;
  658. case OV_RECVFROM: {
  659. ioqueue_recvfrom_overlapped_t *overlapped = (ioqueue_recvfrom_overlapped_t*)io_ctx;
  660. overlapped->on_recvfrom_callback(handle_ctx, io_ctx, (struct sockaddr*)&overlapped->peer,
  661. overlapped->addrlen, overlapped->wsabuf.buf, dwBytesTransfer, base_ov->user_data, err);
  662. }
  663. break;
  664. case OV_READFILESOME: {
  665. ioqueue_readfilesome_overlapped_t *overlapped = (ioqueue_readfilesome_overlapped_t*)io_ctx;
  666. CloseHandle(overlapped->hevt);
  667. overlapped->hevt = NULL;
  668. overlapped->on_read_callback(handle_ctx, io_ctx, overlapped->buf,
  669. dwBytesTransfer, base_ov->user_data, err);
  670. }
  671. break;
  672. case OV_READFILEN: {
  673. ioqueue_readfilen_overlapped_t *overlapped = (ioqueue_readfilen_overlapped_t*)io_ctx;
  674. CloseHandle(overlapped->hevt);
  675. overlapped->hevt = NULL;
  676. overlapped->recved_bytes += dwBytesTransfer;
  677. if (err == 0 && overlapped->recved_bytes < overlapped->total_bytes) {
  678. BOOL ret;
  679. DWORD left = overlapped->total_bytes - overlapped->recved_bytes;
  680. inc_pending_io(handle_ctx);
  681. overlapped->base.ov.Internal = 0;
  682. overlapped->base.ov.InternalHigh = 0;
  683. overlapped->base.ov.Offset += dwBytesTransfer;
  684. if (overlapped->base.ov.Offset < dwBytesTransfer)
  685. overlapped->base.ov.OffsetHigh += 1;
  686. ret = ReadFile(handle_ctx->u.file, overlapped->buf+overlapped->recved_bytes, left, NULL, &overlapped->base.ov);
  687. if (!ret && GetLastError() != ERROR_IO_PENDING) {
  688. dec_pending_io(handle_ctx);
  689. overlapped->on_read_callback(handle_ctx, io_ctx, overlapped->buf,
  690. overlapped->recved_bytes, base_ov->user_data, -1);
  691. }
  692. } else {
  693. overlapped->on_read_callback(handle_ctx, io_ctx, overlapped->buf,
  694. overlapped->recved_bytes, base_ov->user_data, err);
  695. }
  696. }
  697. break;
  698. case OV_WRITEFILESOME: {
  699. ioqueue_writefilesome_overlapped_t *overlapped = (ioqueue_writefilesome_overlapped_t*)io_ctx;
  700. CloseHandle(overlapped->hevt);
  701. overlapped->hevt = NULL;
  702. overlapped->on_write_callback(handle_ctx, io_ctx, overlapped->buf, dwBytesTransfer,
  703. base_ov->user_data, err);
  704. }
  705. break;
  706. case OV_WRITEFILEN: {
  707. ioqueue_writefilen_overlapped_t *overlapped = (ioqueue_writefilen_overlapped_t*)io_ctx;
  708. CloseHandle(overlapped->hevt);
  709. overlapped->hevt = NULL;
  710. overlapped->sended_bytes += dwBytesTransfer;
  711. if (err == 0 && overlapped->sended_bytes < overlapped->total_bytes) {
  712. BOOL ret;
  713. DWORD left = overlapped->total_bytes - overlapped->sended_bytes;
  714. inc_pending_io(handle_ctx);
  715. overlapped->base.ov.Internal = 0;
  716. overlapped->base.ov.InternalHigh = 0;
  717. overlapped->base.ov.Offset += dwBytesTransfer;
  718. if (overlapped->base.ov.Offset < dwBytesTransfer)
  719. overlapped->base.ov.OffsetHigh += 1;
  720. ret = WriteFile(handle_ctx->u.file, overlapped->buf+overlapped->sended_bytes, left, NULL, &overlapped->base.ov);
  721. if (!ret && GetLastError() != ERROR_IO_PENDING) {
  722. dec_pending_io(handle_ctx);
  723. overlapped->on_write_callback(handle_ctx, io_ctx, overlapped->buf,
  724. overlapped->sended_bytes, base_ov->user_data, -1);
  725. }
  726. } else {
  727. overlapped->on_write_callback(handle_ctx, io_ctx, overlapped->buf,
  728. overlapped->sended_bytes, base_ov->user_data, err);
  729. }
  730. }
  731. break;
  732. default:
  733. assert(0);
  734. break;
  735. }
  736. break;
  737. default:
  738. assert(0);
  739. break;
  740. }
  741. }
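/* dispatch_network() first unlinks the overlapped record from the handle's
 * pending list and drops its pending-I/O reference, then completes the
 * operation through the stored callback. For the "exactly N bytes" variants
 * (OV_SENDN, OV_RECVN, OV_RECVUNTIL, OV_READFILEN, OV_WRITEFILEN) a short
 * transfer re-arms the operation instead of completing it, advancing the
 * buffer/offset and taking a fresh pending-I/O reference. */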
  742. static void dispatch_msg(ioqueue_t *ioq, int msg_type, int param1, int param2, HANDLE evt)
  743. {
  744. int chain = 1, i;
  745. for (i = 0; chain && i < MAX_MSG_PRIORITY; ++i) {
  746. ioqueue_on_msg_callback cb = ioq->msg_handlers[msg_type][i];
  747. if (cb)
  748. chain = cb(msg_type, param1, param2);
  749. }
  750. if (evt)
  751. SetEvent(evt);
  752. }
  753. TOOLKIT_API int ioqueue_post_message(ioqueue_t *ioq, int msg_type, int param1, int param2)
  754. {
  755. ioqueue_msg *msg;
  756. assert(ioq);
  757. msg = MALLOC_T(ioqueue_msg);
  758. msg->msg_type = msg_type;
  759. msg->param1 = param1;
  760. msg->param2 = param2;
  761. msg->evt = NULL;
  762. inc_msg_cnt(ioq);
  763. if (!PostQueuedCompletionStatus(ioq->iocp, 0, (ULONG_PTR)msg, NULL)) {
  764. dec_msg_cnt(ioq);
  765. free(msg);
  766. return -1;
  767. }
  768. return 0;
  769. }
  770. TOOLKIT_API int ioqueue_send_message(ioqueue_t *ioq, int msg_type, int param1, int param2)
  771. {
  772. ioqueue_msg msg = {msg_type, param1, param2};
  773. assert(ioq);
  774. msg.evt = CreateEventA(NULL, TRUE, FALSE, NULL);
  775. inc_msg_cnt(ioq);
  776. if (!PostQueuedCompletionStatus(ioq->iocp, 0, (ULONG_PTR)&msg, NULL)) {
  777. CloseHandle(msg.evt);
  778. dec_msg_cnt(ioq);
  779. return -1;
  780. }
  781. WaitForSingleObject(msg.evt, INFINITE);
  782. CloseHandle(msg.evt);
  783. dec_msg_cnt(ioq);
  784. return 0;
  785. }
  786. static int poll_all_events(ioqueue_t *ioq, HANDLE *hevts, ioqueue_connect_overlapped_t **ovs, int i)
  787. {
  788. int count = 0;
  789. int left = i;
  790. int pos = 0;
  791. while (left > 0) {
  792. DWORD idx = WaitForMultipleObjects(left, &hevts[pos], FALSE, 0) - WAIT_OBJECT_0;
793. if (idx < (DWORD)left) { /* idx is unsigned, so only the upper bound matters; valid indices are 0..left-1 */
  794. WSANETWORKEVENTS net_events;
  795. ioqueue_connect_overlapped_t *triggered = ovs[idx+pos];
  796. list_del(&triggered->node);
  797. WSAEnumNetworkEvents(triggered->base.handle_ctx->u.sock, hevts[idx+pos], &net_events);
  798. WSAEventSelect(triggered->base.handle_ctx->u.sock, hevts[idx+pos], 0);
  799. CloseHandle(hevts[idx+pos]);
  800. triggered->on_connect_callback(triggered->base.handle_ctx, (ioqueue_overlapped_t*)triggered,
  801. triggered->base.user_data, net_events.iErrorCode[FD_CONNECT_BIT] == 0 ? 0 : -1);
  802. left -= (int)idx + 1;
  803. pos += idx+1;
  804. count ++;
  805. } else {
  806. break;
  807. }
  808. }
  809. return count;
  810. }
  811. static int poll_connect_list(ioqueue_t *ioq)
  812. {
  813. int count = 0, i = 0;
  814. HANDLE hevts[MAXIMUM_WAIT_OBJECTS];
  815. ioqueue_connect_overlapped_t *ovs[MAXIMUM_WAIT_OBJECTS];
  816. ioqueue_connect_overlapped_t *pos, *n;
  817. list_for_each_entry_safe(pos, n, &ioq->connect_list, ioqueue_connect_overlapped_t, node) {
  818. hevts[i] = pos->hevt;
  819. ovs[i] = pos;
  820. i++;
  821. if (i == MAXIMUM_WAIT_OBJECTS) {
  822. count += poll_all_events(ioq, hevts, ovs, i);
  823. i = 0;
  824. }
  825. }
  826. if (i > 0) {
  827. count += poll_all_events(ioq, hevts, ovs, i);
  828. }
  829. return count;
  830. }
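/* The two helpers above serve the pre-XP connect path only: pending connects
 * sit on ioq->connect_list with a WSAEventSelect(FD_CONNECT) event each, and
 * poll_connect_list() checks them in batches of MAXIMUM_WAIT_OBJECTS via
 * WaitForMultipleObjects with a zero timeout, completing whichever entries
 * have fired. */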
  831. TOOLKIT_API void* ioqueue_set_user_data(ioqueue_t* ioq, void* user_data)
  832. {
  833. void* old;
  834. assert(ioq);
  835. old = ioq->user_data;
  836. ioq->user_data = user_data;
  837. return old;
  838. }
  839. TOOLKIT_API void* ioqueue_get_user_data(ioqueue_t* ioq)
  840. {
  841. assert(ioq);
  842. return ioq->user_data;
  843. }
  844. TOOLKIT_API int ioqueue_poll(ioqueue_t* q, int timeout)
  845. {
  846. ioqueue_t *ioq = (ioqueue_t*)q;
  847. int count = 0, t = 0;
  848. int nfds = 0, i = 0;
  849. static int flag = 0;
850. bus_daemon_t* daemon;
851. struct epoll_event events[MAX_EPOLL_EVENT];
852. daemon = (bus_daemon_t*)ioqueue_get_user_data(ioq);
853. assert(daemon != NULL);
  854. /* network and msg, dispatch until no events */
  855. do
  856. {
  857. BOOL ret;
  858. DWORD dwBytesTransfer = 0;
859. /* a thundering-herd problem can occasionally show up here */
  860. int epfd = evtpoll_get_epoll_fd(ioq->ep);
  861. nfds = epoll_wait(epfd, events, MAX_EPOLL_EVENT, t ? 0 : timeout);
  862. if ((nfds < 0 && EINTR != errno) || nfds == 0) {
  863. t = 0;
  864. }
  865. for (i = 0; i < nfds; ++i) {
  866. int n = 1;
  867. WLog_INFO(TAG, "poll events[%d] OUT:%d, IN:%d", i
  868. ,events[i].events & EPOLLOUT ? 1: 0
  869. ,events[i].events & EPOLLIN ? 1: 0);
  870. if (events[i].events & EPOLLOUT) {
  871. flag = 1;
  872. }
  873. while (n >= 0) {
  874. ioqueue_overlapped_t* io_ctx = NULL;
  875. n = evtpoll_deal(ioq->ep, &events[i], &io_ctx);
  876. if (n > 0) {
  877. assert(io_ctx);
  878. pre_dispatch_network(&ret, &dwBytesTransfer, io_ctx, epfd);
  879. dispatch_network(ret, dwBytesTransfer, io_ctx);
  880. t++;
  881. count++;
  882. }
  883. }
  884. }
  885. if (nfds < 0) {
  886. WLog_ERR(TAG, "nfds < 0, error: %s(%d)", strerror(errno), errno);
  887. break;
  888. }
  889. } while (t > 0);
  890. /* win2k connect event */
  891. if (!is_os_gte_xp()) {
  892. spinlock_enter(&ioq->connect_list_lock, -1);
  893. poll_connect_list(ioq);
  894. spinlock_leave(&ioq->connect_list_lock);
  895. }
  896. /* timer heap */
  897. spinlock_enter(&ioq->tm_queue_lock, -1);
  898. count += timer_queue_poll(ioq->tm_queue, NULL); /* dispatch timer heap */
  899. spinlock_leave(&ioq->tm_queue_lock);
  900. if (ioq->stop == -1) {
  901. if (InterlockedCompareExchange(&ioq->stop, -2, -1) == -1) { /* close all handler */
  902. ioqueue_handle_context *pos, *n;
  903. spinlock_enter(&ioq->handler_list_lock, -1);
  904. list_for_each_entry_safe(pos, n, &ioq->handler_list, ioqueue_handle_context, node) {
  905. if (pos->type != HANDLE_TYPE_FILE) {
  906. closesocket(pos->u.sock);
  907. pos->u.sock = INVALID_SOCKET;
  908. } else {
  909. CloseHandle(pos->u.file);
  910. pos->u.file = INVALID_HANDLE_VALUE;
  911. }
  912. }
  913. spinlock_leave(&ioq->handler_list_lock);
  914. }
  915. }
  916. return count;
  917. }
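/* ioqueue_poll() processing order: drain epoll events (re-polling with a zero
 * timeout while work keeps arriving), then the legacy connect list on pre-XP
 * systems, then the timer queue, and finally, once ioqueue_stop() has been
 * requested, a one-shot pass (guarded by InterlockedCompareExchange) that
 * closes every handle still on the handler list. */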
  918. TOOLKIT_API void ioqueue_stop(ioqueue_t *ioq)
  919. {
  920. assert(ioq);
  921. ioq->stop = -1;
  922. }
  923. /* timer */
  924. TOOLKIT_API int ioqueue_timer_schedule(ioqueue_t *ioq, timer_entry *entry, unsigned int delay)
  925. {
  926. int err;
  927. assert(ioq);
  928. assert(entry);
  929. if (ioq->stop)
  930. return -1;
  931. spinlock_enter(&ioq->tm_queue_lock, -1);
  932. err = timer_queue_schedule(ioq->tm_queue, entry, delay);
  933. spinlock_leave(&ioq->tm_queue_lock);
  934. return err;
  935. }
  936. TOOLKIT_API int ioqueue_timer_cancel(ioqueue_t *ioq, timer_entry *entry, int cancel)
  937. {
  938. int err;
  939. assert(ioq);
  940. assert(entry);
  941. spinlock_enter(&ioq->tm_queue_lock, -1);
  942. err = timer_queue_cancel(ioq->tm_queue, entry, cancel);
  943. spinlock_leave(&ioq->tm_queue_lock);
  944. return err;
  945. }
  946. /* acceptor */
  947. TOOLKIT_API int ioqueue_acceptor_create(ioqueue_t *ioq,
  948. const char *ip,
  949. unsigned short port,
  950. ioqueue_acceptor_t* acceptor)
  951. {
  952. WLog_INFO(TAG, "==> %s", __FUNCTION__);
  953. struct sockaddr_in service = {0};
  954. assert(ioq);
  955. assert(acceptor);
  956. assert(port);
  957. if (ioq->stop) {
  958. WLog_INFO(TAG, "<== %s", __FUNCTION__);
  959. return -1;
  960. }
  961. memset(acceptor, 0, sizeof(ioqueue_acceptor_t));
962. /* Warning: only the first three parameters take effect */
  963. acceptor->u.sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
  964. if (acceptor->u.sock == INVALID_SOCKET) {
  965. WLog_ERR(TAG, "accept socket create failed: %s", strerror(errno));
  966. goto on_error;
  967. }
  968. nonblock_sock(acceptor->u.sock);
  969. service.sin_family = AF_INET;
  970. service.sin_port = htons(port);
  971. service.sin_addr.s_addr = ip ? inet_addr(ip) : htonl(INADDR_ANY);
  972. if (bind(acceptor->u.sock, (struct sockaddr*)&service, sizeof(struct sockaddr)) != 0)
  973. goto on_error;
  974. if (evtpoll_attach(ioq->ep, acceptor->u.sock)) {
  975. goto on_error;
  976. }
  977. acceptor->type = HANDLE_TYPE_ACCEPTOR;
  978. acceptor->owner = ioq;
  979. fastlock_init(acceptor->ov_pending_list_lock);
  980. INIT_LIST_HEAD(&acceptor->ov_pending_list);
  981. add_handler_list(acceptor, ioq);
  982. inc_ref(ioqueue_handle_context, acceptor);
  983. WLog_INFO(TAG, "<== normal %s", __FUNCTION__);
  984. return 0;
  985. on_error:
  986. if (acceptor->u.sock != INVALID_SOCKET)
  987. closesocket(acceptor->u.sock);
  988. WLog_INFO(TAG, "<== error %s", __FUNCTION__);
  989. return -1;
  990. }
  991. TOOLKIT_API int ioqueue_acceptor_listen(ioqueue_acceptor_t* acceptor, int backlog)
  992. {
  993. assert(acceptor);
  994. return listen(acceptor->u.sock, backlog);
  995. }
  996. TOOLKIT_API void ioqueue_acceptor_destroy(ioqueue_acceptor_t* acceptor)
  997. {
  998. assert(acceptor);
  999. dec_ref(ioqueue_handle_context, acceptor);
  1000. }
  1001. TOOLKIT_API void ioqueue_acceptor_close(ioqueue_acceptor_t* acceptor)
  1002. {
  1003. SOCKET s;
  1004. assert(acceptor);
  1005. s = acceptor->u.sock;
  1006. if (s != INVALID_SOCKET) {
  1007. acceptor->u.sock = INVALID_SOCKET;
  1008. closesocket(s);
  1009. }
  1010. }
  1011. TOOLKIT_API int ioqueue_acceptor_async_accept(ioqueue_acceptor_t* acceptor,
  1012. ioqueue_overlapped_t *ov,
  1013. ioqueue_on_accept_callback on_accept_callback,
  1014. void *user_data)
  1015. {
  1016. ioqueue_t *ioq;
  1017. ioqueue_accept_overlapped_t *overlapped;
  1018. DWORD bytesTransfer;
  1019. BOOL ret;
  1020. assert(acceptor);
  1021. assert(ov);
  1022. assert(acceptor->type == HANDLE_TYPE_ACCEPTOR);
  1023. assert(on_accept_callback);
  1024. ioq = acceptor->owner;
  1025. if (ioq->stop)
  1026. return -1;
  1027. overlapped = (ioqueue_accept_overlapped_t*)ov;
  1028. memset(overlapped, 0, sizeof(ioqueue_accept_overlapped_t));
  1029. overlapped->client = INVALID_SOCKET;
  1030. fastlock_enter(acceptor->ov_pending_list_lock);
  1031. list_add_tail(&overlapped->base.pending_entry, &acceptor->ov_pending_list);
  1032. fastlock_leave(acceptor->ov_pending_list_lock);
  1033. overlapped->base.type = OV_ACCEPT;
  1034. overlapped->base.user_data = user_data;
  1035. overlapped->base.handle_ctx = acceptor;
  1036. inc_pending_io(acceptor);
  1037. overlapped->on_accept_callback = on_accept_callback;
  1038. ret = evtpoll_subscribe(ioq->ep, EV_ACCEPT, acceptor->u.sock, ov, NULL);
  1039. if (ret == 0) {
1040. WLog_INFO(TAG, "acceptor subscribed to the accept event successfully.");
  1041. return 0;
  1042. }
1043. WLog_ERR(TAG, "acceptor: epoll_ctl failed, error: %d", ret);
  1044. fastlock_enter(acceptor->ov_pending_list_lock);
  1045. list_del(&overlapped->base.pending_entry);
  1046. fastlock_leave(acceptor->ov_pending_list_lock);
  1047. dec_pending_io(acceptor);
  1048. return -1;
  1049. }
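/* Sketch of a typical accept handler (illustrative only; it assumes the
 * ioqueue_on_accept_callback parameter order implied by dispatch_acceptor()
 * above -- the authoritative typedef lives in ioqueue.h). Returning non-zero
 * tells dispatch_acceptor() the callback took ownership of the socket;
 * returning 0 lets it close the socket. 'my_server_t' is hypothetical. */
#if 0
static int on_accept(ioqueue_acceptor_t *acceptor, ioqueue_overlapped_t *ov,
                     SOCKET s, void *user_data, int err)
{
    my_server_t *srv = (my_server_t*)user_data;                 /* hypothetical */
    if (err != 0 || s == INVALID_SOCKET)
        return 0;                                               /* nothing to adopt */
    if (ioqueue_acceptor_create_client(acceptor, s, &srv->client) != 0)
        return 0;                                               /* let the dispatcher close s */
    return 1;                                                   /* socket now owned by srv->client */
}
#endif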
  1050. TOOLKIT_API int ioqueue_acceptor_accept(ioqueue_acceptor_t* acceptor, SOCKET *s, struct sockaddr *addr, int *addrlen, int timeout)
  1051. {
  1052. struct timeval tm;
  1053. fd_set set;
  1054. fd_set ex_set;
  1055. int rc;
  1056. FD_ZERO(&set);
  1057. FD_ZERO(&ex_set);
  1058. FD_SET(acceptor->u.sock, &set);
  1059. FD_SET(acceptor->u.sock, &ex_set);
  1060. tm.tv_sec = timeout / 1000;
  1061. tm.tv_usec = 1000 * (timeout % 1000);
  1062. rc = select(acceptor->u.sock+1, &set, NULL, &ex_set, &tm);
  1063. if (rc > 0) {
  1064. if (FD_ISSET(acceptor->u.sock, &ex_set))
  1065. return -1;
  1066. if (FD_ISSET(acceptor->u.sock, &set)) {
  1067. SOCKET fd = accept(acceptor->u.sock, addr, addrlen);
  1068. if (fd != INVALID_SOCKET) {
  1069. *s = fd;
  1070. return 0;
  1071. }
  1072. }
  1073. }
  1074. return -1;
  1075. }
  1076. TOOLKIT_API int ioqueue_acceptor_create_client(ioqueue_acceptor_t* acceptor, SOCKET s, ioqueue_tcpsock_t *tcpsock)
  1077. {
  1078. ioqueue_t *ioq;
  1079. assert(acceptor);
  1080. assert(tcpsock);
  1081. assert(s != INVALID_SOCKET);
  1082. ioq = acceptor->owner;
  1083. if (ioq->stop)
  1084. return -1;
  1085. memset(tcpsock, 0, sizeof(ioqueue_tcpsock_t));
  1086. tcpsock->type = HANDLE_TYPE_TCPSOCK;
  1087. tcpsock->u.sock = s;
  1088. tcpsock->owner = ioq;
  1089. tcpsock->user_data = NULL;
  1090. fastlock_init(tcpsock->ov_pending_list_lock);
  1091. INIT_LIST_HEAD(&tcpsock->ov_pending_list);
  1092. if (0 != evtpoll_attach(ioq->ep, s)) {
  1093. return -1;
  1094. }
  1095. add_handler_list(tcpsock, ioq);
  1096. inc_ref(ioqueue_handle_context, tcpsock);
  1097. return 0;
  1098. }
  1099. TOOLKIT_API SOCKET ioqueue_acceptor_get_raw_socket(ioqueue_acceptor_t* acceptor)
  1100. {
  1101. assert(acceptor);
  1102. assert(acceptor->type == HANDLE_TYPE_ACCEPTOR);
  1103. assert(acceptor->u.sock != INVALID_SOCKET);
  1104. return acceptor->u.sock;
  1105. }
  1106. TOOLKIT_API ioqueue_t* ioqueue_acceptor_get_owned_ioqueue(ioqueue_acceptor_t* acceptor)
  1107. {
  1108. assert(acceptor);
  1109. assert(acceptor->type == HANDLE_TYPE_ACCEPTOR);
  1110. return acceptor->owner;
  1111. }
  1112. TOOLKIT_API void *ioqueue_acceptor_set_user_data(ioqueue_acceptor_t* acceptor, void *user_data)
  1113. {
  1114. void *old;
  1115. assert(acceptor);
  1116. assert(acceptor->type == HANDLE_TYPE_ACCEPTOR);
  1117. old = acceptor->user_data;
  1118. acceptor->user_data = user_data;
  1119. return old;
  1120. }
  1121. TOOLKIT_API void *ioqueue_acceptor_get_user_data(ioqueue_acceptor_t* acceptor)
  1122. {
  1123. assert(acceptor);
  1124. assert(acceptor->type == HANDLE_TYPE_ACCEPTOR);
  1125. return acceptor->user_data;
  1126. }
  1127. TOOLKIT_API int ioqueue_acceptor_cancel(ioqueue_acceptor_t* acceptor)
  1128. {
  1129. assert(acceptor);
  1130. return CancelIo(acceptor->u.file) ? 0 : -1;
  1131. }
  1132. /* tcpsock */
  1133. TOOLKIT_API int ioqueue_tcpsock_create(ioqueue_t *ioq, ioqueue_tcpsock_t *tcpsock)
  1134. {
  1135. SOCKET s;
  1136. assert(ioq);
  1137. assert(tcpsock);
  1138. if (ioq->stop)
  1139. return -1;
  1140. s = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
  1141. if (s == INVALID_SOCKET)
  1142. return -1;
  1143. if (ioqueue_tcpsock_create_from_handle(ioq, s, tcpsock) != 0) {
  1144. closesocket(s);
  1145. return -1;
  1146. }
  1147. return 0;
  1148. }
  1149. TOOLKIT_API int ioqueue_tcpsock_create_from_handle(ioqueue_t *ioq, SOCKET s, ioqueue_tcpsock_t *tcpsock)
  1150. {
  1151. assert(ioq);
  1152. assert(s != INVALID_SOCKET);
  1153. assert(tcpsock);
  1154. if (ioq->stop)
  1155. return -1;
  1156. memset(tcpsock, 0, sizeof(ioqueue_tcpsock_t));
  1157. tcpsock->u.sock = s;
  1158. reuse_addr(tcpsock->u.sock);
  1159. nonblock_sock(tcpsock->u.sock);
1160. /* On WinXP or later we use ConnectEx; this function requires the socket to be bound first */
  1161. if (is_os_gte_xp()) {
  1162. struct sockaddr_in local = {0};
  1163. local.sin_family = AF_INET;
  1164. local.sin_port = htons(0);
  1165. local.sin_addr.s_addr = INADDR_ANY;
  1166. if (bind(tcpsock->u.sock, (struct sockaddr*)&local, sizeof(struct sockaddr)) != 0)
  1167. return -1;
  1168. } else {
1169. /* for Win2K we use connect(); the socket is set to non-blocking mode */
  1170. //u_long ul_onoff = 1;
  1171. //if (ioctlsocket(tcpsock->u.sock, FIONBIO, &ul_onoff) != 0)
  1172. // goto on_error;
  1173. }
  1174. if (!CreateIoCompletionPort((HANDLE)tcpsock->u.sock, ioq->iocp, 0, 0))
  1175. return -1;
  1176. fastlock_init(tcpsock->ov_pending_list_lock);
  1177. INIT_LIST_HEAD(&tcpsock->ov_pending_list);
  1178. tcpsock->type = HANDLE_TYPE_TCPSOCK;
  1179. tcpsock->owner = ioq;
  1180. add_handler_list(tcpsock, ioq);
  1181. inc_ref(ioqueue_handle_context, tcpsock);
  1182. return 0;
  1183. }
  1184. TOOLKIT_API int ioqueue_tcpsock_async_connect(ioqueue_tcpsock_t *tcpsock,
  1185. ioqueue_overlapped_t *ov,
  1186. const char *ip,
  1187. unsigned short port,
  1188. ioqueue_on_connect_callback on_connect_callback,
  1189. void* user_data)
  1190. {
  1191. ioqueue_t *ioq;
  1192. ioqueue_connect_overlapped_t *overlapped;
  1193. struct sockaddr_in service;
  1194. assert(tcpsock);
  1195. assert(ov);
  1196. assert(ip);
  1197. assert(port);
  1198. assert(on_connect_callback);
  1199. ioq = tcpsock->owner;
  1200. if (ioq->stop)
  1201. return -1;
  1202. ioq = tcpsock->owner;
  1203. overlapped = (ioqueue_connect_overlapped_t*)ov;
  1204. memset(overlapped, 0, sizeof(ioqueue_connect_overlapped_t));
  1205. fastlock_enter(tcpsock->ov_pending_list_lock);
  1206. list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list);
  1207. fastlock_leave(tcpsock->ov_pending_list_lock);
  1208. overlapped->base.type = OV_CONNECT;
  1209. overlapped->base.handle_ctx = (ioqueue_handle_context*)tcpsock;
  1210. overlapped->base.user_data = user_data;
  1211. overlapped->on_connect_callback = on_connect_callback;
  1212. inc_pending_io(tcpsock);
  1213. if (is_os_gte_xp()) { /* use ConnectEx */
  1214. DWORD dwBytes;
  1215. BOOL ret;
  1216. BOOL (PASCAL FAR * lpfnConnectEx) (IN SOCKET s,
  1217. IN const struct sockaddr FAR *name,
  1218. IN int namelen,
  1219. IN PVOID lpSendBuffer OPTIONAL,
  1220. IN DWORD dwSendDataLength,
  1221. OUT LPDWORD lpdwBytesSent,
  1222. IN LPOVERLAPPED lpOverlapped
  1223. );
  1224. // LPFN_CONNECTEX lpfnConnectEx;
  1225. GUID GuidConnectEx = WSAID_CONNECTEX;
  1226. if (WSAIoctl(tcpsock->u.sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidConnectEx,
  1227. sizeof(GuidConnectEx), &lpfnConnectEx, sizeof(lpfnConnectEx), &dwBytes, NULL, NULL) != 0) {
  1228. fastlock_enter(tcpsock->ov_pending_list_lock);
  1229. list_del(&overlapped->base.pending_entry);
  1230. fastlock_leave(tcpsock->ov_pending_list_lock);
  1231. dec_pending_io(tcpsock);
  1232. return -1;
  1233. }
  1234. memset(&service, 0, sizeof(service));
  1235. service.sin_family = AF_INET;
  1236. service.sin_port = htons(port);
  1237. service.sin_addr.s_addr = inet_addr(ip);
  1238. {
1239. struct sockaddr_in local_addr = {0}; /* bind to INADDR_ANY and port 0 to let the OS choose a local address */
  1240. local_addr.sin_family = AF_INET;
  1241. local_addr.sin_addr.s_addr = htonl(INADDR_ANY);
  1242. local_addr.sin_port = htons(0);
1243. ret = bind(tcpsock->u.sock, (SOCKADDR*)&local_addr, sizeof(local_addr)); /* caution: ConnectEx requires the socket to be bound first */
  1244. }
  1245. if (ret == 0) {
  1246. ret = lpfnConnectEx(tcpsock->u.sock, (struct sockaddr*)&service, sizeof(service), NULL,
  1247. 0, NULL, &overlapped->base.ov);
  1248. if (ret || WSAGetLastError() == WSA_IO_PENDING)
  1249. return 0;
  1250. }
1251. } else { /* use non-blocking connect; 'service' is filled here because the ConnectEx branch above does not run on this path */
1252. memset(&service, 0, sizeof(service)); service.sin_family = AF_INET; service.sin_port = htons(port); service.sin_addr.s_addr = inet_addr(ip); overlapped->hevt = WSACreateEvent();
  1253. if (WSAEventSelect(tcpsock->u.sock, overlapped->hevt, FD_CONNECT) == 0) {
  1254. spinlock_enter(&ioq->connect_list_lock, -1);
  1255. list_add_tail(&overlapped->node, &ioq->connect_list);
  1256. spinlock_leave(&ioq->connect_list_lock);
  1257. if (connect(tcpsock->u.sock, (struct sockaddr*)&service, sizeof(service)) == 0) {
  1258. return 0;
  1259. } else {
  1260. spinlock_enter(&ioq->connect_list_lock, -1);
  1261. list_del(&overlapped->node);
  1262. spinlock_leave(&ioq->connect_list_lock);
  1263. }
  1264. }
  1265. WSACloseEvent(overlapped->hevt);
  1266. }
  1267. fastlock_enter(tcpsock->ov_pending_list_lock);
  1268. list_del(&overlapped->base.pending_entry);
  1269. fastlock_leave(tcpsock->ov_pending_list_lock);
  1270. dec_pending_io(tcpsock);
  1271. return -1;
  1272. }
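/*
 * Blocking connect with a millisecond timeout: the connect is initiated and,
 * when it cannot complete immediately, select() waits for the socket to
 * become writable (connected) or to report an exception (failed).
 */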
TOOLKIT_API int ioqueue_tcpsock_conect(ioqueue_tcpsock_t *tcpsock,
                                       const char *ip,
                                       unsigned short port,
                                       int timeout)
{
    fd_set wr_set;
    fd_set ex_set;
    struct timeval tm;
    struct sockaddr_in service;

    assert(tcpsock);
    assert(ip);
    assert(port > 0);

    memset(&service, 0, sizeof(service));
    service.sin_family = AF_INET;
    service.sin_port = htons(port);
    service.sin_addr.s_addr = inet_addr(ip);
    /* start the connect; anything other than "in progress" is a hard failure */
    if (connect(tcpsock->u.sock, (struct sockaddr*)&service, sizeof(service)) != 0 &&
        WSAGetLastError() != WSAEWOULDBLOCK) {
        return -1;
    }

    FD_ZERO(&wr_set);
    FD_ZERO(&ex_set);
    FD_SET(tcpsock->u.sock, &wr_set);
    FD_SET(tcpsock->u.sock, &ex_set);
    tm.tv_sec = timeout / 1000;
    tm.tv_usec = 1000 * (timeout % 1000);
    if (select(tcpsock->u.sock + 1, NULL, &wr_set, &ex_set, &tm) > 0) {
        if (FD_ISSET(tcpsock->u.sock, &ex_set))
            return -1;
        if (FD_ISSET(tcpsock->u.sock, &wr_set))
            return 0;
    }
    return -1;
}

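/*
 * Queue a single overlapped WSASend.  The call returns immediately; the
 * registered on_send_callback is invoked when the operation completes and
 * may report fewer bytes than requested.  Use the *_sendn variant when the
 * whole buffer must be transmitted.
 */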
TOOLKIT_API int ioqueue_tcpsock_async_sendsome(ioqueue_tcpsock_t *tcpsock,
                                               ioqueue_overlapped_t *ov,
                                               void *buf,
                                               unsigned int len,
                                               ioqueue_on_send_callback on_send_callback,
                                               void *user_data)
{
    ioqueue_sendsome_overlapped_t *overlapped;
    DWORD bytesWritten;
    int rc;
    ioqueue_t *ioq;

    assert(tcpsock);
    assert(ov);
    assert(buf);
    assert(on_send_callback);

    ioq = ioqueue_tcpsock_get_owned_ioqueue(tcpsock);
    if (ioq->stop)
        return -1;

    overlapped = (ioqueue_sendsome_overlapped_t*)ov;
    memset(overlapped, 0, sizeof(ioqueue_sendsome_overlapped_t));
    fastlock_enter(tcpsock->ov_pending_list_lock);
    list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list);
    fastlock_leave(tcpsock->ov_pending_list_lock);
    overlapped->base.type = OV_SENDSOME;
    overlapped->base.handle_ctx = (ioqueue_handle_context*)tcpsock;
    overlapped->base.user_data = user_data;
    overlapped->on_send_callback = on_send_callback;
    overlapped->wsabuf.len = len;
    overlapped->wsabuf.buf = buf;
    inc_pending_io(tcpsock);
    rc = WSASend(tcpsock->u.sock, &overlapped->wsabuf, 1, &bytesWritten,
                 0, &overlapped->base.ov, NULL);
    if (rc == 0 || WSAGetLastError() == WSA_IO_PENDING)
        return 0;
    fastlock_enter(tcpsock->ov_pending_list_lock);
    list_del(&overlapped->base.pending_entry);
    fastlock_leave(tcpsock->ov_pending_list_lock);
    dec_pending_io(tcpsock);
    return -1;
}

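/*
 * Queue a send of exactly 'len' bytes.  On this build the operation is
 * driven by the event poller: the socket is subscribed for EV_WRITE and the
 * actual writes are performed from the poll loop so that the whole buffer
 * goes out before on_send_callback fires.
 */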
TOOLKIT_API int ioqueue_tcpsock_async_sendn(ioqueue_tcpsock_t *tcpsock,
                                            ioqueue_overlapped_t *ov,
                                            void *buf,
                                            unsigned int len,
                                            ioqueue_on_send_callback on_send_callback,
                                            void* user_data)
{
    ioqueue_sendn_overlapped_t *overlapped;
    DWORD bytesWritten;
    int rc;
    ioqueue_t *ioq;

    assert(tcpsock);
    assert(ov);
    assert(buf);
    assert(on_send_callback);

    ioq = ioqueue_tcpsock_get_owned_ioqueue(tcpsock);
    if (ioq->stop)
        return -1;

    overlapped = (ioqueue_sendn_overlapped_t*)ov;
    memset(overlapped, 0, sizeof(ioqueue_sendn_overlapped_t));
    fastlock_enter(tcpsock->ov_pending_list_lock);
    list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list);
    fastlock_leave(tcpsock->ov_pending_list_lock);
    overlapped->base.type = OV_SENDN;
    overlapped->base.handle_ctx = (ioqueue_handle_context*)tcpsock;
    overlapped->base.user_data = user_data;
    overlapped->on_send_callback = on_send_callback;
    overlapped->wsabuf.len = len;
    overlapped->wsabuf.buf = buf;
    overlapped->original_buf = buf;
    overlapped->sended_bytes = 0;
    overlapped->total_bytes = len;
    inc_pending_io(tcpsock);
    WLog_INFO(TAG, "ioqueue_tcpsock_async_sendn fired! sock: %d", tcpsock->u.sock);
    rc = evtpoll_subscribe(ioq->ep, EV_WRITE, tcpsock->u.sock, NULL, ov);
    if (!rc) {
        return 0;
    }
    WLog_ERR(TAG, "epoll_ctl for write failed return: %d", rc);
    fastlock_enter(tcpsock->ov_pending_list_lock);
    list_del(&overlapped->base.pending_entry);
    fastlock_leave(tcpsock->ov_pending_list_lock);
    dec_pending_io(tcpsock);
    return -1;
}

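/*
 * Send the part of 'buf' up to and including the delimiter string.  If the
 * delimiter is not found within 'len' bytes the call fails; otherwise it
 * reduces to an async_sendn of that prefix.
 */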
TOOLKIT_API int ioqueue_tcpsock_async_senduntil(ioqueue_tcpsock_t *tcpsock,
                                                ioqueue_overlapped_t *ov,
                                                void *buf,
                                                unsigned int len,
                                                const char *delimer,
                                                ioqueue_on_send_callback on_send_cb,
                                                void* user_data)
{
    const char *p;

    assert(tcpsock);
    assert(ov);
    assert(buf);
    assert(on_send_cb);
    assert(delimer);

    p = memstr(buf, len, delimer);
    if (!p)
        return -1;
    p += strlen(delimer);
    return ioqueue_tcpsock_async_sendn(tcpsock, ov, buf, p - (char*)buf, on_send_cb, user_data);
}

TOOLKIT_API int ioqueue_tcpsock_sendsome(ioqueue_tcpsock_t *tcpsock,
                                         void *buf,
                                         unsigned int len,
                                         int timeout)
{
    assert(tcpsock);
    return send(tcpsock->u.sock, buf, len, 0);
}

TOOLKIT_API int ioqueue_tcpsock_sendn(ioqueue_tcpsock_t *tcpsock,
                                      void *buf,
                                      unsigned int len,
                                      int timeout)
{
    return tsend_n(tcpsock->u.sock, buf, len, timeout);
}

TOOLKIT_API int ioqueue_tcpsock_senduntil(ioqueue_tcpsock_t *tcpsock,
                                          void *buf,
                                          unsigned int len,
                                          const char *delimer,
                                          int timeout)
{
    return tsend_until(tcpsock->u.sock, buf, len, delimer, timeout);
}

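/*
 * Queue a single overlapped WSARecv.  on_recv_callback is invoked with
 * whatever amount of data arrives first, up to 'len' bytes.
 */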
TOOLKIT_API int ioqueue_tcpsock_async_recvsome(ioqueue_tcpsock_t *tcpsock,
                                               ioqueue_overlapped_t *ov,
                                               void *buf,
                                               unsigned int len,
                                               ioqueue_on_recv_callback on_recv_callback,
                                               void *user_data)
{
    ioqueue_recvsome_overlapped_t *overlapped;
    DWORD bytesRead;
    int rc;
    ioqueue_t *ioq;

    assert(tcpsock);
    assert(ov);
    assert(buf);
    assert(on_recv_callback);

    ioq = tcpsock->owner;
    if (ioq->stop)
        return -1;

    overlapped = (ioqueue_recvsome_overlapped_t*)ov;
    memset(overlapped, 0, sizeof(ioqueue_recvsome_overlapped_t));
    fastlock_enter(tcpsock->ov_pending_list_lock);
    list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list);
    fastlock_leave(tcpsock->ov_pending_list_lock);
    overlapped->base.type = OV_RECVSOME;
    overlapped->base.handle_ctx = (ioqueue_handle_context*)tcpsock;
    overlapped->base.user_data = user_data;
    overlapped->on_recv_callback = on_recv_callback;
    overlapped->wsabuf.len = len;
    overlapped->wsabuf.buf = buf;
    overlapped->dwFlags = 0;
    inc_pending_io(tcpsock);
    rc = WSARecv(tcpsock->u.sock, &overlapped->wsabuf, 1, &bytesRead, &overlapped->dwFlags,
                 &overlapped->base.ov, NULL);
    if (rc == 0 || WSAGetLastError() == WSA_IO_PENDING)
        return 0;
    fastlock_enter(tcpsock->ov_pending_list_lock);
    list_del(&overlapped->base.pending_entry);
    fastlock_leave(tcpsock->ov_pending_list_lock);
    dec_pending_io(tcpsock);
    return -1;
}

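/*
 * Receive exactly 'len' bytes.  Like async_sendn, this build subscribes the
 * socket for EV_READ with the event poller and the reads are carried out
 * from the poll loop until total_bytes have been received.
 */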
TOOLKIT_API int ioqueue_tcpsock_async_recvn(ioqueue_tcpsock_t *tcpsock,
                                            ioqueue_overlapped_t *ov,
                                            void *buf,
                                            unsigned int len,
                                            ioqueue_on_recv_callback on_recv_callback,
                                            void *user_data)
{
    ioqueue_recvn_overlapped_t *overlapped;
    DWORD bytesRead;
    int rc;
    ioqueue_t *ioq;

    assert(tcpsock);
    assert(ov);
    assert(buf);
    assert(on_recv_callback);

    ioq = tcpsock->owner;
    if (ioq->stop)
        return -1;

    overlapped = (ioqueue_recvn_overlapped_t*)ov;
    memset(overlapped, 0, sizeof(ioqueue_recvn_overlapped_t));
    fastlock_enter(tcpsock->ov_pending_list_lock);
    list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list);
    fastlock_leave(tcpsock->ov_pending_list_lock);
    overlapped->base.type = OV_RECVN;
    overlapped->base.handle_ctx = (ioqueue_handle_context*)tcpsock;
    overlapped->base.user_data = user_data;
    overlapped->on_recv_callback = on_recv_callback;
    overlapped->wsabuf.len = len;
    overlapped->wsabuf.buf = buf;
    overlapped->original_buf = buf;
    overlapped->recved_bytes = 0;
    overlapped->total_bytes = len;
    overlapped->dwFlags = 0;
    inc_pending_io(tcpsock);
    rc = evtpoll_subscribe(ioq->ep, EV_READ, tcpsock->u.sock, ov, NULL);
    if (!rc) {
        return 0;
    }
    WLog_ERR(TAG, "epoll_ctl for read failed return: %d", rc);
    fastlock_enter(tcpsock->ov_pending_list_lock);
    list_del(&overlapped->base.pending_entry);
    fastlock_leave(tcpsock->ov_pending_list_lock);
    dec_pending_io(tcpsock);
    return -1;
}

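/*
 * Receive until the delimiter string has been seen (or the buffer is full).
 * The delimiter is duplicated into the overlapped structure so the caller's
 * string does not have to outlive the call.
 */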
TOOLKIT_API int ioqueue_tcpsock_async_recvuntil(ioqueue_tcpsock_t *tcpsock,
                                                ioqueue_overlapped_t *ov,
                                                void *buf,
                                                unsigned int len,
                                                const char *delimer,
                                                ioqueue_on_recvuntil_callback on_recvuntil_callback,
                                                void *user_data)
{
    ioqueue_recvuntil_overlapped_t *overlapped;
    DWORD bytesRead;
    int rc;
    ioqueue_t *ioq;

    assert(tcpsock);
    assert(ov);
    assert(buf);
    assert(delimer);
    assert(on_recvuntil_callback);

    ioq = tcpsock->owner;
    if (ioq->stop)
        return -1;

    overlapped = (ioqueue_recvuntil_overlapped_t*)ov;
    memset(overlapped, 0, sizeof(ioqueue_recvuntil_overlapped_t));
    fastlock_enter(tcpsock->ov_pending_list_lock);
    list_add_tail(&overlapped->base.pending_entry, &tcpsock->ov_pending_list);
    fastlock_leave(tcpsock->ov_pending_list_lock);
    overlapped->base.type = OV_RECVUNTIL;
    overlapped->base.handle_ctx = (ioqueue_handle_context*)tcpsock;
    overlapped->base.user_data = user_data;
    overlapped->on_recvuntil_callback = on_recvuntil_callback;
    overlapped->wsabuf.len = len;
    overlapped->wsabuf.buf = buf;
    overlapped->original_buf = buf;
    overlapped->recved_bytes = 0;
    overlapped->total_bytes = len;
    overlapped->delimer = _strdup(delimer);
    overlapped->dwFlags = 0;
    inc_pending_io(tcpsock);
    rc = WSARecv(tcpsock->u.sock, &overlapped->wsabuf, 1, &bytesRead, &overlapped->dwFlags,
                 &overlapped->base.ov, NULL);
    if (rc == 0 || WSAGetLastError() == WSA_IO_PENDING)
        return 0;
    fastlock_enter(tcpsock->ov_pending_list_lock);
    list_del(&overlapped->base.pending_entry);
    fastlock_leave(tcpsock->ov_pending_list_lock);
    dec_pending_io(tcpsock);
    free(overlapped->delimer); /* nothing will complete this request, so release the copy */
    overlapped->delimer = NULL;
    return -1;
}

TOOLKIT_API int ioqueue_tcpsock_recvsome(ioqueue_tcpsock_t *tcpsock,
                                         void *buf,
                                         unsigned int len,
                                         int timeout)
{
    return recv(tcpsock->u.sock, buf, len, 0);
}

TOOLKIT_API int ioqueue_tcpsock_recvn(ioqueue_tcpsock_t *tcpsock,
                                      void *buf,
                                      unsigned int len,
                                      int timeout)
{
    return trecv_n(tcpsock->u.sock, buf, len, timeout);
}

TOOLKIT_API int ioqueue_tcpsock_recvuntil(ioqueue_tcpsock_t *tcpsock,
                                          void *buf,
                                          unsigned int len,
                                          const char *delimer,
                                          unsigned int *header_len,
                                          int timeout)
{
    return trecv_until(tcpsock->u.sock, buf, len, delimer, header_len, timeout);
}

TOOLKIT_API void ioqueue_tcpsock_close(ioqueue_tcpsock_t *tcpsock)
{
    SOCKET s;

    assert(tcpsock);
    s = tcpsock->u.sock;
    if (s != INVALID_SOCKET) {
        tcpsock->u.sock = INVALID_SOCKET;
        closesocket(s);
    }
}

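/*
 * Drop the caller's reference on the tcpsock handle context; the context is
 * reference counted and is torn down when the last reference is released.
 */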
TOOLKIT_API void ioqueue_tcpsock_destroy(ioqueue_tcpsock_t *tcpsock)
{
    assert(tcpsock);
    dec_ref(ioqueue_handle_context, tcpsock);
}

TOOLKIT_API int ioqueue_tcpsock_shutdown(ioqueue_tcpsock_t *tcpsock, int how)
{
    assert(tcpsock);
    return shutdown(tcpsock->u.sock, how);
}

TOOLKIT_API SOCKET ioqueue_tcpsock_get_raw_socket(ioqueue_tcpsock_t *tcpsock)
{
    assert(tcpsock);
    return tcpsock->u.sock;
}

TOOLKIT_API ioqueue_t* ioqueue_tcpsock_get_owned_ioqueue(ioqueue_tcpsock_t *tcpsock)
{
    assert(tcpsock);
    return tcpsock->owner;
}

TOOLKIT_API void *ioqueue_tcpsock_set_user_data(ioqueue_tcpsock_t *tcpsock, void *user_data)
{
    void *old;

    assert(tcpsock);
    old = tcpsock->user_data;
    tcpsock->user_data = user_data;
    return old;
}

TOOLKIT_API void *ioqueue_tcpsock_get_user_data(ioqueue_tcpsock_t *tcpsock)
{
    assert(tcpsock);
    return tcpsock->user_data;
}

TOOLKIT_API int ioqueue_tcpsock_cancel(ioqueue_tcpsock_t* tcpsock)
{
    assert(tcpsock);
    return CancelIo(tcpsock->u.file) ? 0 : -1;
}

/* udpsock */
TOOLKIT_API int ioqueue_udpsock_create(ioqueue_t *ioq, ioqueue_udpsock_t *udpsock)
{
    SOCKET s;

    assert(ioq);
    assert(udpsock);
    if (ioq->stop)
        return -1;
    s = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (s == INVALID_SOCKET)
        return -1;
    if (ioqueue_udpsock_create_from_handle(ioq, s, udpsock) != 0) {
        closesocket(s);
        return -1;
    }
    return 0;
}

TOOLKIT_API int ioqueue_udpsock_create_from_handle(ioqueue_t *ioq, SOCKET s, ioqueue_udpsock_t *udpsock)
{
    assert(ioq);
    assert(udpsock);
    assert(s != INVALID_SOCKET);
    if (ioq->stop)
        return -1;
    memset(udpsock, 0, sizeof(ioqueue_udpsock_t));
    udpsock->u.sock = s;
    nonblock_sock(udpsock->u.sock);
    if (!CreateIoCompletionPort((HANDLE)udpsock->u.sock, ioq->iocp, 0, 0))
        return -1;
    fastlock_init(udpsock->ov_pending_list_lock);
    INIT_LIST_HEAD(&udpsock->ov_pending_list);
    udpsock->type = HANDLE_TYPE_UDPSOCK;
    udpsock->owner = ioq;
    add_handler_list(udpsock, ioq);
    inc_ref(ioqueue_handle_context, udpsock);
    return 0;
}

TOOLKIT_API void ioqueue_udpsock_close(ioqueue_udpsock_t *udpsock)
{
    SOCKET s;

    assert(udpsock);
    s = udpsock->u.sock;
    if (s != INVALID_SOCKET) {
        udpsock->u.sock = INVALID_SOCKET;
        closesocket(s);
    }
}

TOOLKIT_API void ioqueue_udpsock_destroy(ioqueue_udpsock_t *udpsock)
{
    assert(udpsock);
    dec_ref(ioqueue_handle_context, udpsock);
}

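/*
 * Queue an overlapped WSASendTo to the given destination address.
 * on_sendto_callback is invoked when the datagram has been handed to the
 * stack or the operation fails.
 */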
TOOLKIT_API int ioqueue_udpsock_async_sendto(ioqueue_udpsock_t* udpsock,
                                             ioqueue_overlapped_t *ov,
                                             void *buf,
                                             int len,
                                             const struct sockaddr* to,
                                             int tolen,
                                             ioqueue_on_sendto_callback on_sendto_callback,
                                             void *user_data)
{
    ioqueue_sendto_overlapped_t *overlapped;
    int rc;
    DWORD bytesWritten;
    ioqueue_t *ioq;

    assert(udpsock);
    assert(ov);
    assert(buf);
    assert(to);
    assert(on_sendto_callback);

    ioq = udpsock->owner;
    if (ioq->stop)
        return -1;

    overlapped = (ioqueue_sendto_overlapped_t*)ov;
    memset(overlapped, 0, sizeof(ioqueue_sendto_overlapped_t));
    fastlock_enter(udpsock->ov_pending_list_lock);
    list_add_tail(&overlapped->base.pending_entry, &udpsock->ov_pending_list);
    fastlock_leave(udpsock->ov_pending_list_lock);
    overlapped->base.type = OV_SENDTO;
    overlapped->base.user_data = user_data;
    overlapped->base.handle_ctx = (ioqueue_handle_context*)udpsock;
    overlapped->on_sendto_callback = on_sendto_callback;
    overlapped->wsabuf.len = len;
    overlapped->wsabuf.buf = buf;
    inc_pending_io(udpsock);
    rc = WSASendTo(udpsock->u.sock, &overlapped->wsabuf, 1, &bytesWritten, 0,
                   to, tolen, &overlapped->base.ov, NULL);
    if (rc == 0 || WSAGetLastError() == WSA_IO_PENDING)
        return 0;
    fastlock_enter(udpsock->ov_pending_list_lock);
    list_del(&overlapped->base.pending_entry);
    fastlock_leave(udpsock->ov_pending_list_lock);
    dec_pending_io(udpsock);
    return -1;
}

TOOLKIT_API int ioqueue_udpsock_sendto(ioqueue_udpsock_t *udpsock,
                                       void *buf,
                                       int len,
                                       const struct sockaddr* to,
                                       int tolen,
                                       int timeout)
{
    return tsendto(udpsock->u.sock, buf, len, to, tolen, timeout);
}

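/*
 * Queue an overlapped WSARecvFrom.  The sender's address is captured into
 * the overlapped structure so it is available to on_recvfrom_callback along
 * with the received datagram.
 */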
TOOLKIT_API int ioqueue_udpsock_async_recvfrom(ioqueue_udpsock_t* udpsock,
                                               ioqueue_overlapped_t *ov,
                                               void* buf,
                                               int len,
                                               ioqueue_on_recvfrom_callback on_recvfrom_callback,
                                               void *user_data)
{
    ioqueue_recvfrom_overlapped_t *overlapped;
    int rc;
    DWORD bytesRead;
    ioqueue_t *ioq;

    assert(udpsock);
    assert(ov);
    assert(buf);
    assert(on_recvfrom_callback);

    ioq = udpsock->owner;
    if (ioq->stop)
        return -1;

    overlapped = (ioqueue_recvfrom_overlapped_t*)ov;
    memset(overlapped, 0, sizeof(ioqueue_recvfrom_overlapped_t));
    fastlock_enter(udpsock->ov_pending_list_lock);
    list_add_tail(&overlapped->base.pending_entry, &udpsock->ov_pending_list);
    fastlock_leave(udpsock->ov_pending_list_lock);
    overlapped->base.type = OV_RECVFROM;
    overlapped->base.user_data = user_data;
    overlapped->base.handle_ctx = (ioqueue_handle_context*)udpsock;
    overlapped->on_recvfrom_callback = on_recvfrom_callback;
    overlapped->wsabuf.len = len;
    overlapped->wsabuf.buf = buf;
    overlapped->dwFlags = 0;
    overlapped->addrlen = sizeof(overlapped->peer); /* WSARecvFrom needs the size of the 'from' buffer */
    inc_pending_io(udpsock);
    rc = WSARecvFrom(udpsock->u.sock, &overlapped->wsabuf, 1, &bytesRead, &overlapped->dwFlags,
                     (struct sockaddr*)&overlapped->peer, &overlapped->addrlen, &overlapped->base.ov, NULL);
    if (rc == 0 || WSAGetLastError() == WSA_IO_PENDING)
        return 0;
    fastlock_enter(udpsock->ov_pending_list_lock);
    list_del(&overlapped->base.pending_entry);
    fastlock_leave(udpsock->ov_pending_list_lock);
    dec_pending_io(udpsock);
    return -1;
}

TOOLKIT_API int ioqueue_udpsock_recvfrom(ioqueue_udpsock_t* udpsock,
                                         ioqueue_overlapped_t *overlapped,
                                         void* buf,
                                         int len,
                                         struct sockaddr *fromaddr,
                                         int *addrlen,
                                         int timeout)
{
    return trecvfrom(udpsock->u.sock, buf, len, fromaddr, addrlen, timeout);
}

TOOLKIT_API SOCKET ioqueue_udpsock_get_raw_socket(ioqueue_udpsock_t *udpsock)
{
    assert(udpsock);
    return udpsock->u.sock;
}

TOOLKIT_API ioqueue_t* ioqueue_udpsock_get_owned_ioqueue(ioqueue_udpsock_t *udpsock)
{
    assert(udpsock);
    return udpsock->owner;
}

TOOLKIT_API void *ioqueue_udpsock_set_user_data(ioqueue_udpsock_t *udpsock, void *user_data)
{
    void *old;

    assert(udpsock);
    old = udpsock->user_data;
    udpsock->user_data = user_data;
    return old;
}

TOOLKIT_API void *ioqueue_udpsock_get_user_data(ioqueue_udpsock_t *udpsock)
{
    assert(udpsock);
    return udpsock->user_data;
}

TOOLKIT_API int ioqueue_udpsock_cancel(ioqueue_udpsock_t *udpsock)
{
    assert(udpsock);
    return CancelIo(udpsock->u.file) ? 0 : -1;
}

/* file */
TOOLKIT_API int ioqueue_file_create(ioqueue_t *ioq,
                                    const char *path,
                                    DWORD dwDesiredAccess,
                                    DWORD dwShareMode,
                                    DWORD dwCreationDisposition,
                                    DWORD dwFlagsAndAttributes,
                                    ioqueue_file_t *file)
{
    HANDLE hFile;

    assert(ioq);
    assert(path);
    assert(file);
    if (ioq->stop)
        return -1;
    hFile = CreateFileA(path, dwDesiredAccess, dwShareMode,
                        NULL, dwCreationDisposition,
                        dwFlagsAndAttributes|FILE_FLAG_OVERLAPPED, NULL);
    return ioqueue_file_create_from_handle(ioq, hFile, file);
}

TOOLKIT_API int ioqueue_file_create_from_handle(ioqueue_t *ioq, HANDLE h, ioqueue_file_t *file)
{
    assert(ioq);
    assert(file);
    if (ioq->stop)
        return -1;
    memset(file, 0, sizeof(ioqueue_file_t));
    file->u.file = h;
    if (file->u.file == INVALID_HANDLE_VALUE)
        return -1;
    if (!CreateIoCompletionPort(file->u.file, ioq->iocp, 0, 0)) {
        CloseHandle(file->u.file);
        file->u.file = INVALID_HANDLE_VALUE;
        return -1;
    }
    fastlock_init(file->ov_pending_list_lock);
    INIT_LIST_HEAD(&file->ov_pending_list);
    file->type = HANDLE_TYPE_FILE;
    file->owner = ioq;
    add_handler_list(file, ioq);
    inc_ref(ioqueue_handle_context, file);
    return 0;
}

TOOLKIT_API void ioqueue_file_close(ioqueue_file_t* file)
{
    HANDLE s;

    assert(file);
    s = file->u.file;
    if (s != INVALID_HANDLE_VALUE) {
        file->u.file = INVALID_HANDLE_VALUE;
        CloseHandle(s);
    }
}

TOOLKIT_API void ioqueue_file_destroy(ioqueue_file_t* file)
{
    assert(file);
    dec_ref(ioqueue_handle_context, file);
}

TOOLKIT_API int ioqueue_file_async_readsome(ioqueue_file_t* file,
                                            ioqueue_overlapped_t *ov,
                                            void *buf,
                                            unsigned int len,
                                            ioqueue_on_read_callback on_read_callback,
                                            void *user_data)
{
    return ioqueue_file_async_readsome_at(file, ov, buf, len, 0, 0, on_read_callback, user_data);
}

TOOLKIT_API int ioqueue_file_async_readn(ioqueue_file_t* file,
                                         ioqueue_overlapped_t *overlapped,
                                         void *buf,
                                         unsigned int len,
                                         ioqueue_on_read_callback on_read_cb,
                                         void *user_data)
{
    return ioqueue_file_async_readn_at(file, overlapped, buf, len, 0, 0, on_read_cb, user_data);
}

TOOLKIT_API int ioqueue_file_readsome(ioqueue_file_t *file, void *buf, unsigned int len)
{
    return ioqueue_file_readsome_at(file, buf, len, 0, 0);
}

TOOLKIT_API int ioqueue_file_readn(ioqueue_file_t *file, void *buf, unsigned int len)
{
    return ioqueue_file_readn_at(file, buf, len, 0, 0);
}

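/*
 * Overlapped ReadFile at an explicit 64-bit offset (posLow/posHigh map to
 * OVERLAPPED.Offset/OffsetHigh).  Completion is delivered through the I/O
 * completion port the file handle was associated with at creation.
 */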
TOOLKIT_API int ioqueue_file_async_readsome_at(ioqueue_file_t* file,
                                               ioqueue_overlapped_t *ov,
                                               void *buf,
                                               unsigned int len,
                                               DWORD posLow,
                                               DWORD posHigh,
                                               ioqueue_on_read_callback on_read_callback,
                                               void *user_data)
{
    ioqueue_readfilesome_overlapped_t *overlapped;
    BOOL rc;
    ioqueue_t *ioq;

    assert(file);
    assert(ov);
    assert(buf);
    assert(on_read_callback);

    ioq = file->owner;
    if (ioq->stop)
        return -1;

    overlapped = (ioqueue_readfilesome_overlapped_t*)ov;
    memset(overlapped, 0, sizeof(ioqueue_readfilesome_overlapped_t));
    fastlock_enter(file->ov_pending_list_lock);
    list_add_tail(&overlapped->base.pending_entry, &file->ov_pending_list);
    fastlock_leave(file->ov_pending_list_lock);
    overlapped->hevt = CreateEventA(NULL, TRUE, FALSE, NULL);
    overlapped->base.type = OV_READFILESOME;
    overlapped->base.user_data = user_data;
    overlapped->base.handle_ctx = (ioqueue_handle_context*)file;
    overlapped->base.ov.Offset = posLow;
    overlapped->base.ov.OffsetHigh = posHigh;
    overlapped->on_read_callback = on_read_callback;
    overlapped->buf = buf;
    inc_pending_io(file);
    rc = ReadFile(file->u.file, buf, (DWORD)len, NULL, &overlapped->base.ov);
    if (rc || GetLastError() == ERROR_IO_PENDING)
        return 0;
    fastlock_enter(file->ov_pending_list_lock);
    list_del(&overlapped->base.pending_entry);
    fastlock_leave(file->ov_pending_list_lock);
    dec_pending_io(file);
    CloseHandle(overlapped->hevt);
    return -1;
}

TOOLKIT_API int ioqueue_file_async_readn_at(ioqueue_file_t* file,
                                            ioqueue_overlapped_t *ov,
                                            void *buf,
                                            unsigned int len,
                                            DWORD posLow,
                                            DWORD posHigh,
                                            ioqueue_on_read_callback on_read_cb,
                                            void *user_data)
{
    ioqueue_readfilen_overlapped_t *overlapped;
    BOOL rc;
    ioqueue_t *ioq;

    assert(file);
    assert(ov);
    assert(buf);
    assert(on_read_cb);

    ioq = file->owner;
    if (ioq->stop)
        return -1;

    overlapped = (ioqueue_readfilen_overlapped_t*)ov;
    memset(overlapped, 0, sizeof(ioqueue_readfilen_overlapped_t));
    fastlock_enter(file->ov_pending_list_lock);
    list_add_tail(&overlapped->base.pending_entry, &file->ov_pending_list);
    fastlock_leave(file->ov_pending_list_lock);
    overlapped->hevt = CreateEventA(NULL, TRUE, FALSE, NULL);
    overlapped->base.type = OV_READFILEN;
    overlapped->base.user_data = user_data;
    overlapped->base.handle_ctx = (ioqueue_handle_context*)file;
    overlapped->base.ov.Offset = posLow;
    overlapped->base.ov.OffsetHigh = posHigh;
    overlapped->on_read_callback = on_read_cb;
    overlapped->buf = buf;
    overlapped->recved_bytes = 0;
    overlapped->total_bytes = len;
    inc_pending_io(file);
    rc = ReadFile(file->u.file, buf, (DWORD)len, NULL, &overlapped->base.ov);
    if (rc || GetLastError() == ERROR_IO_PENDING)
        return 0;
    fastlock_enter(file->ov_pending_list_lock);
    list_del(&overlapped->base.pending_entry);
    fastlock_leave(file->ov_pending_list_lock);
    dec_pending_io(file);
    CloseHandle(overlapped->hevt); /* the request never started; release the event as readsome_at does */
    return -1;
}

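/*
 * Synchronous positional read on a handle that is attached to the completion
 * port.  The MSDN note quoted below explains the trick used here: setting the
 * low-order bit of the OVERLAPPED event handle suppresses the completion-port
 * notification, so the blocking GetOverlappedResult() call can be used instead.
 */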
TOOLKIT_API int ioqueue_file_readsome_at(ioqueue_file_t *file,
                                         void *buf,
                                         unsigned int len,
                                         DWORD posLow,
                                         DWORD posHigh)
{
    OVERLAPPED ov;
    BOOL ret;
    DWORD dwTransferBytes;
    int rc = -1;

    /* (MSDN)
       Even if you have passed the function a file handle associated with a
       completion port and a valid OVERLAPPED structure, an application can
       prevent completion port notification.  This is done by specifying a
       valid event handle for the hEvent member of the OVERLAPPED structure,
       and setting its low-order bit.  A valid event handle whose low-order
       bit is set keeps I/O completion from being queued to the completion
       port. */
    memset(&ov, 0, sizeof(ov));
    ov.Offset = posLow;
    ov.OffsetHigh = posHigh;
    ov.hEvent = CreateEventA(NULL, TRUE, FALSE, NULL);
    ov.hEvent = (HANDLE)((ULONG_PTR)ov.hEvent | 1); /* set the low-order bit */
    ret = ReadFile(file->u.file, buf, len, &dwTransferBytes, &ov);
    if (!ret && GetLastError() == ERROR_IO_PENDING) {
        ret = GetOverlappedResult(file->u.file, &ov, &dwTransferBytes, TRUE);
    }
    CloseHandle((HANDLE)((ULONG_PTR)ov.hEvent & ~(ULONG_PTR)1));
    if (ret && dwTransferBytes > 0)
        rc = dwTransferBytes;
    return rc;
}

TOOLKIT_API int ioqueue_file_readn_at(ioqueue_file_t *file,
                                      void *buf,
                                      unsigned int len,
                                      DWORD posLow,
                                      DWORD posHigh)
{
    OVERLAPPED ov;
    int rc = 0;
    DWORD left = len;
    DWORD offset = 0;

    memset(&ov, 0, sizeof(ov));
    ov.Offset = posLow;
    ov.OffsetHigh = posHigh;
    ov.hEvent = CreateEventA(NULL, TRUE, FALSE, NULL);
    ov.hEvent = (HANDLE)((ULONG_PTR)ov.hEvent | 1); /* suppress completion-port notification */
    while (left > 0) {
        BOOL ret;
        DWORD dwTransferBytes;
        ret = ReadFile(file->u.file, (char*)buf + offset, left, &dwTransferBytes, &ov);
        if (!ret && GetLastError() == ERROR_IO_PENDING) {
            ret = GetOverlappedResult(file->u.file, &ov, &dwTransferBytes, TRUE);
        }
        if (ret && dwTransferBytes > 0) {
            offset += dwTransferBytes;
            left -= dwTransferBytes;
            /* advance the 64-bit file position carried in the OVERLAPPED */
            ov.Internal = 0;
            ov.InternalHigh = 0;
            ov.Offset += dwTransferBytes;
            if (ov.Offset < dwTransferBytes)
                ov.OffsetHigh++;
        } else {
            rc = -1;
            break;
        }
    }
    CloseHandle((HANDLE)((ULONG_PTR)ov.hEvent & ~(ULONG_PTR)1));
    return rc;
}

TOOLKIT_API int ioqueue_file_async_writesome(ioqueue_file_t* file,
                                             ioqueue_overlapped_t *ov,
                                             void* buf,
                                             unsigned int len,
                                             ioqueue_on_write_callback on_write_callback,
                                             void *user_data)
{
    return ioqueue_file_async_writesome_at(file, ov, buf, len, 0, 0, on_write_callback, user_data);
}

TOOLKIT_API int ioqueue_file_async_writen(ioqueue_file_t* file,
                                          ioqueue_overlapped_t *overlapped,
                                          void* buf,
                                          unsigned int len,
                                          ioqueue_on_write_callback on_write_cb,
                                          void *user_data)
{
    return ioqueue_file_async_writen_at(file, overlapped, buf, len, 0, 0, on_write_cb, user_data);
}

TOOLKIT_API int ioqueue_file_writesome(ioqueue_file_t* file, const void *buf, unsigned int len)
{
    return ioqueue_file_writesome_at(file, buf, len, 0, 0);
}

TOOLKIT_API int ioqueue_file_writen(ioqueue_file_t* file, const void *buf, unsigned int len)
{
    return ioqueue_file_writen_at(file, buf, len, 0, 0);
}

TOOLKIT_API int ioqueue_file_async_writesome_at(ioqueue_file_t* file,
                                                ioqueue_overlapped_t *ov,
                                                void* buf,
                                                unsigned int len,
                                                DWORD posLow,
                                                DWORD posHigh,
                                                ioqueue_on_write_callback on_write_callback,
                                                void *user_data)
{
    ioqueue_writefilesome_overlapped_t *overlapped;
    BOOL rc;
    ioqueue_t *ioq;

    assert(file);
    assert(ov);
    assert(buf);
    assert(on_write_callback);

    ioq = file->owner;
    if (ioq->stop)
        return -1;

    overlapped = (ioqueue_writefilesome_overlapped_t*)ov;
    memset(overlapped, 0, sizeof(ioqueue_writefilesome_overlapped_t));
    fastlock_enter(file->ov_pending_list_lock);
    list_add_tail(&overlapped->base.pending_entry, &file->ov_pending_list);
    fastlock_leave(file->ov_pending_list_lock);
    overlapped->hevt = CreateEventA(NULL, TRUE, FALSE, NULL);
    overlapped->base.type = OV_WRITEFILESOME;
    overlapped->base.user_data = user_data;
    overlapped->base.handle_ctx = (ioqueue_handle_context*)file;
    overlapped->base.ov.Offset = posLow;
    overlapped->base.ov.OffsetHigh = posHigh;
    overlapped->on_write_callback = on_write_callback;
    overlapped->buf = buf;
    inc_pending_io(file);
    rc = WriteFile(file->u.file, buf, (DWORD)len, NULL, &overlapped->base.ov);
    if (rc || GetLastError() == ERROR_IO_PENDING)
        return 0;
    fastlock_enter(file->ov_pending_list_lock);
    list_del(&overlapped->base.pending_entry);
    fastlock_leave(file->ov_pending_list_lock);
    dec_pending_io(file);
    CloseHandle(overlapped->hevt);
    return -1;
}

TOOLKIT_API int ioqueue_file_async_writen_at(ioqueue_file_t* file,
                                             ioqueue_overlapped_t *ov,
                                             void* buf,
                                             unsigned int len,
                                             DWORD posLow,
                                             DWORD posHigh,
                                             ioqueue_on_write_callback on_write_cb,
                                             void *user_data)
{
    ioqueue_writefilen_overlapped_t *overlapped;
    BOOL rc;
    ioqueue_t *ioq;

    assert(file);
    assert(ov);
    assert(buf);
    assert(on_write_cb);

    ioq = file->owner;
    if (ioq->stop)
        return -1;

    overlapped = (ioqueue_writefilen_overlapped_t*)ov;
    memset(overlapped, 0, sizeof(ioqueue_writefilen_overlapped_t));
    fastlock_enter(file->ov_pending_list_lock);
    list_add_tail(&overlapped->base.pending_entry, &file->ov_pending_list);
    fastlock_leave(file->ov_pending_list_lock);
    overlapped->hevt = CreateEventA(NULL, TRUE, FALSE, NULL);
    overlapped->base.type = OV_WRITEFILEN;
    overlapped->base.user_data = user_data;
    overlapped->base.handle_ctx = (ioqueue_handle_context*)file;
    overlapped->base.ov.Offset = posLow;
    overlapped->base.ov.OffsetHigh = posHigh;
    overlapped->on_write_callback = on_write_cb;
    overlapped->buf = buf;
    overlapped->sended_bytes = 0;
    overlapped->total_bytes = len;
    inc_pending_io(file);
    rc = WriteFile(file->u.file, buf, (DWORD)len, NULL, &overlapped->base.ov);
    if (rc || GetLastError() == ERROR_IO_PENDING)
        return 0;
    fastlock_enter(file->ov_pending_list_lock);
    list_del(&overlapped->base.pending_entry);
    fastlock_leave(file->ov_pending_list_lock);
    dec_pending_io(file);
    CloseHandle(overlapped->hevt);
    return -1;
}

TOOLKIT_API int ioqueue_file_writesome_at(ioqueue_file_t* file,
                                          const void *buf,
                                          unsigned int len,
                                          DWORD posLow,
                                          DWORD posHigh)
{
    OVERLAPPED ov;
    BOOL ret;
    DWORD dwTransferBytes;
    int rc = -1;

    memset(&ov, 0, sizeof(ov));
    ov.Offset = posLow;
    ov.OffsetHigh = posHigh;
    ov.hEvent = CreateEventA(NULL, TRUE, FALSE, NULL);
    ov.hEvent = (HANDLE)((ULONG_PTR)ov.hEvent | 1); /* suppress completion-port notification */
    ret = WriteFile(file->u.file, buf, len, &dwTransferBytes, &ov);
    if (!ret && GetLastError() == ERROR_IO_PENDING) {
        ret = GetOverlappedResult(file->u.file, &ov, &dwTransferBytes, TRUE);
    }
    CloseHandle((HANDLE)((ULONG_PTR)ov.hEvent & ~(ULONG_PTR)1));
    if (ret && dwTransferBytes > 0)
        rc = dwTransferBytes;
    return rc;
}

TOOLKIT_API int ioqueue_file_writen_at(ioqueue_file_t* file,
                                       const void *buf,
                                       unsigned int len,
                                       DWORD posLow,
                                       DWORD posHigh)
{
    OVERLAPPED ov;
    int rc = 0;
    DWORD offset = 0;
    DWORD left = len;

    memset(&ov, 0, sizeof(ov));
    ov.Offset = posLow;
    ov.OffsetHigh = posHigh;
    ov.hEvent = CreateEventA(NULL, TRUE, FALSE, NULL);
    ov.hEvent = (HANDLE)((ULONG_PTR)ov.hEvent | 1); /* suppress completion-port notification */
    while (left > 0) {
        BOOL ret;
        DWORD dwTransferBytes;
        ret = WriteFile(file->u.file, (const char*)buf + offset, left, &dwTransferBytes, &ov);
        if (!ret && GetLastError() == ERROR_IO_PENDING) {
            ret = GetOverlappedResult(file->u.file, &ov, &dwTransferBytes, TRUE);
        }
        if (ret && dwTransferBytes > 0) {
            offset += dwTransferBytes;
            left -= dwTransferBytes;
            /* advance the 64-bit file position carried in the OVERLAPPED */
            ov.Internal = 0;
            ov.InternalHigh = 0;
            ov.Offset += dwTransferBytes;
            if (ov.Offset < dwTransferBytes)
                ov.OffsetHigh++;
        } else {
            rc = -1;
            break;
        }
    }
    CloseHandle((HANDLE)((ULONG_PTR)ov.hEvent & ~(ULONG_PTR)1));
    return rc;
}

TOOLKIT_API ioqueue_t* ioqueue_file_get_owned_ioqueue(ioqueue_file_t* file)
{
    assert(file);
    return file->owner;
}

TOOLKIT_API HANDLE ioqueue_file_get_raw_handle(ioqueue_file_t* file)
{
    assert(file);
    return file->u.file;
}

TOOLKIT_API void *ioqueue_file_set_user_data(ioqueue_file_t* file, void* user_data)
{
    void *old;

    assert(file);
    old = file->user_data;
    file->user_data = user_data;
    return old;
}

TOOLKIT_API void *ioqueue_file_get_user_data(ioqueue_file_t* file)
{
    assert(file);
    return file->user_data;
}

TOOLKIT_API int ioqueue_file_cancel(ioqueue_file_t* file)
{
    assert(file);
    return CancelIo(file->u.file) ? 0 : -1;
}

/* pipe acceptor */
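/*
 * A pipe acceptor only stores the expanded pipe name (\\.\pipe\<name>); every
 * accept creates a fresh named-pipe instance, attaches it to the owning
 * ioqueue's completion port and issues ConnectNamedPipe on it, either
 * asynchronously (async_accept) or with a bounded wait (accept).
 */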
TOOLKIT_API int ioqueue_pipe_acceptor_create(ioqueue_t *ioq,
                                             const char *name,
                                             ioqueue_pipe_acceptor_t *acceptor)
{
    assert(ioq);
    assert(name);
    assert(acceptor);
    memset(acceptor, 0, sizeof(ioqueue_pipe_acceptor_t));
    acceptor->u.pipe_name = strdup_printf("\\\\.\\pipe\\%s", name);
    acceptor->type = HANDLE_TYPE_PIPEACCEPTOR;
    acceptor->owner = ioq;
    fastlock_init(acceptor->ov_pending_list_lock);
    INIT_LIST_HEAD(&acceptor->ov_pending_list);
    add_handler_list(acceptor, ioq);
    inc_ref(ioqueue_handle_context, acceptor);
    return 0;
}

TOOLKIT_API void ioqueue_pipe_acceptor_destroy(ioqueue_pipe_acceptor_t *acceptor)
{
    assert(acceptor);
    dec_ref(ioqueue_handle_context, acceptor);
}

TOOLKIT_API ioqueue_t* ioqueue_pipe_acceptor_get_owned_ioqueue(ioqueue_pipe_acceptor_t *acceptor)
{
    assert(acceptor);
    assert(acceptor->type == HANDLE_TYPE_PIPEACCEPTOR);
    return acceptor->owner;
}

TOOLKIT_API void *ioqueue_pipe_acceptor_set_user_data(ioqueue_pipe_acceptor_t *acceptor, void *user_data)
{
    void *old;

    assert(acceptor);
    assert(acceptor->type == HANDLE_TYPE_PIPEACCEPTOR);
    old = acceptor->user_data;
    acceptor->user_data = user_data;
    return old;
}

TOOLKIT_API void *ioqueue_pipe_acceptor_get_user_data(ioqueue_pipe_acceptor_t *acceptor)
{
    assert(acceptor);
    assert(acceptor->type == HANDLE_TYPE_PIPEACCEPTOR);
    return acceptor->user_data;
}

TOOLKIT_API int ioqueue_pipe_acceptor_async_accept(ioqueue_pipe_acceptor_t *acceptor,
                                                   ioqueue_overlapped_t *ov,
                                                   ioqueue_on_pipe_accept_callback on_accept_callback,
                                                   void *user_data)
{
    ioqueue_t *ioq;
    ioqueue_connectpipe_overlapped_t *overlapped;
    BOOL ret;

    assert(acceptor);
    assert(ov);
    assert(acceptor->type == HANDLE_TYPE_PIPEACCEPTOR);
    assert(on_accept_callback);

    ioq = acceptor->owner;
    if (ioq->stop)
        return -1;

    overlapped = (ioqueue_connectpipe_overlapped_t*)ov;
    memset(overlapped, 0, sizeof(ioqueue_connectpipe_overlapped_t));
    overlapped->client = CreateNamedPipeA(acceptor->u.pipe_name,
                                          PIPE_ACCESS_DUPLEX|FILE_FLAG_OVERLAPPED, PIPE_TYPE_BYTE,
                                          PIPE_UNLIMITED_INSTANCES, 3072, 3072, NMPWAIT_WAIT_FOREVER, NULL);
    if (overlapped->client == INVALID_HANDLE_VALUE)
        return -1;
    if (!CreateIoCompletionPort(overlapped->client, ioq->iocp, 0, 0)) {
        CloseHandle(overlapped->client);
        return -1;
    }
    /* ConnectNamedPipe requires an event in the OVERLAPPED structure (MSDN) */
    overlapped->hevt = CreateEventA(NULL, TRUE, FALSE, NULL);
    overlapped->base.type = OV_CONNECTPIPE;
    overlapped->base.user_data = user_data;
    overlapped->base.handle_ctx = (ioqueue_handle_context*)acceptor;
    overlapped->base.ov.hEvent = overlapped->hevt;
    fastlock_enter(acceptor->ov_pending_list_lock);
    list_add_tail(&overlapped->base.pending_entry, &acceptor->ov_pending_list);
    fastlock_leave(acceptor->ov_pending_list_lock);
    inc_pending_io(acceptor);
    overlapped->on_accept_callback = on_accept_callback;
    ret = ConnectNamedPipe(overlapped->client, &overlapped->base.ov);
    if (ret || GetLastError() == ERROR_IO_PENDING)
        return 0;
    fastlock_enter(acceptor->ov_pending_list_lock);
    list_del(&overlapped->base.pending_entry);
    fastlock_leave(acceptor->ov_pending_list_lock);
    dec_pending_io(acceptor);
    CloseHandle(overlapped->client);
    CloseHandle(overlapped->hevt);
    return -1;
}

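/*
 * Synchronous accept with a millisecond timeout; on success the connected
 * pipe instance is returned through *p_pipe and is already associated with
 * the completion port.
 */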
TOOLKIT_API int ioqueue_pipe_acceptor_accept(ioqueue_pipe_acceptor_t *acceptor, HANDLE *p_pipe, int timeout)
{
    ioqueue_t *ioq;
    HANDLE pipe;
    OVERLAPPED ov;
    BOOL ret;

    assert(acceptor);
    assert(p_pipe);
    assert(acceptor->type == HANDLE_TYPE_PIPEACCEPTOR);

    ioq = acceptor->owner;
    if (ioq->stop)
        return -1;

    pipe = CreateNamedPipeA(acceptor->u.pipe_name,
                            PIPE_ACCESS_DUPLEX|FILE_FLAG_OVERLAPPED, PIPE_TYPE_BYTE,
                            PIPE_UNLIMITED_INSTANCES, 3072, 3072, (DWORD)timeout, NULL);
    if (pipe == INVALID_HANDLE_VALUE)
        return -1;
    memset(&ov, 0, sizeof(ov));
    ov.hEvent = CreateEventA(NULL, TRUE, FALSE, NULL);
    ret = ConnectNamedPipe(pipe, &ov);
    if (!ret) {
        DWORD err = GetLastError();
        if (err == ERROR_PIPE_CONNECTED) {
            /* a client connected between CreateNamedPipe and ConnectNamedPipe */
            ret = TRUE;
        } else if (err == ERROR_IO_PENDING) {
            DWORD dwTransferBytes;
            /* overlapped pipe: wait for a client within the requested timeout,
               then give up and let GetOverlappedResult drain the request */
            if (WaitForSingleObject(ov.hEvent, (DWORD)timeout) != WAIT_OBJECT_0)
                CancelIo(pipe);
            if (GetOverlappedResult(pipe, &ov, &dwTransferBytes, TRUE))
                ret = TRUE;
        }
    }
    CloseHandle(ov.hEvent);
    if (ret && CreateIoCompletionPort(pipe, ioq->iocp, 0, 0)) {
        *p_pipe = pipe;
        return 0;
    } else {
        CloseHandle(pipe);
    }
    return -1;
}

TOOLKIT_API int ioqueue_pipe_acceptor_create_client(ioqueue_pipe_acceptor_t *acceptor,
                                                    HANDLE h,
                                                    ioqueue_file_t *pipe)
{
    ioqueue_t *ioq;

    assert(acceptor);
    assert(pipe);
    assert(h != INVALID_HANDLE_VALUE);

    ioq = acceptor->owner;
    if (ioq->stop)
        return -1;
    memset(pipe, 0, sizeof(ioqueue_file_t));
    pipe->type = HANDLE_TYPE_FILE;
    pipe->u.file = h;
    pipe->owner = ioq;
    fastlock_init(pipe->ov_pending_list_lock);
    INIT_LIST_HEAD(&pipe->ov_pending_list);
    add_handler_list(pipe, ioq);
    inc_ref(ioqueue_handle_context, pipe);
    return 0;
}

TOOLKIT_API int ioqueue_pipe_acceptor_cancel(ioqueue_pipe_acceptor_t *acceptor)
{
    /* not implemented */
    assert(0);
    return 0;
}

TOOLKIT_API int ioqueue_pipe_acceptor_close_pending_handle(ioqueue_pipe_acceptor_t *acceptor)
{
    assert(acceptor);
    fastlock_enter(acceptor->ov_pending_list_lock);
    {
        ioqueue_base_overlapped_t *pos;
        list_for_each_entry(pos, &acceptor->ov_pending_list, ioqueue_base_overlapped_t, pending_entry) {
            ioqueue_connectpipe_overlapped_t *overlapped = (ioqueue_connectpipe_overlapped_t *)pos;
            if (overlapped->client != INVALID_HANDLE_VALUE) {
                CloseHandle(overlapped->client);
                overlapped->client = INVALID_HANDLE_VALUE;
            }
        }
    }
    fastlock_leave(acceptor->ov_pending_list_lock);
    return 0;
}

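/*
 * An overlapped's "type" is reported as a 16-bit value: the handle type in
 * the high byte and a per-handle sub-type in the low byte (see
 * IO_COMBINE_TYPE below).  The mask accessors reuse the OVERLAPPED.Internal
 * field as scratch storage for a caller-defined mask.
 */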
#define IO_GET_TYPE(x)        ((x) & 0x0000ff00)
#define IO_COMBINE_TYPE(x, y) ((((x) << 8) | (y)) & 0x0000ffff)

TOOLKIT_API int ioqueue_overlapped_get_mask(const ioqueue_overlapped_t* const io)
{
    ioqueue_base_overlapped_t* base_ov = (ioqueue_base_overlapped_t*)io;

    assert(io);
    return (int)base_ov->ov.Internal;
}

TOOLKIT_API void ioqueue_overlapped_set_mask(ioqueue_overlapped_t* io, int mask_value)
{
    ioqueue_base_overlapped_t* base_ov = (ioqueue_base_overlapped_t*)io;

    assert(io);
    base_ov->ov.Internal = mask_value;
}

TOOLKIT_API int ioqueue_overlapped_get_type(const ioqueue_overlapped_t* const io)
{
    int sub_type = 0;
    const ioqueue_base_overlapped_t* base_ov = (ioqueue_base_overlapped_t*)io;
    const ioqueue_handle_context* handle_ctx = base_ov->handle_ctx;
    const int type = handle_ctx->type;

    switch (type) {
    case HANDLE_TYPE_ACCEPTOR:
        {
            ioqueue_accept_overlapped_t* overlapped = (ioqueue_accept_overlapped_t*)io;
            sub_type = (int)overlapped->base.user_data;
        }
        break;
    case HANDLE_TYPE_PIPEACCEPTOR:
        {
            ioqueue_connectpipe_overlapped_t* overlapped = (ioqueue_connectpipe_overlapped_t*)io;
            sub_type = (int)overlapped->base.user_data;
        }
        break;
    case HANDLE_TYPE_TCPSOCK:
    case HANDLE_TYPE_UDPSOCK:
    case HANDLE_TYPE_FILE:
        sub_type = base_ov->type;
        break;
    default:
        break;
    }
    return IO_COMBINE_TYPE(type, sub_type);
}
  2463. }