From c0e743a6f409b372955596ccfbb106dac754a5ed Mon Sep 17 00:00:00 2001 From: Chalith Desaman Date: Thu, 3 Jun 2021 11:05:22 +0530 Subject: [PATCH] Integrated websocket communication with hpws (#3) --- CMakeLists.txt | 7 + README.md | 4 + dependencies/bin/hpws | Bin 0 -> 43656 bytes src/comm/comm_handler.cpp | 80 +++ src/comm/comm_handler.hpp | 21 + src/comm/comm_session.cpp | 91 ++++ src/comm/comm_session.hpp | 42 ++ src/comm/hpws.hpp | 995 ++++++++++++++++++++++++++++++++++++++ src/conf.cpp | 80 ++- src/conf.hpp | 46 +- src/main.cpp | 113 +++-- src/pchheader.hpp | 4 + src/util/util.cpp | 31 ++ src/util/util.hpp | 4 + 14 files changed, 1455 insertions(+), 63 deletions(-) create mode 100755 dependencies/bin/hpws create mode 100644 src/comm/comm_handler.cpp create mode 100644 src/comm/comm_handler.hpp create mode 100644 src/comm/comm_session.cpp create mode 100644 src/comm/comm_session.hpp create mode 100644 src/comm/hpws.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index da3a515..d4b7d08 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,6 +13,8 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY build) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-result -Wreturn-type") add_executable(sagent + src/comm/comm_handler.cpp + src/comm/comm_session.cpp src/util/util.cpp src/conf.cpp src/salog.cpp @@ -22,7 +24,12 @@ add_executable(sagent target_link_libraries(sagent sqlite3 + pthread ${CMAKE_DL_LIBS} # Needed for stacktrace support ) +add_custom_command(TARGET sagent POST_BUILD + COMMAND cp ./dependencies/bin/hpws ./build/ +) + target_precompile_headers(sagent PUBLIC src/pchheader.hpp) diff --git a/README.md b/README.md index 9c8112d..53e78b2 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,10 @@ Code is divided into subsystems via namespaces. **conf::** Handles configuration. Loads and holds the central configuration object. Used by most of the subsystems. +**salog::** Handles logging. Creates and prints the logs according to the configured log section in the json config. + +**comm::** Handles generic web sockets communication functionality. Mainly acts as a wrapper for [hpws](https://github.com/RichardAH/hpws). + **util::** Contains shared data structures/helper functions used by multiple subsystems. **sqlite::** Contains sqlite database management related helper functions. \ No newline at end of file diff --git a/dependencies/bin/hpws b/dependencies/bin/hpws new file mode 100755 index 0000000000000000000000000000000000000000..69c98f5851719e0fafc60ab09ac9710fc586f9fb GIT binary patch literal 43656 zcmeHw3wTu3wf{*V5Yb_RB^E8p(TakYm;l2=5D5?*6eJZ;tI`mX35kSEI+^gWimAgW z<2ahCSgf_B_GPWN(yA0uASe>F_g1vGNUhdfZFP>RZK_sttu_DOZ|%KjP96hX?!Di4 z|KI0<%-(D5wbxpE?X}l_oHKK7ES*0$J1fgF*8ppVCD5q~GgB9A{Jx>EteMsn>o|Ol zw7y^s0(v0+X6oYJX_sbg(p*h904eKD$ySKo=}1jyQllZ$99`CL@(qnOsbvd-^{8vv zeF7r2C1cXO!3sZ9rqhK+s41ey1aKf2W7lWN?KlJe+pMI zKj#hs`=pB%+A^uB_a)S$o=1D-X~N62yi9i4ss)&ocI0(lUG4Ho6VI!wDyXZiZ(3Wh zcFLrJNfQejgN5Uz-Q-W&i!NP4a*O?~O(EsOHIC&g4}JIYCDUKaD;al1-OQ{__q_7Q zH;YKW0Dq?6tQ_V>W)pWQ{ubiTH!IvU^}YDW@q0fy@zDo9IuWRh*TtybAHl8+`0qj4 z{^;wVSbzBEGSH96KtCV@-jhKe%C!4w-PIZRype&=%Ng*SGU$I+2KxViF8$eIUIu+$ z&cJ_f20m|QpkI>#e>(Ws`0IUrKZE{{WWZkl26k^IfX~Q4|811@XNRR3^c)ZSoY4>e z)fxDgW#IFR408XHf&RD*{qj-<`t=#`!!q!>HiJHO8TjnVfcIp;kIR6+C4=2cGU)kS z2Kt|7z(1Hl?y?N+U6O&%g&Fu-!*+rFM12> zh04&nhH#K;gyOH}-2Vq;^S(QonmMdgiw(CR=4%)&LnDr?cA`Q@`0Us>MN7$~m{ zgu=DewUrg&KzVg7bY!7YfVsv%xV$o0UmvIpvxQL0RHh*2Ya7dJ>uaG0J{xNSin}yr zVOi;=LcwAvB;+f=P+DP8FPB1;R8d(OXmAT6a(Qh~YE}iyYbxri8fz+6rnO)b%0jiP zq1h#Yb?Fuu?W#k8fOHwHG0AEShbk)?VYZOf7_3|w2n(iRofTMH8@6h~!TLJ+Y_wKX z)S|&5rD}CJ)KpIsS;2;IZGDy1pzzun!wdbyayK37p%rKe{5oXS1}np876h+qTwyge)&&9$R%KnV zk>-K=)#9twmg-GtQn|K*?om-!dtHD+Rs~mMER=I3>0z^~qPCvOK@c>qUr~;ZfFkvp zlkueD^5XI$>!SJdX3Z`iUsyabl}+cyPfj7n7y3BTv$1AR|HH8E0kK3;C`x_~6pC!@ z&)jttHcij6fuI;9^vo$vtWR2ZT_a!N1`B_KqmJAk@SkNdUfE1Xt->~Fpw)!<$14l@ z$3VdHGW^~BjeAf&*ovT>xZ$<69`5jN*YGV;o?~s+aMLNZ$Fb$#Pw~sLet>$c%S*}2 zPRTR-NfRfn(-hs6I<6XgtKMJdZn1^H;0?T`7yjYf6`$5#_(wIqy%&Cm##`U($$yW= z+r9AnG=63;{I4{ASugwx8sFRte^BGM^uoWU@vXh^Z)tpcFMNl_TMyV4FJqrhjkkN@ zKhXFC-&OS73!~#w*C7|)^c@S^UHJSIwPht;_z^CArweb+CRl9d%^c*pM>Ut3ABpFl z)m&zNB%Xe0u4dgX;x9Ewq?_-z1>*5UmgLmO78jml)Laj`@Y;2yXsZiv=4=*hci}19 zT&*s=`+Q-)3!geNL8SvOyy-LYI^@C+G0c&+yYR=m@JSc`1Q)*3g~z~6U0p8x&=eM# z@7MxSN3Z16N3IJ$%!SW$;oa-q5ib1aT=cdJ&od2kjd9^mGDxIFF8t?R_$e;@$u9g% z7yc9%-tWSH!G$k#;dO|S(kor~(_Hk+T=-Ege2oilyYLMz{OKP};N4xNwUHG$H_$@B{*)IHpF8ml5eya;V)`j2h!k^>9x4Q7-T=@Mi{JAdt z0T;f&g+JuN7rOB6F8p~eeA0z4a^X8&_+l5n%Y`5B!dp6C(f=p7@VPGhL>E5Kg_mW3 zE*s&(Pjb=QF8pK{evAu0#f2|&;itOrQ(XA-UHF+U{4^Kd@4`=a;mcfjp5>V9N*8{H zK_Xq|!e8jZ*SPRAUHAqUzQl!J>%z}+;hSCf*)IG>7k-Wl-{QiTy6~G___;3p78m{^ z7ydyP-tWS1b>Zi^@Y`MZi(U9u7yc3#e!mMp--Y)_{+#QN=QQ17TmEQkIJ^6RKe9LX znG{90@5HEWb&np7Pw%Li$Pun#xzpZ_YxEGpoZ_4V0%sHE)aJAb{P70BoYI`F0>4Li zAmJ?nze6~OaErii5a!h7GzTwzbzKk%JIL$SD(e7GX|3&P;*NC(J3wDH3=*;gN)GfzKh#smI9^_zc3Q5w-+Ah43iCo&RL} zPb6#;ZWnk6VNNa10fDm#b4qbq1^ze!m{WXmI?ekVNMNBnZQpI=9J*f6nF<=P6bYpz`r2ODZsG>euywbzmq5M z1B4m!9ZTSE5oV}&IzN&AC(IDK4Fq6~8+#_TpFku{ZpYKX#Tao}b7W zvdpsl3C~^SOfG0dBsjg8vY5`QA+bfo^NUH;gg!Y3o--3c0oJkQeZ z_Uc0Ks13q@=`)OSX&Ch#IJ7_EdjrW5|JYAa-@oV60sdI0e^2s4f7U+#i=T!^fZ;sN z@B}i<#IN~+J>X+Co#&5SFbWmG68fCrk6&;Opri*DPWth&FXvv+Wl^1GD)#;}V2SpS ze5q3n3g-N=8dAAMB z1|nAa0SJ`r+r}T>3Hb}SqEZIgX$k&F;oXg63f$sU0LB_{zz$8mT;a{dMLEH zBdbLfE`3In`TC!?BCf=CUh`&mw?FoH8Imf0eBfvfR)4IttGJbS@2uSnmdQ8KxyfB0 zcX!8f=6@Y6pMb#@%eeqqe_J&9$L?+rG25BWwx_d)(%Jp#>;aQaL_6Px4!d~jSJ)6( z<&PEZo0*GVjCLl8Ul~|OG_)#$h%QsDEo%KKB{(;o&C}Vo=w7r*D`ulsGj)t_Vivp` z-spdT=u@PVT&t$r7i~qOt$oo>MUi!M`1C6l)3S|+%_acjEngX!#+barz(+mg<1OlOhDRwABkhJdln=~SgR=nu#%p(b zBtfQG&g<|2mCts5gFFIE4LJJ~o5nyx#-nS1jJ0hiw`2~yErKCBKA!*S4WNqWED~bR zUQ#7|j+=N4M%`|%kdnXt5Uk=+;TUNh>W@j*sZ{d1ZWMX+O|S^LS!_b{A1%7pyCTxlaDsZdype4ZUSjM|LyD1n0U@~rDB0n@g;YKVJQ`V`FENnr)xc` zBsOtGvi)w_O@{dtVScM({<>7~Si1f03Xef)W%H{P^ZOKY4s9^cchgQW%zuwQisu(7 z<~J$kzaeG9H^N=viWKvIen8DVGlW(Gf!lKB=r!kNFje6Ls#_h`f!wRf5h>i{MIi6E z3WoUW4PpoeB#5rP3K|$I--g!E@g~IcqhAA6Jm(c5_FS#>Y;Y4l0*1;6OQ2^9Y|CNe zkIPgMk8<9a7sFKL*+y20{5jwOpU4Gqq^sK-{TUV{k)1N=ZiJ@~R|0F5z*CgK?k~Hv zn?ZqGYbXEszNwnM9#wyLo#gS z+r-ctTARSQSuy@C`z+!4kgZSnR=OEKgx=lFDKS|F#&MYhqOI#tcaExiud4fws(YTh zZawRAZJ8V{N@)$^GFQMv)Zo)~(zHd2%RtpM=SnDI`fLojaK)Vb^?S66aPe;VHa1;t z(T!*nugFfBVVdSwV-806%SJ4#L7DfLt>}^{Dptp$@1bBf7+J}?;Rmtk2aJ=1)kKfj zU?tB%@5O{|$K{&W5cqWba!_c;jEmMK z+XoUAdvOOhfzmJ--~&|PazfPJqbPSY3*oO(+=ND}P7Nw8_Ofpio*bpcRc zswOW4Q^m@Nc^4SP=WNC2dd25;dPTx_qMOf`$me46`KS}hZI?T>O%1ee=Uod=j8ZoyGbDO< zBJ7a`&D4x}00N{XsGPhBO^HSS6+IQ%$IIrVwg=s7V-8iS zW8Q(3QgJ4()*?IkX|1Dj`Bd%|SVt+h1%_*jUXNn!B+SNTjhA*37VW}FqV6xC0!QT; z$x`+#meM&-t%ZFPzJGI%$IY~VB3h<=X$Kp=i_1MF^;Vc9+S+uMp@_8Rq7A*Ns)UMR z_|qmx!2GWSJ?s{A9t0_U*&ly_mA=oJKD~ND_E367*q+WN)7j2+wkw^rpga|iM;9X( zi;lo2BD&;9;uG3;b|8sGTTuye_9IbdRG3(l;!r?Qx^g9uLDJ7CqFGtlD`Xy28Bw4bxMQ1Uvg3Dl>z4t5Rx)k2Q*A2+VIl~ zPs1Jo8I(2CM6o^b{Ao(N7CKqNSKwBw4m7(lr(4MrDM0Nn;xqFuXa;UITGstlREp;; zmb#vuOCUDkdk3aW={c4xt|5yTu;5Fxh&`?I{Q(t%YL=3PT@1zTY8trmTq@R*J>X~Z5>15;G8UU(i!+ypyq)Ks1N6X$JoUQ-zK*vEs%gX z$IUd)2aGFk15I)ux(Y^jdHBzWeixi$QEqb$^^b@?C}{3^4f;Do?-n#Sz-`gH>3q1H zKj0%SpoA5QK#b$`w~lpw4~@68oz8fyQE=G7&0;Tlf9xQ7r$iZDW_8y5UaDr-MSVnQ zMyOdLVWK3~`UIuyz)WqaDDyYehofz0K*y|%q`OIGWip?WVt${CIk*aW?Cxh7)gsa? z#6>oCfcN%%uz=^V2#mSYPsJ;Gtcq97PynrY6SnfpsN%BdC-vyAc+RKvl7#1xMX*G| zclQD$CSv^t9kja&b(8CO^z=VDr>g50%BF}0ig=KgQtDI3lPNJK zXSB)JB;S_wDO@g+EP-nD3AJSPB;!KUb~g=~Ol z0H^zeugE=+CV;JzQ-T7mx%ZA?2#3jOXnri`xmp{OfGk>N>!!vono7&b!YUS>iM;oh zvc-$E{{y>!ZuZb$(~)FZ$jp%p_|p(pc$gvHKTs2+p+%tCA^SoGdbZ_SDiD1YpOKx9 z;nNBaj>u^>;x^#{-a8h|#Qc$SI&_o~k4mK|zy$v}U8QM=I^gEuitHpWYaNX9KJHV( zIDYs#jPuc@w2Ei2*vR)e_i#E1_H>dLrYbjG;&Q2V|3O*a=#waq=YLo6`~Y?b&u10S zJ0dle%Dd@A&9IS9KWo_5)VmZVhEP6Nm5RFjF3uKTx z0OjNeLTyN}yh^V(=Ee4Nb;)9LWN_^p=+)oiBVH6w2xWf93bDgX=v;!^R){6k*C={2 z6piP%DVmY0T@zHhW~+9+JN}I61NuprOqG$cdZK}~VIWV&rRRB|R>08Cu& zV8yT&7S94KQC5Z5-?xK5qgHQQ9}I|~aZ1o*O3-s06$#&mSW~40{U=Q1{0b*NV4S43 zP(JVnH73sO8H_JQn-J^JIq^;W)HPJNeGzT^_royeFJZ!jKsT@EkGlko?(JU2izgK0mcRoZXp?B7ByeB+&OXAxz7m(?l zz0lsdXBseCa}{n~COqRcG2w>J0)`!leBS6=08qnEZ)nPco#OeiEIwM(XGm{ zS~!=>-gXE43etI&but)Zr=7<$v%gi^)OW7Z_c-ZY&uOZYhq?6~PJQ1xA3PG9wty=_ zHJLwg3fS&ayxWp<=>*CUn-$}C;Y2XRTgnj6Dno29r7AiMJ_*}731R%WVw~XpNr>s> z?o%BL&5Qj_MKWxxcYw(*JO!R+vHkaSoD*P)$WDG*>phs0kh!26lgMgcJb#GNrd(;$ zrL>u>v^mGEO#!rNaXy{O{Rlbm<{LJ#%*n)kh&ko1FTus(NyNBl6GCCC$kZ*{6q&k5 zfxD*N?9Wncae8w^S6{&%;}|6=x?gIDs|zB7s~0N;^~yp~9@f>HSs|{3XjfkVI1fTo zD~nHF6<3E%w^i75o#Wy0=;7NK5fYxCNa8y<+toe4gZ3@XLsNj#s(bIq>xL+`l|`=G z`NB9UrT5}Kj#ovEF@Pe;ECjB2f*zk591@vddWF57OPZbhw7j?f>Ku!=w_)%Q?rS3{ zGJ6a;Zc+O;j12DIzKDik6l8N0bS$&kl$r1nH05MYBneN6B);R_O&N-&IPH@uh3t#b znV7kGW)7$a`@QfI6DMrmYuum8YknnYs+f*ny z`w$4RHzIvxClib7$yOF)|D)QnRJG+@Mhe_JDM4b|@+J)6JU$7SxMGsFzf-K;l~^;L>BYg7M-2syDMSLg5;ieT`7%XB+D)CC>a zbT*(d6i&}FM(|G+`p|B&_!qJz3#Mrn209L*SC#pvNv$>iYFNe-B*MZ*%sKW&f9!ex zSUfY?HF^!KchgsFTJL2z1e!QgjhROnfD*n3W+E}x`yrg$S>ppn%Srg8E;{C0=vwX+ zM2txHkA2$D+oFD)ovGlgO|EA6pRfYLDO?oclri>+QShr>!|}nMZlNVr)P7^BP5mY- z{azH?d%Q}&Y`1MptHJCMm{=)R4$5hzybWn`&(7$gC(28=2o?1)R83NT$fV9IBufVbhV<*zCox z!r5Tc_c(MC9xnq;!Z&?}%cd8CZHtpP0Vu7yip=|DZbuCL2Am}wLuJB;(^xxn41FCv zYGUY*ueK@l6Z9v9{+3ZQ;rXW|zBaBxjT<}> z*=*4{^ibR%Rz7(f6VsG#QE=xLICR8IW3ly7$rX{Kvj5209MzhA#iM>XRRU6a2%UIDx@73EJ3u`lRLMmw@M}Yo-Ar zJDFHFo{St>Vym_P49IGeV*0Bo;B%cK6@y5s;vqU#AW2+dva}n>YzyD`99#iqqT00R zgP`^%SOMJqM~n`S{6_0ZQYaxcBsvHpLv&d8?;uWXQ55B2qr-kyKsa+nhdw7lbD)Zf z@t&utJGVz?BT`8SZGLr`K%$LNsjw@+nr|Y4d zb1C+IXw5Qgoi|4L^rbd?q}{twu+2M(&0ga{0Uz1iC{Ju^s@CM-je-RnL1zI*f1$-! zq9;>F)so~R_gUvSxF?DY5nbmJ}91nEAI z0dphN-3WUF$F9XaRPQbK9f#dZtLrJz_$l!qn2yT9(;jLZiT zC7?Yv2fw9{lwQV4rk4QT=oMe!sWT6x=w6N=U@srL?@MCBPP$sc<8UdE@Lk5l*zVbX z`(N>~^-4x^8`$^k{mf;~`ce;LHZ=@i8wL1)GDu&U}`r@E>xN7cg&<8eJa^XM0+m#8H| zauL`ct%-m9t7A3syQtYy%F&wm^9x~_cupN$5hm{DoSEou; z$NNY*S_?nEKnyelo(K!y&r_>}=Rrw)zda9-apFVI!NLz=1BMm3jvDE2hZw6zM-HUb zmno)?v-c97xzcUE&y%U)b}P6Y#_AJ~(dxffGLl~d`<@<`+3GLsKUS;f_mOh6R=;pQ zEQ9+$@J?9$Y%U@ap7E0S%DGrKc3KYQoQYpFR!68#d3f|uEYRcmA6z1==F!m;o>vr) zpYqVj@HmYP%sO#s!GVz?>!1-X*Y92l%JYSW>_Z{8_FwXbAXJOPv8zpN-ssZ`lD%4Z@b+AQcmNv2R!pel@(IICm4l5NhNloQ*}gm14U?V72DNqf z?`g*)>Zj&RGpGBE)#0gY}PJ|BQ1;?KZjOxzQUPqaf3sp{hMYj=nt)<@IXM zl%DO`(_VxVCp7TpREx(b#4Ra=tq>bih!F}AOCjgYAkhGjUcSXgjRb1+)R(4-?30+zjL%4WxwC#1p}NjvuAsDaClx@^al_F zUKTr-`5tO!yVYc=OHG1MlL^#JIh*pn^%W_EJ^~yow=;15fboI3}53BU*~@8{qFsVDYGA#0-%Yn7MMS*`Vhw__t?#sUf%Oc+R5hzr?es3qBq&mFw7 z8+8~{ND6C94T&j&$PiP0&??%rQy}zbRrS;hbl*=T9z5ysnAiKP5LZGn#grgaQHOeC z=hzIY>*q>L>E@<1;Tg$blJL#L+BOwa=7X(s8n$dk^Byr}fH(TmV3eun3d9b0Du?Qg zi?%Sq7dJ$8b^lBwBgtHpi=oFwrE6l*8bv15(**L~t`3dX&$cE0mm*1c9#(PB$EO!e z;D3-7&b;ad;Y^pEnxUL?*~!4*t0}Oz9HQ%?A-yG!EJ(jd&d(lH!zGDq>NtJllfd_T zoX)y9PQU(m$mnsLE(LO1Rk2rx=SmIS6K$rw62D{ER_E#%nW~WI{dhJMksz{@pF@pX ztSGVR7EjST5}s#Ob5C=x2fnfMux?RyjBat2TF51D0{gyw-2ajA!KaSZEk^c{a<{78gV zH;uz=p{7#zSu{8LNdp}zzKOjI8s%zS5wt3V;HQa%>%Ry(9Wr{+sihnjCEy|I-Rs}; z&I}x1V!42O=JL>@zO}ul>q7jP2Fjj}yn!;#58iG2(-VOR=JeW@#-+sZ&NLSp5W}XH z$Rm;d*vE_o|HSCQ{UbOc+K@n$lME1X`Q&$<$0B&K z%{UPJDp;z+G2AMt0-&gPC**QM`3*CqSD&4UB;h$tb?4b*T;2Jt9T1i|8XoI8&ZC?o zV1YQ;AFbQJ{sh}Cw;lUdIczi>-ABsNy8Xfn#6Ul|P~3hpkHQk3S(5mIqg-|hLAe%Z z_HdwZx`2+>)0g-L9-X3Z;(j0dg+7;Ye*gHFAQUg>6oy-htReF|bMS-xS>$A#9_cF8Jk3%EIfZBYr-F*s;-lI?rtqR_OPz-*T`}w1O}HY^m*=X15{B@cg#57rYqb9kAN!- zx6wHXPq8GvuZ#p_T%iJbv^e9jl{2cVr#qYybf`J8-@q3_K)8J8X{y9H@1|C4U+@K( zR96S_3UT?|lN_;89)vJp-bSftz}$_( zK!F>hRK10S)gV8EZhX*%x5jbr{7KW5n z(h=Qi^P^M+BAS(3F;Njl5Ep2Sd^E-yMba+}ihNREES`aE{z;E?;L{2y3Bhd-u z4!W>1^ zzXv1Vjd(m&EWcpbM;bM`U{FEX#$+&x50Pfj#_vM3fEiB62mXDY&z9;TOY z?Tv26RA2~t#cSt!lqnSlxm9dPtB4hWH zm8xK?HvNtS3D4=OO&!Bs88A;y z4DU8zu7H~qh-F$UVi>vTJqr%`zTPdrZgCTmjrt+Nous&7O@sN0mr%M+j~m{eU?&S`+7h)iu_cxnn^ibhOr2;g7p?az?rH2a83F{m#ZPH4P zQEQYm!d45FGy-d!4|`O>D(9mf2sE@Sjo29dPX*|P4(Zv@@WT+_@4A@>IO+k;9T+@_ zvGNq>Uec>o^bqsRR20XfIjyVR_Gci-+wsI=75+9H^-PAtLFYo z)ZBmkiIkzgtTPl+oOX;cDBFJrJ@wN}dS^Wymxs^O9n=x<0J?>^zhZ-9Km2E{3bJ9afD{LZoNhjILrYaj8gBF05EXb|8Yt zuvDC^)YMBwMS0kz;?1lOS3)vvMzf)cS}Hz;s1A1-RwV9nCpVk;%>zk%uksFpahKOo z#rZimoLY0WR8;#%9Q(6#BCUmLIA;DX&Dsaq_O5L2hApsGJb${9{|$I8-1Qe+oF+VX za&AfZu5rsRr~G2bkLTyAI!o#4ePjMT?gj2x)Qac7d@d;CIp-=NTa}OkCFG-_kYyVA zF|=!OeuYJ#zDXI8H4U7QE1fLw?IS)C<0k2eaI=elNIQBr+=&{_8Tim+DTX6bbB3a5 zc(M@#MGC%(zULnVAIi~M*Cj}E(fatfh6cr1pSkW}KFOodQ6?NiU?ew9_ie!#eGY ztbqGN6q3nl7eE!|w1dYdeOmWlVnZHL`Or1CBTP2@TU0-Eu@5-Tgx_~NV8Wf~!E?3Zg#b+@f*UGjOV2g91d`{)3>*rb2FE*(! zr|%lJn41o@fc@(CARFIQCP!Qy6L8La0EX^|bS_E1E~H#w@8(DH=MRvUHKMKSJy<-E z(=7<2>$hnZa6)czl3Sz`O#nIW`%qAS=TH$oLHRo@1UgV(MOpYwkJm82z$sbOf$#7z z@A?6swI}sLp=Jx@M1&B*i}VB5x3$1~WJsdCjGWDK48P|CQ2%7sT72QQ6%{~q379CY z0L&18TQ>m30&r6a;7kBwMt{7itI9uNqWlVUI48LhR(VeSUUck+L-_G(oa!&Zj~Fay z^~XzF{nJVhG!Bu9&V|`i|Xr(!Q2 zhg$)?S=`ctg{~lMvLo%&Q*0m~S`5m_W@c|TtN`jr`-QQ7qz+4yM@l;fz!YqMr@m#v z`SOmeZuXeQPkq|EL4Ha1@b=h&4UMr!FOFGF7xuAHKZ>n&aY&nLpLIb;Q=jTbYvMEA zJWZdZP27(b?_ngFcJ$DK{kKotV!!@j@?MTMC)T>L^eK?hr;?IJ+Pl!LcnU`Q7GdRY zop8x6Z{%!@)ecxk+%XAv^lmHiZd*bwgm_C!{Ge5ea$SVQzmB~J-;0zcvxInBX=mt{ z%yDB_`jph&So(Iycvl^NJQvL01{L}F`HNvq90w59=`ViXk6&d6KS%)v>V@|HPpP&) z^vjNUE=B$utpux7tU@Kj+0hoImzY@(z+c z{~VAE-pA1PZrCpVz4`t8`LU1Z$3C1B`)f(}@K^njXR`cLUu$}Y?i>tYtX|h~y{^GWxN>u|4|@$6;JOA57;yZc zL4$H~)0dT%Js>A{FtivrsFx}Vv?M{C(WQ79J+5s0b6;u5ol24oG;qG*G}52D%SED znnt@0Z$Gt`)bnMxbV>DEQ!P+gpfcE0S7p_|&9|JUEC)9%k+;m+b-`eREiXUC$YF0cgo2fU#zwoU zHh?$03a(t<3ri7w$Ie9y7ca!fIoHPPc^6)$22r}{6(Xd+dUI1F7B5=0FI_mNbbiT~ zrGcsrx=Hm=p)Id)J%4l+MrU97X;1HAX7;Oa!z*xY#qnI5Z)bINBi;vV27F(>y)0>5 zj$Lb4R~goDN9_~PR}U94^r{NJ_Bds4@d((j7gxNxwr9Tx-Fa1k)#o*ZFpPq$Ts7#t zc8@X(FW0?JJY6eM0B@_nD{$F=fwh%^Kov%jhL<;0R|i7cPx`hu-DjHDx!UeF4Uh4D zHhfjR0=Zv5$lRaWEPZ>EW2HPC#G92vf%0Oj2RI%L2NtD*F764AcLg}z?eYd;D4H5r zskar|x+v`_e08wFDubk}$+%G}m(z4IfOWjl!MkriD%-*@Fl^kztp7lOw?WgC;lSFk zOJC!@jdlGLa*W{J(Kg>$W6KMw-5RShytPIZK|i5QnIcgC)uuob9Hl9|rXo~jFJFgu zW|Kz0gPOzN4h6zZAx6jgz>11+E#6H`w5A}x)N7~haP2D4fm=W0QFSwYCm2FBsB(MJ zn!xgOv0a0}RKFUphSt`WcPH!lyeSnt z80zaQG5cMFNoaJV?O(jO>^!_R-800GnU6u!Gh|7_icm#W;Cwq}CC`xAYQUj6=i9p6 zGh|Vqvfzrq@U9=SDkcm2NqyI|%;-xNiYaj^f zamK6)gz+ABOipN4+D{;=B8(;8irPv{dw2;rX1pdzFd?UU%j(fB>ABN%h+SM%Y%jw4 zpt6P>%0fZRyurF-?;bVFsvd&A>Gtd4j!@kYtjCH~+fe%v8UUw(my7c)53CDfHZPv2 zeHTj`WsiQw-xZ6>%Pzy5JbU5%a+#9tWuqIH4Y6Pa%uUBwE!zP1jof%Tv)54pa;;s>iSStTmP;jpqYs}5D5 zLFkTk(q&)<%nGi=(QmWX1y0xP%uHX0rMvOR;_*I1uh*RvPUotvZ$hjJ)|34z2tmXO zu3^v3#n;*dS12`SyXtX8z}R~pmR7o6Q+*w$%!W{4wY>(5NYu9GF*ad-4q|0jWfrat z73=DP2nWXF)g+6q!rHrbRnsa{ze4xz^8>k(Hfm8?g;d^%F z*~X@57~*uAD{J&!#~6izscI>nQop9*`~qG>Mi-vbI7G4v76nqYhRxZ*!^%(huMAW( zKt)oIrJ6B*%G4&Z%6bEC(f>N5Z~L0+SJu0i7{=f!&6kZnr;r0a1EI3BJ8rIOs6#~M zbZ)N*a_mTGx2#Qfb$5RcfB%BNQ}6EXz7Bu)s^yRk6IXDp0+mruN#E zb*t)w4POm4hMQKeS-bAKqT=xrCi*5#o-%dZd9+T}7Y0l#LyG&s*156n?r$S~IMLm` z7wJ<->yYldy}SEWq>Jw8?&eLOh|}FY5@&#a#!da{NbBD1?!E@;t4M!<6vvlxWnFif zm9;i6>kB6g&TYvWoJagf{Q1EbUv@}c{nK04nV=y3#*KJfcOWuD^X3j6ak2OKHMz~! zg{MqAXTq6y#!JbYgS7p&?ruCtmAY8>8vMNhc?n`mhUVRzJ=-%Vdm}Vbysrm(A=+1T zm8*QfxT{FO`rpQ1d9u5^mPci#{(!1%Q~ngt555Ea(C&2k*RohnonFOX?w`B6B|y#^ zns-O`tf3=rA5c2fP7Itibj)oz{-H%T5AqM45*fT;=*-a2DJ4UTN`{V^HPi+j=w}Vh z<5$uqgmyg{WWYq;z_TXJ~FTrRVQ$8(5sU8&2t9_MAs z|Nd!rknm=`D9#~(t45yy;@Mrf4(M`oF!;y-g~wx9a%uG}JY&-1(qvZi{kGuE+WSGx z*z8;IRROv1ke6H!;%jKU489DHeep7SU#S^!&&A8g*XB>r{i9p859XArc%od|+logr zd11#Xu+g(q!y|P0e`=WfhyRVMLHA2<-Z$$8-J{bV>2#Y;_v!SYPT$h$hdLdk9pfaO zo~6@CI-RT2%XM0<)73h?S*Q2t^hY|~rqg{oJ*d;Sbo!xA2WiJUNvCJ&bdpZz>hyA* zR_k=NPH)!fJv#l7PPgfFpH2_z^evr!sMA4uAe^MrvvfL1r*n0BxlXHfx>~0<>+~L- z{z#|Wbh=Nc2X*?EPCwM?AU)7d(&uSn8d_1`RJgpUwyvt6w#q6b zgrnjbtFUTaedD@SDh-EJ$!c{=s|QeyvQVI|f)#YGp)O33wfID~a77Rqxz$=&4J?R) zRTbe1t1wVgE;k6uYpTG+K&onaMJQCU&XO7?&svlP2BKHRs@h6qgxB)MMyn9_4|#(l zlj;6Tc+SJ@Q!~F#(HWEW4iBdPjSxKiD08i;zgTBXY8yhG@+i@LoeqF=wW%N08Izvr zCPuz@{fWSRh`H9(H|r6TvYeM$pO|%k!dli0WM~6Z->g?mx>+wIxa?rtO?^FGT62Np z8o<;y>mQSv^$tkn>TSPEfkmK9>2KB#CVfEHGx{4jCcPTvT$>o!tWQkZqU%$C>S^@X zXzN;JXj4<)td~rBr7ke?O?}gTdLir2!XGcQ{z|2Ky~g@nqxG)89yr=WlUZiHYErYV z1BqO{`+psU_K>mC9qkL0;&K-)tq>B}p z`s^+L5tO*=Z`SoqTEjxuW$b29ParcCe`XzR_9JEYDT?0o1~%!lAoIHFoBhg_M_K{<8aQK$f*k*YDk5s#Hki;}7^WV~Ys0E^jdPwPB5a8@&zRji}?6Z}vOQ_mi7t ztu-B^z|{CLeBK7$UEkVbE6Ob@S=Ot*E>(s2d>4P@m1_S3HXyYB0ZFVPm83rHR8D2y iLqhqz=}i5xz355H%bgn8sziOpwETa#k*<0M{r>=rrdQ(t literal 0 HcmV?d00001 diff --git a/src/comm/comm_handler.cpp b/src/comm/comm_handler.cpp new file mode 100644 index 0000000..9b920e0 --- /dev/null +++ b/src/comm/comm_handler.cpp @@ -0,0 +1,80 @@ +#include "comm_handler.hpp" +#include "../util/util.hpp" +#include "hpws.hpp" +#include "comm_session.hpp" + +namespace comm +{ + constexpr uint32_t DEFAULT_MAX_MSG_SIZE = 5 * 1024 * 1024; + std::optional session; + bool init_success; + + int init() + { + if (connect(conf::cfg.server.ip_port) == -1) + return -1; + + init_success = true; + + return 0; + } + + void deinit() + { + if (init_success) + disconnect(); + } + + int connect(const conf::host_ip_port &ip_port) + { + std::string_view host = ip_port.host_address; + const uint16_t port = ip_port.port; + + LOG_DEBUG << "Trying to connect " << host << ":" << std::to_string(port); + + std::variant client_result = hpws::client::connect(conf::ctx.hpws_exe_path, DEFAULT_MAX_MSG_SIZE, host, port, "/", {}, util::fork_detach); + + if (std::holds_alternative(client_result)) + { + const hpws::error error = std::get(client_result); + if (error.first != 202) + LOG_ERROR << "Connection hpws error:" << error.first << " " << error.second; + return -1; + } + else + { + hpws::client client = std::move(std::get(client_result)); + const std::variant host_result = client.host_address(); + if (std::holds_alternative(host_result)) + { + const hpws::error error = std::get(host_result); + LOG_ERROR << "Error getting ip from hpws:" << error.first << " " << error.second; + return -1; + } + else + { + const std::string &host_address = std::get(host_result); + session.emplace(host_address, std::move(client)); + session->init(); + } + } + return 0; + } + + void disconnect() + { + if (session.has_value()) + { + session->close(); + session.reset(); + } + } + + /** + * Wait for the session. + */ + void wait() + { + session->wait(); + } +} // namespace comm diff --git a/src/comm/comm_handler.hpp b/src/comm/comm_handler.hpp new file mode 100644 index 0000000..f2e00dd --- /dev/null +++ b/src/comm/comm_handler.hpp @@ -0,0 +1,21 @@ +#ifndef _SA_COMM_COMM_SERVER_ +#define _SA_COMM_COMM_SERVER_ + +#include "../pchheader.hpp" +#include "../conf.hpp" + +namespace comm +{ + int init(); + + void deinit(); + + int connect(const conf::host_ip_port &ip_port); + + void disconnect(); + + void wait(); + +} // namespace comm + +#endif diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp new file mode 100644 index 0000000..f050949 --- /dev/null +++ b/src/comm/comm_session.cpp @@ -0,0 +1,91 @@ +#include "../pchheader.hpp" +#include "../util/util.hpp" +#include "../conf.hpp" +#include "hpws.hpp" +#include "comm_session.hpp" + +namespace comm +{ + + comm_session::comm_session( + std::string_view host_address, hpws::client &&hpws_client) + : uniqueid(host_address), + host_address(host_address), + hpws_client(std::move(hpws_client)) + { + } + + /** + * Init() should be called to activate the session. + * Because we are starting threads here, after init() is called, the session object must not be "std::moved". + * @return returns 0 on successful init, otherwise -1; + */ + int comm_session::init() + { + if (state == SESSION_STATE::NONE) + { + reader_thread = std::thread(&comm_session::reader_loop, this); + state = SESSION_STATE::ACTIVE; + LOG_DEBUG << "Session started: " << uniqueid; + } + + return 0; + } + + void comm_session::reader_loop() + { + util::mask_signal(); + + while (state != SESSION_STATE::CLOSED && hpws_client) + { + const std::variant read_result = hpws_client->read(); + if (std::holds_alternative(read_result)) + { + const hpws::error error = std::get(read_result); + if (error.first != 1) // 1 indicates channel has closed. + LOG_DEBUG << "hpws client read failed:" << error.first << " " << error.second; + } + else + { + // Enqueue the message for processing. + std::string_view data = std::get(read_result); + + LOG_INFO << "Received message : " << data; + + // Signal the hpws client that we are ready for next message. + const std::optional error = hpws_client->ack(data); + if (error.has_value()) + LOG_DEBUG << "hpws client ack failed:" << error->first << " " << error->second; + } + } + } + + /** + * Close the connection and wrap up any session processing threads. + * This will be only called by the global comm_server thread. + */ + void comm_session::close() + { + if (state == SESSION_STATE::CLOSED) + return; + + state = SESSION_STATE::CLOSED; + + // Destruct the hpws client instance so it will close the sockets and related processes. + hpws_client.reset(); + + if (reader_thread.joinable()) + reader_thread.join(); + + LOG_DEBUG << "Session closed: " << uniqueid; + } + + /** + * Joins the listner thread. + */ + void comm_session::wait() + { + reader_thread.join(); + } + +} // namespace comm \ No newline at end of file diff --git a/src/comm/comm_session.hpp b/src/comm/comm_session.hpp new file mode 100644 index 0000000..ce1ebb8 --- /dev/null +++ b/src/comm/comm_session.hpp @@ -0,0 +1,42 @@ +#ifndef _HP_COMM_COMM_SESSION_ +#define _HP_COMM_COMM_SESSION_ + +#include "../pchheader.hpp" +#include "../conf.hpp" +#include "hpws.hpp" + +namespace comm +{ + enum SESSION_STATE + { + NONE, // Session is not yet initialized properly. + ACTIVE, // Session is active and functioning. + MUST_CLOSE, // Session socket is in unusable state and must be closed. + CLOSED // Session is fully closed. + }; + + /** + * Represents an active WebSocket connection + */ + class comm_session + { + private: + SESSION_STATE state = SESSION_STATE::NONE; + std::optional hpws_client; + const std::string uniqueid; // Verified session: Pubkey in hex format, Unverified session: IP address. + const std::string host_address; // Connection host address of the remote party. + std::thread reader_thread; // The thread responsible for reading messages from the read fd. + + void reader_loop(); + + public: + comm_session( + std::string_view host_address, hpws::client &&hpws_client); + int init(); + void close(); + void wait(); + }; + +} // namespace comm + +#endif diff --git a/src/comm/hpws.hpp b/src/comm/hpws.hpp new file mode 100644 index 0000000..737f538 --- /dev/null +++ b/src/comm/hpws.hpp @@ -0,0 +1,995 @@ +#ifndef HPWS_INCLUDE +#define HPWS_INCLUDE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define DECODE_O_SIZE(control_msg, into) \ + { \ + into = ((uint32_t)control_msg[2] << 24) + ((uint32_t)control_msg[3] << 16) + \ + ((uint32_t)control_msg[4] << 8) + ((uint32_t)control_msg[5] << 0); \ + } + +#define ENCODE_O_SIZE(control_msg, from) \ + { \ + uint32_t f = from; \ + control_msg[2] = (unsigned char)((f >> 24) & 0xff); \ + control_msg[3] = (unsigned char)((f >> 16) & 0xff); \ + control_msg[4] = (unsigned char)((f >> 8) & 0xff); \ + control_msg[5] = (unsigned char)((f >> 0) & 0xff); \ + } + +#define HPWS_DEBUG 0 + +namespace hpws +{ + /*typedef enum e_retcode { + SUCCESS + } retcode; + */ + using error = std::pair; + +// used when waiting for messages that should already be on the pipe +#define HPWS_SMALL_TIMEOUT 10 +// used when waiting for server process to spawn +#define HPWS_LONG_TIMEOUT 1500 // This timeout has to account the possible delays in communication via internet. + + typedef union + { + struct sockaddr sa; + struct sockaddr_in sin; + struct sockaddr_in6 sin6; + struct sockaddr_storage ss; + } addr_t; + + class server; + + class client + { + + private: + pid_t child_pid = 0; // if this client was created by a connect this is set + // this value can't be changed once it's established between the processes + uint32_t max_buffer_size; + bool moved = false; + addr_t endpoint; + std::string get; // the get req this websocket was opened with + int control_line_fd[2]; // see below in client constructor + int buffer_fd[4]; // 0 1 - in buffers, 2 3 - out buffers + int buffer_lock[2] = {0, 0}; // this records if buffers 2 and 3 have been sent out awaiting an ack or not + void *buffer[4]; + + // private constructor + client( + std::string_view get, + addr_t endpoint, + int control_line_fd_0, // hpws -> sagent [ hpws sends bufs to us over this line ] + int control_line_fd_1, // sagent -> hpws [ we send bufs to hpws over this line ] + uint32_t max_buffer_size, + pid_t child_pid, + int buffer_fd[4], + void *buffer[4]) : endpoint(endpoint), + max_buffer_size(max_buffer_size), + child_pid(child_pid), get(get) + { + control_line_fd[0] = control_line_fd_0; + control_line_fd[1] = control_line_fd_1; + for (int i = 0; i < 4; ++i) + { + this->buffer[i] = buffer[i]; + this->buffer_fd[i] = buffer_fd[i]; + } + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] child constructed pid = %d\n", child_pid); + } + + public: + // No copy constructor + client(const client &) = delete; + + // only a move constructor + client(client &&old) : child_pid(old.child_pid), + max_buffer_size(old.max_buffer_size), + endpoint(old.endpoint), + get(old.get) + { + old.moved = true; + for (int i = 0; i <= 1; ++i) + { + this->control_line_fd[i] = old.control_line_fd[i]; + buffer_lock[i] = old.buffer_lock[i]; + } + for (int i = 0; i < 4; ++i) + { + this->buffer[i] = old.buffer[i]; + this->buffer_fd[i] = old.buffer_fd[i]; + } + } + + ~client() + { + if (!moved) + { + + // RH TODO ensure this pid terminates by following up with a SIGKILL + if (child_pid > 0) + { + kill(child_pid, SIGTERM); + int status; + waitpid(child_pid, &status, 0 /* should we use WNOHANG? */); + } + + for (int i = 0; i < 4; ++i) + { + munmap(buffer[i], max_buffer_size); + ::close(buffer_fd[i]); + } + + ::close(control_line_fd[0]); + ::close(control_line_fd[1]); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] child destructed pid = %d\n", child_pid); + } + } + + const std::variant host_address() + { + char hostname[NI_MAXHOST]; + const int ret = getnameinfo((sockaddr *)&endpoint, sizeof(sockaddr), hostname, sizeof(hostname), NULL, 0, NI_NUMERICHOST); + if (ret != 0) + return error{10, gai_strerror(ret)}; + + return hostname; + } + + std::variant read() + { + + unsigned char buf[32]; + int bytes_read = recv(control_line_fd[0], buf, sizeof(buf), 0); + if (bytes_read < 1) + { + if (HPWS_DEBUG) + { + perror("recv"); + fprintf(stderr, "[HPWS.HPP] bytes received %d\n", bytes_read); + } + return error{1, "[read] control line could not be read"}; // todo clean up somehow? + } + + switch (buf[0]) + { + case 'o': + { + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] o message received\n"); + + if (bytes_read != 6) + return error{3, "invalid buffer in 'o' command sent by hpws"}; + + uint32_t len = 0; + DECODE_O_SIZE(buf, len); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] o message len: %u\n", len); + + int bufno = buf[1] - '0'; + if (bufno != 0 && bufno != 1) + return error{3, "invalid buffer in 'o' command sent by hpws"}; + + if (HPWS_DEBUG) + { + fprintf(stderr, "[HPWS.HPP] read %d\n", len); + for (uint32_t i = 0; i < len; ++i) + putc(((char *)(buffer[bufno]))[i], stderr); + fprintf(stderr, "\n---\n"); + } + return std::string_view{(const char *)(buffer[bufno]), len}; + } + case 'c': + { + return error{1000, "ws closed"}; + } + default: + { + fprintf(stderr, "[HPWS.HPP] read unknown control message 1: `%.*s`\n", bytes_read, buf); + return error{2, "unknown control line command was sent by hpws"}; + } + } + } + + void close() + { + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] close called\n"); + + // send the control message informing hpws that we wish to close + char buf[1] = {'c'}; + + ::write(control_line_fd[1], buf, 1); + + // wait for the process to end gracefully + int status; + printf("waitpid result: %d\n", waitpid(child_pid, &status, 0)); // add timeout here? + } + + std::optional write(std::string_view to_write) + { + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] write called for message: `%.*s`\n", + (int)to_write.size(), to_write.data()); + + // check if we have any free buffers + if (buffer_lock[0] && buffer_lock[1]) + { + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] no free buffers while writing, waiting for an ack on control_line_fd[1]=%d\n", control_line_fd[1]); + + // no free buffers, wait for a ack + unsigned char buf[32]; + int bytes_read = 0; + + bytes_read = recv(control_line_fd[1], buf, sizeof(buf), 0); + if (bytes_read < 1) + { + perror("recv"); + return error{1, "[write] control line could not be read"}; // todo clean up somehow? + } + + switch (buf[0]) + { + case 'a': + { + if (bytes_read != 2) + return error{4, "received an ack longer than 2 bytes"}; + int bufno = buf[1] - '0'; + if (!(bufno == 0 || bufno == 1)) + return error{5, "received an ack with an invalid buffer, expecting 0 or 1"}; + // unlock the buffer + buffer_lock[bufno] = 0; + break; + } + case 'c': + return error{1000, "ws closed"}; + default: + fprintf(stderr, "[HPWS.HPP] read unknown control message 2: `%.*s`\n", bytes_read, buf); + return error{2, "unknown control line command was sent by hpws"}; + } + } + + // execution to here ensures at least one buffer is free + int bufno = (buffer_lock[0] == 0 ? 2 : 3); + + // update the selected buffer lock + buffer_lock[bufno - 2] = 1; + + // write into the buffer + memcpy(buffer[bufno], to_write.data(), to_write.size()); + + // send the control message informing hpws that a message is ready on this buffer + uint32_t len = to_write.size(); + char buf[6] = {'o', (char)('0' + (bufno - 2)), 0, 0, 0, 0}; + ENCODE_O_SIZE(buf, len); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] writing 'o' to control_line_fd[1]=%d\n", + control_line_fd[1]); + + if (::write(control_line_fd[1], buf, 6) != 6) + return error{6, "could not write o message to control line"}; + + return std::nullopt; + } + + std::optional ack(std::string_view from_read) + { + char msg[2] = {'a', '0'}; + if (from_read.data() == buffer[1]) + msg[1]++; + if (send(control_line_fd[0], msg, 2, 0) < 2) + return error{10, "could not send ack down control line"}; + return std::nullopt; + } + + static std::variant connect( + std::string_view bin_path, + uint32_t max_buffer_size, + std::string_view host, + uint16_t port, + std::string_view get, + std::vector argv, + std::function fork_child_init = NULL) + { + +#define HPWS_CONNECT_ERROR(code, msg) \ + { \ + error_code = code; \ + error_msg = msg; \ + goto connect_error; \ + } + + int error_code = -1; + const char *error_msg = NULL; + int fd[4] = {-1, -1, -1, -1}; // 0,1 are hpws->sagent, 2,3 are sagent->hpws + int buffer_fd[4] = {-1, -1, -1, -1}; + void *mapping[4] = {NULL, NULL, NULL, NULL}; + int pid = -1; + int count_args = 14 + argv.size(); + char const **argv_pass = NULL; + + if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, fd)) + HPWS_CONNECT_ERROR(100, "could not create unix domain socket pair"); + + if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, fd + 2)) + HPWS_CONNECT_ERROR(101, "could not create unix domain socket pair"); + + // construct the arguments + char shm_size[32]; + + if (snprintf(shm_size, 32, "%d", max_buffer_size) <= 0) + HPWS_CONNECT_ERROR(90, "couldn't write shm size to string"); + + char port_str[6]; + if (snprintf(port_str, 6, "%d", port) <= 0) + HPWS_CONNECT_ERROR(91, "couldn't write port to string"); + + char cfd1[10]; + char cfd2[10]; + + snprintf(cfd1, 10, "%d", fd[1]); + snprintf(cfd2, 10, "%d", fd[3]); + + argv_pass = + reinterpret_cast(alloca(sizeof(char *) * count_args)); + { + int upto = 0; + argv_pass[upto++] = bin_path.data(); + argv_pass[upto++] = "--client"; + argv_pass[upto++] = "--maxmsg"; + argv_pass[upto++] = shm_size; + argv_pass[upto++] = "--host"; + argv_pass[upto++] = host.data(); + argv_pass[upto++] = "--port"; + argv_pass[upto++] = port_str; + argv_pass[upto++] = "--cntlfd"; + argv_pass[upto++] = cfd1; + argv_pass[upto++] = "--cntlfd2"; + argv_pass[upto++] = cfd2; + argv_pass[upto++] = "--get"; + argv_pass[upto++] = get.data(); + for (std::string_view &arg : argv) + argv_pass[upto++] = arg.data(); + argv_pass[upto] = NULL; + } + + pid = vfork(); + + if (pid) + { + + // --- PARENT + + // Fds are set to -1, so when error occurred these fds won't get closed again. + ::close(fd[1]); + fd[1] = -1; + ::close(fd[3]); + fd[3] = -1; + + int child_fd[2] = {fd[0], fd[2]}; + + int flags[2] = { + fcntl(child_fd[0], F_GETFD, NULL), + fcntl(child_fd[1], F_GETFD, NULL)}; + + if (flags[0] < 0 || flags[1] < 0) + HPWS_CONNECT_ERROR(101, "could not get flags from unix domain socket"); + + flags[0] |= FD_CLOEXEC; + flags[1] |= FD_CLOEXEC; + if (fcntl(child_fd[0], F_SETFD, flags[0]) || fcntl(child_fd[1], F_SETFD, flags[1])) + HPWS_CONNECT_ERROR(102, "could notset flags for unix domain socket"); + + // we will set a timeout and wait for the initial startup message from hpws client mode + struct pollfd pfd; + int ret; + + pfd.fd = child_fd[0]; // we receive setup events on control line 0 (hpws->sagent) + pfd.events = POLLIN; + ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout + + // timeout or error + if (ret < 1) + HPWS_CONNECT_ERROR(1, "timeout waiting for hpws connect message"); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] waiting for addr_t\n"); + // first thing we'll receive is the sockaddr union + addr_t child_addr; + + int bytes_read = + recv(child_fd[0], (unsigned char *)(&child_addr), sizeof(child_addr), 0); + + if (bytes_read < sizeof(child_addr)) + HPWS_CONNECT_ERROR(202, "received message on control line was not sizeof(addr_t)"); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] waiting for buffer fds\n"); + + // second thing we will receive is the four fds for the buffers + { + struct msghdr child_msg = {0}; + memset(&child_msg, 0, sizeof(child_msg)); + char cmsgbuf[CMSG_SPACE(sizeof(int) * 4)]; + child_msg.msg_control = cmsgbuf; + child_msg.msg_controllen = sizeof(cmsgbuf); + + int bytes_read = + recvmsg(child_fd[0], &child_msg, 0); + struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg); + if (cmsg == NULL || cmsg->cmsg_type != SCM_RIGHTS) + HPWS_CONNECT_ERROR(203, "non-scm_rights message sent on accept child control line"); + memcpy(&buffer_fd, CMSG_DATA(cmsg), sizeof(buffer_fd)); + for (int i = 0; i < 4; ++i) + { + //fprintf(stderr, "scm passed buffer_fd[%d] = %d\n", i, buffer_fd[i]); + if (buffer_fd[i] < 0) + HPWS_CONNECT_ERROR(203, "child accept scm_rights a passed buffer fd was negative"); + mapping[i] = + mmap(0, max_buffer_size, PROT_READ | PROT_WRITE, MAP_SHARED, buffer_fd[i], 0); + if (mapping[i] == (void *)(-1)) + HPWS_CONNECT_ERROR(204, "could not mmap scm_rights passed buffer fd"); + } + } + + for (int i = 0; i <= 1; ++i) + { + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] waiting for 'r' on child_fd[%d]=%d\n", i, child_fd[i]); + + pfd.fd = child_fd[i]; + // now we wait for a 'r' ready message or for the socket/client to die + ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout + + char rbuf[2]; + bytes_read = recv(child_fd[i], rbuf, sizeof(rbuf), 0); + if (bytes_read < 1) + HPWS_CONNECT_ERROR(2, "nil message sent by hpws on startup"); + + if (rbuf[0] != 'r') + HPWS_CONNECT_ERROR(3, "unexpected content in message sent by hpws client mode on startup"); + + if (rbuf[1] != '0' + i) + HPWS_CONNECT_ERROR(4, "received wrong r message on control fd"); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] received 'r%c' on child_fd[%d]=%d\n", rbuf[1], i, child_fd[i]); + } + + return client{ + get, + child_addr, + child_fd[0], + child_fd[1], + max_buffer_size, + pid, + buffer_fd, + mapping}; + } + else + { + + // --- CHILD + if (fork_child_init) + fork_child_init(); + + ::close(fd[0]); + ::close(fd[2]); + + // dup fd[1] into fd 3 + /*if (dup2(fd[1], 3) == -1) + perror("dup2 fd[1]"); + if (dup2(fd[3], 4) == -1) + perror("dup2 fd[3]"); + */ + // ::close(fd[1]); + // ::close(fd[3]); + + // we're assuming all fds above 3 will have close_exec flag + execv(bin_path.data(), (char *const *)argv_pass); + // we will send a nil message down the pipe to help the parent know somethings gone wrong + char nil[1]; + nil[0] = 0; + send(3, nil, 1, 0); + exit(1); // execl failure as child will always result in exit here + } + + connect_error:; + + // NB: execution to here can only happen in parent process + // clean up any mess after error + if (pid > 0) + { + kill((pid_t)pid, SIGKILL); /* RH TODO change this to SIGTERM and set a timeout? */ + int status; + waitpid(pid, &status, 0 /* should we use WNOHANG? */); + } + for (int i = 0; i < 4; ++i) + { + if (fd[i] > 0) + ::close(fd[i]); + if (mapping[i] != MAP_FAILED && mapping[i] != NULL) + munmap(mapping[i], max_buffer_size); + if (buffer_fd[i] > -1) + ::close(buffer_fd[i]); + } + + return error{error_code, std::string{error_msg}}; + } + friend class server; + }; + + class server + { + + private: + pid_t server_pid_; + int master_control_fd_; + uint32_t max_buffer_size_; + bool moved = false; + + // private constructor + server(pid_t server_pid, int master_control_fd, uint32_t max_buffer_size) + : server_pid_(server_pid), master_control_fd_(master_control_fd), max_buffer_size_(max_buffer_size) {} + + void accept_cleanup(void *mapping[4], int child_fd[2], int buffer_fd[4], uint32_t pid_child) + { + for (int i = 0; i < 4; i++) + { + if (mapping[i] != MAP_FAILED && mapping[i] != NULL) + munmap(mapping[i], max_buffer_size_); + if (i < 2 && child_fd[i] > -1) + ::close(child_fd[i]); + if (buffer_fd[i] > -1) + ::close(buffer_fd[i]); + } + + if (pid_child > 0) + { + int ret1 = kill(pid_child, SIGTERM); + int wstat; + int ret2 = waitpid(pid_child, &wstat, 0); + } + } + + public: + // No copy constructor + server(const server &) = delete; + + // only a move constructor + server(server &&old) : server_pid_(old.server_pid_), + master_control_fd_(old.master_control_fd_), + max_buffer_size_(old.max_buffer_size_) + { + old.moved = true; + } + + pid_t server_pid() + { + return server_pid_; + } + + int master_control_fd() + { + return master_control_fd_; + } + + uint32_t max_buffer_size() + { + return max_buffer_size_; + } + + std::variant accept(const bool no_block = false) + { + + static int calls = 0; + ++calls; + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[0] called %d\n", calls); + +#define HPWS_ACCEPT_ERROR(code, msg) \ + { \ + accept_cleanup(mapping, child_fd, buffer_fd, pid); \ + return error{code, msg}; \ + } + + int child_fd[2] = {-1, -1}; + int buffer_fd[4] = {-1, -1, -1, -1}; + void *mapping[4] = {NULL, NULL, NULL, NULL}; + // must not use pid_t here since we transfer across IPC channel as a uint32. + uint32_t pid = 0; + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[1] called %d\n", calls); + { + struct msghdr child_msg = {0}; + memset(&child_msg, 0, sizeof(child_msg)); + char cmsgbuf[CMSG_SPACE(sizeof(int) * 2)]; + child_msg.msg_control = cmsgbuf; + child_msg.msg_controllen = sizeof(cmsgbuf); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[2] called %d\n", calls); + + // If no-block is specified, we first check any bytes available on control fd + // before attempting to do a blocking a read. + if (no_block) + { + struct pollfd master_pfd; + master_pfd.fd = this->master_control_fd_; + master_pfd.events = POLLERR | POLLHUP | POLLNVAL | POLLIN; + const int master_poll_result = poll(&master_pfd, 1, HPWS_SMALL_TIMEOUT); + + if (master_poll_result == -1) // 1 ms timeout + HPWS_ACCEPT_ERROR(200, "poll failed on master control line"); + + if (master_poll_result == 0) // No data available + HPWS_ACCEPT_ERROR(199, "no new client available"); + } + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[3] called %d\n", calls); + + int bytes_read = + recvmsg(this->master_control_fd_, &child_msg, 0); + struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg); + if (cmsg == NULL || cmsg->cmsg_type != SCM_RIGHTS) + HPWS_ACCEPT_ERROR(200, "non-scm_rights message sent on master control line"); + memcpy(&child_fd, CMSG_DATA(cmsg), sizeof(child_fd)); + if (child_fd[0] < 0 || child_fd[1] < 0) + HPWS_ACCEPT_ERROR(201, "scm_rights passed fd/s were negative"); + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] On accept received SCM: child_fd[0] = %d, child_fd[1] = %d\n", + child_fd[0], child_fd[1]); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[4] called %d\n", calls); + } + + // read info from child control line with a timeout + struct pollfd pfd; + int ret; + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[5] called %d\n", calls); + + pfd.fd = child_fd[0]; // expect all setup messages on the hpws->sagent controlfd (0) + pfd.events = POLLIN; + ret = poll(&pfd, 1, HPWS_SMALL_TIMEOUT); // 1 ms timeout + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[6] called %d\n", calls); + + // timeout or error + if (ret < 1) + HPWS_ACCEPT_ERROR(202, "timeout waiting for hpws accept child message"); + + // first thing we'll receive is the pid of the client + if (recv(child_fd[0], (unsigned char *)(&pid), sizeof(pid), 0) < sizeof(pid)) + HPWS_ACCEPT_ERROR(212, "did not receive expected 4 byte pid of child process on accept"); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[7] called %d\n", calls); + + // second thing we'll receive is IP address structure of the client + addr_t buf; + int bytes_read = + recv(child_fd[0], (unsigned char *)(&buf), sizeof(buf), 0); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[8] called %d\n", calls); + + if (bytes_read < sizeof(buf)) + HPWS_ACCEPT_ERROR(202, "received message on master control line was not sizeof(sockaddr_in6)"); + + // third thing we will receive is the four fds for the buffers + { + struct msghdr child_msg = {0}; + memset(&child_msg, 0, sizeof(child_msg)); + char cmsgbuf[CMSG_SPACE(sizeof(int) * 4)]; + child_msg.msg_control = cmsgbuf; + child_msg.msg_controllen = sizeof(cmsgbuf); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[9] called %d\n", calls); + + int bytes_read = + recvmsg(child_fd[0], &child_msg, 0); + struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg); + if (cmsg == NULL || cmsg->cmsg_type != SCM_RIGHTS) + HPWS_ACCEPT_ERROR(203, "non-scm_rights message sent on accept child control line"); + memcpy(&buffer_fd, CMSG_DATA(cmsg), sizeof(buffer_fd)); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[10] called %d\n", calls); + + for (int i = 0; i < 4; ++i) + { + //fprintf(stderr, "scm passed buffer_fd[%d] = %d\n", i, buffer_fd[i]); + if (buffer_fd[i] < 0) + HPWS_ACCEPT_ERROR(203, "child accept scm_rights a passed buffer fd was negative"); + mapping[i] = + mmap(0, max_buffer_size_, PROT_READ | PROT_WRITE, MAP_SHARED, buffer_fd[i], 0); + if (mapping[i] == MAP_FAILED) + HPWS_ACCEPT_ERROR(204, "could not mmap scm_rights passed buffer fd"); + } + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[11] called %d\n", calls); + } + { + struct pollfd pfd; + int ret; + + for (int i = 0; i <= 1; ++i) + { + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] waiting for 'r' on child_fd[%d]=%d accept\n", i, child_fd[i]); + pfd.fd = child_fd[i]; + pfd.events = POLLERR | POLLHUP | POLLNVAL | POLLIN; + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[12] called %d\n", calls); + + // now we wait for a 'r' ready message or for the socket/client to die + ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout + + if (!(pfd.revents & POLLIN)) + HPWS_ACCEPT_ERROR(5, "could not read from client_fd"); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[12a] called %d - ret %d\n", calls, ret); + char rbuf[2]; + bytes_read = recv(child_fd[i], rbuf, sizeof(rbuf), 0); + if (bytes_read < 1) + HPWS_ACCEPT_ERROR(2, "nil message sent by hpws on startup on accept"); + + if (rbuf[0] != 'r') + HPWS_ACCEPT_ERROR(3, "unexpected content in message sent by hpws client mode on startup"); + + if (rbuf[1] != '0' + i) + HPWS_ACCEPT_ERROR(4, "received wrong r message on control line fd"); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] 'r%c' received on child_fd[%d]=%d\n", rbuf[1], i, child_fd[i]); + } + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[13] called %d\n", calls); + } + + return client{ + "", + buf, + child_fd[0], + child_fd[1], + max_buffer_size_, + (pid_t)pid, + buffer_fd, + mapping}; + } + + ~server() + { + if (!moved) + { + + // RH TODO ensure this pid terminates by following up with a SIGKILL + if (server_pid_ > 0) + { + kill(server_pid_, SIGTERM); + int status; + waitpid(server_pid_, &status, 0 /* should we use WNOHANG? */); + } + + ::close(master_control_fd_); + } + } + + static std::variant create( + std::string_view bin_path, + uint32_t max_buffer_size, + uint16_t port, + uint32_t max_con, + uint16_t max_con_per_ip, + std::string_view cert_path, + std::string_view key_path, + std::vector argv, //additional_arguments + std::function fork_child_init = NULL) + { +#define HPWS_SERVER_ERROR(code, msg) \ + { \ + error_code = code; \ + error_msg = msg; \ + goto server_error; \ + } + + int error_code = -1; + const char *error_msg = NULL; + int fd[2] = {-1, -1}; + pid_t pid = -1; + int count_args = 17 + argv.size(); + char const **argv_pass = NULL; + + if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, fd)) + HPWS_SERVER_ERROR(100, "could not create unix domain socket pair"); + + // construct the arguments + char shm_size[32]; + + if (snprintf(shm_size, 32, "%d", max_buffer_size) <= 0) + HPWS_SERVER_ERROR(90, "couldn't write shm size to string"); + + char port_str[6]; + if (snprintf(port_str, 6, "%d", port) <= 0) + HPWS_SERVER_ERROR(91, "couldn't write port to string"); + + char max_con_str[11]; + if (snprintf(max_con_str, 11, "%d", max_con) <= 0) + HPWS_SERVER_ERROR(92, "couldn't write max_con to string"); + + char max_con_per_ip_str[6]; + if (snprintf(max_con_per_ip_str, 6, "%d", max_con_per_ip) <= 0) + HPWS_SERVER_ERROR(93, "couldn't write max_con_per_ip to string"); + + argv_pass = + reinterpret_cast(alloca(sizeof(char *) * count_args)); + { + int upto = 0; + argv_pass[upto++] = bin_path.data(); + argv_pass[upto++] = "--server"; + argv_pass[upto++] = "--maxmsg"; + argv_pass[upto++] = shm_size; + argv_pass[upto++] = "--port"; + argv_pass[upto++] = port_str; + argv_pass[upto++] = "--cert"; + argv_pass[upto++] = cert_path.data(); + argv_pass[upto++] = "--key"; + argv_pass[upto++] = key_path.data(); + argv_pass[upto++] = "--cntlfd"; + argv_pass[upto++] = "3"; + argv_pass[upto++] = "--maxcon"; + argv_pass[upto++] = max_con_str; + argv_pass[upto++] = "--maxconip"; + argv_pass[upto++] = max_con_per_ip_str; + for (std::string_view &arg : argv) + argv_pass[upto++] = arg.data(); + argv_pass[upto] = NULL; + } + + pid = vfork(); + if (pid) + { + + // --- PARENT + + // Fds are set to -1, so when error occurred these fds won't get closed again. + ::close(fd[1]); + fd[1] = -1; + + int flags = fcntl(fd[0], F_GETFD, NULL); + if (flags < 0) + HPWS_SERVER_ERROR(101, "could not get flags from unix domain socket"); + + flags |= FD_CLOEXEC; + if (fcntl(fd[0], F_SETFD, flags)) + HPWS_SERVER_ERROR(102, "could notset flags for unix domain socket"); + + // we will set a timeout and wait for the initial startup message from hpws server mode + struct pollfd pfd; + int ret; + + pfd.fd = fd[0]; + pfd.events = POLLIN; + ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout + + // timeout or error + if (ret < 1) + HPWS_SERVER_ERROR(1, "timeout waiting for hpws startup message"); + + char buf[1024]; + int bytes_read = recv(fd[0], buf, sizeof(buf) - 1, 0); + if (bytes_read < 1) + { + int status; + // Wait and obtain exit status code of hpws. + if (waitpid(pid, &status, 0) > 0) + { + switch (WEXITSTATUS(status)) + { + case 70: + HPWS_SERVER_ERROR(31, "Could not create listen socket."); + + case 72: + HPWS_SERVER_ERROR(32, "Could not bind socket for listen."); + + case 74: + HPWS_SERVER_ERROR(33, "Listen() failed."); + + default: + break; + } + } + HPWS_SERVER_ERROR(2, "nil message sent by hpws on startup"); + } + + buf[bytes_read] = '\0'; + if (strncmp(buf, "startup", 7) != 0) + { + fprintf(stderr, "startup message: `%.*s`\n", bytes_read, buf); + HPWS_SERVER_ERROR(3, "unexpected content in message sent by hpws on startup"); + } + return server{ + pid, + fd[0], + max_buffer_size}; + } + else + { + + // --- CHILD + if (fork_child_init) + fork_child_init(); + + ::close(fd[0]); + + // dup fd[1] into fd 3 + dup2(fd[1], 3); + ::close(fd[1]); + + // we're assuming all fds above 3 will have close_exec flag + execv(bin_path.data(), (char *const *)argv_pass); + // we will send a nil message down the pipe to help the parent know somethings gone wrong + char nil[1]; + nil[0] = 0; + send(3, nil, 1, 0); + exit(1); // execl failure as child will always result in exit here + } + + server_error:; + + // NB: execution to here can only happen in parent process + // clean up any mess after error + if (pid > 0) + { + kill(pid, SIGKILL); /* RH TODO change this to SIGTERM and set a timeout? */ + int status; + waitpid(pid, &status, 0 /* should we use WNOHANG? */); + } + if (fd[0] > 0) + ::close(fd[0]); + if (fd[1] > 0) + ::close(fd[1]); + + return error{error_code, std::string{error_msg}}; + } + }; +} // namespace hpws + +#endif diff --git a/src/conf.cpp b/src/conf.cpp index 3807030..60b0559 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -28,17 +28,6 @@ namespace conf return 0; } - /** - * Cleanup any resources. - */ - void deinit() - { - if (init_success) - { - // Deinit here. - } - } - /** * Create config here. * @return 0 for success. -1 for failure. @@ -70,6 +59,7 @@ namespace conf sa_config cfg = {}; cfg.version = "0.0.1"; + cfg.server.ip_port = {}; cfg.log.max_file_count = 50; cfg.log.max_mbytes_per_file = 10; cfg.log.log_level = "inf"; @@ -87,25 +77,28 @@ namespace conf } /** - * Updates the context with directory paths based on provided base directory. + * Updates the context with directory paths based on provided executable path. * This is called after parsing SA command line arg in order to populate the ctx. - * @param basedir Path to base directory. + * @param exepath Path to executable. */ - void set_dir_paths(std::string basedir) + void set_dir_paths(std::string exepath) { - if (basedir.empty()) + if (exepath.empty()) { // This code branch will never execute the way main is currently coded, but it might change in future - std::cerr << "Base directory must be specified\n"; + std::cerr << "Executeble path must be specified\n"; exit(1); } - // Resolving the path through realpath will remove any trailing slash if present - // Set config directory to the parent of the exec binary. - basedir = dirname((char *)util::realpath(basedir).data()); - ctx.config_dir = basedir + "/cfg"; + // resolving the path through realpath will remove any trailing slash if present + exepath = util::realpath(exepath); + + // Take the parent directory path. + ctx.exe_dir = dirname(exepath.data()); + ctx.hpws_exe_path = ctx.exe_dir + "/" + "hpws"; + ctx.config_dir = ctx.exe_dir + "/cfg"; ctx.config_file = ctx.config_dir + "/sa.cfg"; - ctx.log_dir = basedir + "/log"; + ctx.log_dir = ctx.exe_dir + "/log"; } /** @@ -114,15 +107,19 @@ namespace conf */ int validate_dir_paths() { - const std::string paths[2] = { + const std::string paths[3] = { ctx.config_file, - ctx.log_dir}; + ctx.log_dir, + ctx.hpws_exe_path}; for (const std::string &path : paths) { if (!util::is_file_exists(path) && !util::is_dir_exists(path)) { - std::cerr << path << " does not exist.\n"; + if (path == ctx.hpws_exe_path) + std::cerr << path << " binary does not exist.\n"; + else + std::cerr << path << " does not exist.\n"; return -1; } } @@ -179,6 +176,31 @@ namespace conf std::string jpath; + // server + { + jpath = "server"; + + try + { + const jsoncons::ojson &server = d["server"]; + + cfg.server.ip_port.host_address = server["host"].as(); + cfg.server.ip_port.port = server["port"].as(); + + // Push the peer address and the port to peers set + if (cfg.server.ip_port.host_address.empty()) + { + std::cerr << "Configured server host_address is empty.\n"; + return -1; + } + } + catch (const std::exception &e) + { + print_missing_field_error(jpath, e); + return -1; + } + } + // log { jpath = "log"; @@ -217,6 +239,16 @@ namespace conf jsoncons::ojson d; d.insert_or_assign("version", cfg.version); + // Server configs. + { + jsoncons::ojson server_config; + + server_config.insert_or_assign("host", cfg.server.ip_port.host_address); + server_config.insert_or_assign("port", cfg.server.ip_port.port); + + d.insert_or_assign("server", server_config); + } + // Log configs. { jsoncons::ojson log_config; diff --git a/src/conf.hpp b/src/conf.hpp index 0ae6d62..ec7143a 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -14,6 +14,32 @@ namespace conf ERROR }; + struct host_ip_port + { + std::string host_address; + uint16_t port = 0; + + bool operator==(const host_ip_port &other) const + { + return host_address == other.host_address && port == other.port; + } + + bool operator!=(const host_ip_port &other) const + { + return !(host_address == other.host_address && port == other.port); + } + + bool operator<(const host_ip_port &other) const + { + return (host_address == other.host_address) ? port < other.port : host_address < other.host_address; + } + + const std::string to_string() const + { + return host_address + ":" + std::to_string(port); + } + }; + struct log_config { std::string log_level; // Log severity level (dbg, inf, wrn, wrr) @@ -23,19 +49,27 @@ namespace conf size_t max_file_count = 0; // Max no. of log files to keep. }; + struct server_config + { + host_ip_port ip_port; + }; + struct sa_config { std::string version; + server_config server; log_config log; }; struct sa_context { - std::string command; // The CLI command issued to launch Sashimono agent + std::string command; // The CLI command issued to launch Sashimono agent + std::string exe_dir; // Hot Pocket executable dir. + std::string hpws_exe_path; // hpws executable file path. - std::string config_dir; // Config dir full path. - std::string config_file; // Full path to the config file. - std::string log_dir; // Log directory full path. + std::string config_dir; // Config dir full path. + std::string config_file; // Full path to the config file. + std::string log_dir; // Log directory full path. }; // Global context struct exposed to the application. @@ -48,11 +82,9 @@ namespace conf int init(); - void deinit(); - int create(); - void set_dir_paths(std::string basedir); + void set_dir_paths(std::string exepath); int validate_dir_paths(); diff --git a/src/main.cpp b/src/main.cpp index d67aacf..077971f 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -5,6 +5,7 @@ #include "conf.hpp" #include "sqlite.hpp" #include "salog.hpp" +#include "comm/comm_handler.hpp" /** * Parses CLI args and extracts sashimono agent command and parameters given. @@ -14,13 +15,16 @@ */ int parse_cmd(int argc, char **argv) { - conf::ctx.command = argv[1]; - if (argc == 2 && //We get working dir as an arg anyway. So we need to check for ==2 args. - (conf::ctx.command == "new" || conf::ctx.command == "run" || conf::ctx.command == "version")) + if (argc > 1) { - // We populate the global contract ctx with the detected command. - conf::set_dir_paths(argv[0]); - return 0; + conf::ctx.command = argv[1]; + if (argc == 2 && //We get working dir as an arg anyway. So we need to check for ==2 args. + (conf::ctx.command == "new" || conf::ctx.command == "run" || conf::ctx.command == "version")) + { + // We populate the global contract ctx with the detected command. + conf::set_dir_paths(argv[0]); + return 0; + } } // If all extractions fail display help message. @@ -38,11 +42,70 @@ int parse_cmd(int argc, char **argv) */ void deinit() { - conf::deinit(); + comm::deinit(); +} + +void sig_exit_handler(int signum) +{ + LOG_WARNING << "Interrupt signal (" << signum << ") received."; + deinit(); + LOG_WARNING << "sagent exited due to signal."; + exit(signum); +} + +void segfault_handler(int signum) +{ + exit(SIGABRT); +} + +/** + * Global exception handler for std exceptions. + */ +void std_terminate() noexcept +{ + const std::exception_ptr exptr = std::current_exception(); + if (exptr != 0) + { + try + { + std::rethrow_exception(exptr); + } + catch (std::exception &ex) + { + LOG_ERROR << "std error: " << ex.what(); + } + catch (...) + { + LOG_ERROR << "std error: Terminated due to unknown exception"; + } + } + else + { + LOG_ERROR << "std error: Terminated due to unknown reason"; + } + + exit(1); } int main(int argc, char **argv) { + // Register exception and segfault handlers. + std::set_terminate(&std_terminate); + signal(SIGSEGV, &segfault_handler); + signal(SIGABRT, &segfault_handler); + + // Become a sub-reaper so we can gracefully reap hpws child processes via hpws.hpp. + // (Otherwise they will get reaped by OS init process and we'll end up with race conditions with gracefull kills) + prctl(PR_SET_CHILD_SUBREAPER, 1); + + // Disable SIGPIPE to avoid crashing on broken pipe IO. + { + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGPIPE); + pthread_sigmask(SIG_BLOCK, &mask, NULL); + } + // Extract the CLI args // This call will populate conf::ctx if (parse_cmd(argc, argv) != 0) @@ -65,40 +128,26 @@ int main(int argc, char **argv) { LOG_INFO << "Sashimono agent started. Version : " << conf::cfg.version << " Log level : " << conf::cfg.log.log_level; - // Run the program. - - sqlite3 *db = NULL; - const char *path = "db.sqlite"; - - if (sqlite::open_db(path, &db, true) == -1) + if (comm::init() == -1) { - LOG_ERROR << "Error opening database"; + deinit(); return -1; } - LOG_INFO << "Database " << path << " opened successfully"; - const std::vector column_info{ - sqlite::table_column_info("VERSION", sqlite::COLUMN_DATA_TYPE::TEXT)}; + // After initializing primary subsystems, register the exit handler. + signal(SIGINT, &sig_exit_handler); + signal(SIGTERM, &sig_exit_handler); - if (create_table(db, "SA_VERSION", column_info) == -1) - return -1; + // Waiting for the websocket sessions. + comm::wait(); - if (sqlite::insert_row(db, "SA_VERSION", "VERSION", "\"0.0.0\"") == -1) - return -1; + deinit(); - if (sqlite::close_db(&db) == -1) - { - LOG_ERROR << "Error closing database"; - return -1; - } + LOG_INFO << "sashimono agent exited normally."; } else if (conf::ctx.command == "version") - // Print the version LOG_INFO << "Sashimono Agent " << conf::cfg.version; - deinit(); + return 0; } - - LOG_INFO << "sashimono agent exited normally."; - return 0; -} +} \ No newline at end of file diff --git a/src/pchheader.hpp b/src/pchheader.hpp index df888d0..b877602 100644 --- a/src/pchheader.hpp +++ b/src/pchheader.hpp @@ -2,6 +2,8 @@ #define _SA_PCHHEADER_ #include +#include +#include #include #include #include @@ -9,10 +11,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include diff --git a/src/util/util.cpp b/src/util/util.cpp index f161724..cf78346 100644 --- a/src/util/util.cpp +++ b/src/util/util.cpp @@ -97,4 +97,35 @@ namespace util return buffer.data(); } + /** + * Clears signal mask and signal handlers from the caller. + * Called by other processes forked from sagent threads so they get detatched from + * the sagent signal setup. + */ + void fork_detach() + { + // Restore signal handlers to defaults. + signal(SIGINT, SIG_DFL); + signal(SIGSEGV, SIG_DFL); + signal(SIGABRT, SIG_DFL); + + // Remove any signal masks applied by sagent. + sigset_t mask; + sigemptyset(&mask); + pthread_sigmask(SIG_SETMASK, &mask, NULL); + + // Set process group id (so the terminal doesn't send kill signals to forked children). + setpgrp(); + } + + // Applies signal mask to the calling thread. + void mask_signal() + { + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGINT); + sigaddset(&mask, SIGPIPE); + pthread_sigmask(SIG_BLOCK, &mask, NULL); + } + } // namespace util diff --git a/src/util/util.hpp b/src/util/util.hpp index a885ec7..6f971af 100644 --- a/src/util/util.hpp +++ b/src/util/util.hpp @@ -18,6 +18,10 @@ namespace util const std::string realpath(std::string_view path); + void fork_detach(); + + void mask_signal(); + } // namespace util #endif