From 66b0c86bebc417c47988db9da6b5f53e16c1d200 Mon Sep 17 00:00:00 2001 From: Savinda Senevirathne Date: Wed, 21 Jul 2021 16:35:17 +0530 Subject: [PATCH] Unix socket interface. (#37) --- CMakeLists.txt | 5 +- README.md | 2 +- dependencies/hpws | Bin 43656 -> 0 bytes dev-setup.sh | 4 + examples/message-board/message-board.js | 4 - installer/sashimono-install.sh | 6 +- installer/sashimono-uninstall.sh | 3 + src/comm/comm_handler.cpp | 242 ++++-- src/comm/comm_handler.hpp | 13 +- src/comm/comm_session.cpp | 302 ------- src/comm/comm_session.hpp | 54 -- src/comm/hpws.hpp | 995 ------------------------ src/conf.cpp | 48 +- src/conf.hpp | 9 +- src/main.cpp | 3 - src/msg/json/msg_json.cpp | 61 +- src/msg/json/msg_json.hpp | 2 - src/msg/msg_common.hpp | 4 - src/pchheader.hpp | 3 + 19 files changed, 243 insertions(+), 1517 deletions(-) delete mode 100755 dependencies/hpws delete mode 100644 src/comm/comm_session.cpp delete mode 100644 src/comm/comm_session.hpp delete mode 100644 src/comm/hpws.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index b97cefa..d8e5747 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -25,7 +25,6 @@ add_subdirectory(src/killswitch) add_executable(sagent src/conf.cpp src/comm/comm_handler.cpp - src/comm/comm_session.cpp src/util/util.cpp src/salog.cpp src/crypto.cpp @@ -51,7 +50,7 @@ add_dependencies(sagent ) add_custom_command(TARGET sagent POST_BUILD - COMMAND bash -c "cp -r ./dependencies/{hpfs,hpws,user-install.sh,user-uninstall.sh} ./build/" + COMMAND bash -c "cp -r ./dependencies/{hpfs,user-install.sh,user-uninstall.sh} ./build/" COMMAND tar xf ./dependencies/contract_template.tar -C ./build/ --no-same-owner COMMAND cp ./dependencies/hp.cfg ./build/contract_template/cfg/ COMMAND cp ./bootstrap-contract/script.sh ./build/contract_template/contract_fs/seed/state/ @@ -64,7 +63,7 @@ target_precompile_headers(sagent PUBLIC src/pchheader.hpp) # Add target to generate the installer setup. add_custom_target(installer COMMAND mkdir -p ./build/sashimono-installer - COMMAND bash -c "cp -r ./build/{sagent,hpfs,hpws,user-install.sh,user-uninstall.sh,contract_template} ./build/sashimono-installer/" + COMMAND bash -c "cp -r ./build/{sagent,hpfs,user-install.sh,user-uninstall.sh,contract_template} ./build/sashimono-installer/" COMMAND bash -c "cp -r ./installer/{docker-install.sh,registry-install.sh,registry-uninstall.sh,sashimono-install.sh,sashimono-uninstall.sh} ./build/sashimono-installer/" COMMAND bash -c "cp -r ./dependencies/{user-cgcreate.sh,libblake3.so} ./build/sashimono-installer/" COMMAND tar cfz ./build/sashimono-installer.tar.gz --directory=./build/ sashimono-installer diff --git a/README.md b/README.md index ce893ce..6e6d5f5 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ Run `make installer` ('sashimono-installer.tar.gz' will be placed in build direc ## Code structure Code is divided into subsystems via namespaces. -**comm::** Handles generic web sockets communication functionality. Mainly acts as a wrapper for [hpws](https://github.com/RichardAH/hpws). +**comm::** Handles socket related functionality. **conf::** Handles configuration. Loads and holds the central configuration object. Used by most of the subsystems. diff --git a/dependencies/hpws b/dependencies/hpws deleted file mode 100755 index 69c98f5851719e0fafc60ab09ac9710fc586f9fb..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 43656 zcmeHw3wTu3wf{*V5Yb_RB^E8p(TakYm;l2=5D5?*6eJZ;tI`mX35kSEI+^gWimAgW z<2ahCSgf_B_GPWN(yA0uASe>F_g1vGNUhdfZFP>RZK_sttu_DOZ|%KjP96hX?!Di4 z|KI0<%-(D5wbxpE?X}l_oHKK7ES*0$J1fgF*8ppVCD5q~GgB9A{Jx>EteMsn>o|Ol zw7y^s0(v0+X6oYJX_sbg(p*h904eKD$ySKo=}1jyQllZ$99`CL@(qnOsbvd-^{8vv zeF7r2C1cXO!3sZ9rqhK+s41ey1aKf2W7lWN?KlJe+pMI zKj#hs`=pB%+A^uB_a)S$o=1D-X~N62yi9i4ss)&ocI0(lUG4Ho6VI!wDyXZiZ(3Wh zcFLrJNfQejgN5Uz-Q-W&i!NP4a*O?~O(EsOHIC&g4}JIYCDUKaD;al1-OQ{__q_7Q zH;YKW0Dq?6tQ_V>W)pWQ{ubiTH!IvU^}YDW@q0fy@zDo9IuWRh*TtybAHl8+`0qj4 z{^;wVSbzBEGSH96KtCV@-jhKe%C!4w-PIZRype&=%Ng*SGU$I+2KxViF8$eIUIu+$ z&cJ_f20m|QpkI>#e>(Ws`0IUrKZE{{WWZkl26k^IfX~Q4|811@XNRR3^c)ZSoY4>e z)fxDgW#IFR408XHf&RD*{qj-<`t=#`!!q!>HiJHO8TjnVfcIp;kIR6+C4=2cGU)kS z2Kt|7z(1Hl?y?N+U6O&%g&Fu-!*+rFM12> zh04&nhH#K;gyOH}-2Vq;^S(QonmMdgiw(CR=4%)&LnDr?cA`Q@`0Us>MN7$~m{ zgu=DewUrg&KzVg7bY!7YfVsv%xV$o0UmvIpvxQL0RHh*2Ya7dJ>uaG0J{xNSin}yr zVOi;=LcwAvB;+f=P+DP8FPB1;R8d(OXmAT6a(Qh~YE}iyYbxri8fz+6rnO)b%0jiP zq1h#Yb?Fuu?W#k8fOHwHG0AEShbk)?VYZOf7_3|w2n(iRofTMH8@6h~!TLJ+Y_wKX z)S|&5rD}CJ)KpIsS;2;IZGDy1pzzun!wdbyayK37p%rKe{5oXS1}np876h+qTwyge)&&9$R%KnV zk>-K=)#9twmg-GtQn|K*?om-!dtHD+Rs~mMER=I3>0z^~qPCvOK@c>qUr~;ZfFkvp zlkueD^5XI$>!SJdX3Z`iUsyabl}+cyPfj7n7y3BTv$1AR|HH8E0kK3;C`x_~6pC!@ z&)jttHcij6fuI;9^vo$vtWR2ZT_a!N1`B_KqmJAk@SkNdUfE1Xt->~Fpw)!<$14l@ z$3VdHGW^~BjeAf&*ovT>xZ$<69`5jN*YGV;o?~s+aMLNZ$Fb$#Pw~sLet>$c%S*}2 zPRTR-NfRfn(-hs6I<6XgtKMJdZn1^H;0?T`7yjYf6`$5#_(wIqy%&Cm##`U($$yW= z+r9AnG=63;{I4{ASugwx8sFRte^BGM^uoWU@vXh^Z)tpcFMNl_TMyV4FJqrhjkkN@ zKhXFC-&OS73!~#w*C7|)^c@S^UHJSIwPht;_z^CArweb+CRl9d%^c*pM>Ut3ABpFl z)m&zNB%Xe0u4dgX;x9Ewq?_-z1>*5UmgLmO78jml)Laj`@Y;2yXsZiv=4=*hci}19 zT&*s=`+Q-)3!geNL8SvOyy-LYI^@C+G0c&+yYR=m@JSc`1Q)*3g~z~6U0p8x&=eM# z@7MxSN3Z16N3IJ$%!SW$;oa-q5ib1aT=cdJ&od2kjd9^mGDxIFF8t?R_$e;@$u9g% z7yc9%-tWSH!G$k#;dO|S(kor~(_Hk+T=-Ege2oilyYLMz{OKP};N4xNwUHG$H_$@B{*)IHpF8ml5eya;V)`j2h!k^>9x4Q7-T=@Mi{JAdt z0T;f&g+JuN7rOB6F8p~eeA0z4a^X8&_+l5n%Y`5B!dp6C(f=p7@VPGhL>E5Kg_mW3 zE*s&(Pjb=QF8pK{evAu0#f2|&;itOrQ(XA-UHF+U{4^Kd@4`=a;mcfjp5>V9N*8{H zK_Xq|!e8jZ*SPRAUHAqUzQl!J>%z}+;hSCf*)IG>7k-Wl-{QiTy6~G___;3p78m{^ z7ydyP-tWS1b>Zi^@Y`MZi(U9u7yc3#e!mMp--Y)_{+#QN=QQ17TmEQkIJ^6RKe9LX znG{90@5HEWb&np7Pw%Li$Pun#xzpZ_YxEGpoZ_4V0%sHE)aJAb{P70BoYI`F0>4Li zAmJ?nze6~OaErii5a!h7GzTwzbzKk%JIL$SD(e7GX|3&P;*NC(J3wDH3=*;gN)GfzKh#smI9^_zc3Q5w-+Ah43iCo&RL} zPb6#;ZWnk6VNNa10fDm#b4qbq1^ze!m{WXmI?ekVNMNBnZQpI=9J*f6nF<=P6bYpz`r2ODZsG>euywbzmq5M z1B4m!9ZTSE5oV}&IzN&AC(IDK4Fq6~8+#_TpFku{ZpYKX#Tao}b7W zvdpsl3C~^SOfG0dBsjg8vY5`QA+bfo^NUH;gg!Y3o--3c0oJkQeZ z_Uc0Ks13q@=`)OSX&Ch#IJ7_EdjrW5|JYAa-@oV60sdI0e^2s4f7U+#i=T!^fZ;sN z@B}i<#IN~+J>X+Co#&5SFbWmG68fCrk6&;Opri*DPWth&FXvv+Wl^1GD)#;}V2SpS ze5q3n3g-N=8dAAMB z1|nAa0SJ`r+r}T>3Hb}SqEZIgX$k&F;oXg63f$sU0LB_{zz$8mT;a{dMLEH zBdbLfE`3In`TC!?BCf=CUh`&mw?FoH8Imf0eBfvfR)4IttGJbS@2uSnmdQ8KxyfB0 zcX!8f=6@Y6pMb#@%eeqqe_J&9$L?+rG25BWwx_d)(%Jp#>;aQaL_6Px4!d~jSJ)6( z<&PEZo0*GVjCLl8Ul~|OG_)#$h%QsDEo%KKB{(;o&C}Vo=w7r*D`ulsGj)t_Vivp` z-spdT=u@PVT&t$r7i~qOt$oo>MUi!M`1C6l)3S|+%_acjEngX!#+barz(+mg<1OlOhDRwABkhJdln=~SgR=nu#%p(b zBtfQG&g<|2mCts5gFFIE4LJJ~o5nyx#-nS1jJ0hiw`2~yErKCBKA!*S4WNqWED~bR zUQ#7|j+=N4M%`|%kdnXt5Uk=+;TUNh>W@j*sZ{d1ZWMX+O|S^LS!_b{A1%7pyCTxlaDsZdype4ZUSjM|LyD1n0U@~rDB0n@g;YKVJQ`V`FENnr)xc` zBsOtGvi)w_O@{dtVScM({<>7~Si1f03Xef)W%H{P^ZOKY4s9^cchgQW%zuwQisu(7 z<~J$kzaeG9H^N=viWKvIen8DVGlW(Gf!lKB=r!kNFje6Ls#_h`f!wRf5h>i{MIi6E z3WoUW4PpoeB#5rP3K|$I--g!E@g~IcqhAA6Jm(c5_FS#>Y;Y4l0*1;6OQ2^9Y|CNe zkIPgMk8<9a7sFKL*+y20{5jwOpU4Gqq^sK-{TUV{k)1N=ZiJ@~R|0F5z*CgK?k~Hv zn?ZqGYbXEszNwnM9#wyLo#gS z+r-ctTARSQSuy@C`z+!4kgZSnR=OEKgx=lFDKS|F#&MYhqOI#tcaExiud4fws(YTh zZawRAZJ8V{N@)$^GFQMv)Zo)~(zHd2%RtpM=SnDI`fLojaK)Vb^?S66aPe;VHa1;t z(T!*nugFfBVVdSwV-806%SJ4#L7DfLt>}^{Dptp$@1bBf7+J}?;Rmtk2aJ=1)kKfj zU?tB%@5O{|$K{&W5cqWba!_c;jEmMK z+XoUAdvOOhfzmJ--~&|PazfPJqbPSY3*oO(+=ND}P7Nw8_Ofpio*bpcRc zswOW4Q^m@Nc^4SP=WNC2dd25;dPTx_qMOf`$me46`KS}hZI?T>O%1ee=Uod=j8ZoyGbDO< zBJ7a`&D4x}00N{XsGPhBO^HSS6+IQ%$IIrVwg=s7V-8iS zW8Q(3QgJ4()*?IkX|1Dj`Bd%|SVt+h1%_*jUXNn!B+SNTjhA*37VW}FqV6xC0!QT; z$x`+#meM&-t%ZFPzJGI%$IY~VB3h<=X$Kp=i_1MF^;Vc9+S+uMp@_8Rq7A*Ns)UMR z_|qmx!2GWSJ?s{A9t0_U*&ly_mA=oJKD~ND_E367*q+WN)7j2+wkw^rpga|iM;9X( zi;lo2BD&;9;uG3;b|8sGTTuye_9IbdRG3(l;!r?Qx^g9uLDJ7CqFGtlD`Xy28Bw4bxMQ1Uvg3Dl>z4t5Rx)k2Q*A2+VIl~ zPs1Jo8I(2CM6o^b{Ao(N7CKqNSKwBw4m7(lr(4MrDM0Nn;xqFuXa;UITGstlREp;; zmb#vuOCUDkdk3aW={c4xt|5yTu;5Fxh&`?I{Q(t%YL=3PT@1zTY8trmTq@R*J>X~Z5>15;G8UU(i!+ypyq)Ks1N6X$JoUQ-zK*vEs%gX z$IUd)2aGFk15I)ux(Y^jdHBzWeixi$QEqb$^^b@?C}{3^4f;Do?-n#Sz-`gH>3q1H zKj0%SpoA5QK#b$`w~lpw4~@68oz8fyQE=G7&0;Tlf9xQ7r$iZDW_8y5UaDr-MSVnQ zMyOdLVWK3~`UIuyz)WqaDDyYehofz0K*y|%q`OIGWip?WVt${CIk*aW?Cxh7)gsa? z#6>oCfcN%%uz=^V2#mSYPsJ;Gtcq97PynrY6SnfpsN%BdC-vyAc+RKvl7#1xMX*G| zclQD$CSv^t9kja&b(8CO^z=VDr>g50%BF}0ig=KgQtDI3lPNJK zXSB)JB;S_wDO@g+EP-nD3AJSPB;!KUb~g=~Ol z0H^zeugE=+CV;JzQ-T7mx%ZA?2#3jOXnri`xmp{OfGk>N>!!vono7&b!YUS>iM;oh zvc-$E{{y>!ZuZb$(~)FZ$jp%p_|p(pc$gvHKTs2+p+%tCA^SoGdbZ_SDiD1YpOKx9 z;nNBaj>u^>;x^#{-a8h|#Qc$SI&_o~k4mK|zy$v}U8QM=I^gEuitHpWYaNX9KJHV( zIDYs#jPuc@w2Ei2*vR)e_i#E1_H>dLrYbjG;&Q2V|3O*a=#waq=YLo6`~Y?b&u10S zJ0dle%Dd@A&9IS9KWo_5)VmZVhEP6Nm5RFjF3uKTx z0OjNeLTyN}yh^V(=Ee4Nb;)9LWN_^p=+)oiBVH6w2xWf93bDgX=v;!^R){6k*C={2 z6piP%DVmY0T@zHhW~+9+JN}I61NuprOqG$cdZK}~VIWV&rRRB|R>08Cu& zV8yT&7S94KQC5Z5-?xK5qgHQQ9}I|~aZ1o*O3-s06$#&mSW~40{U=Q1{0b*NV4S43 zP(JVnH73sO8H_JQn-J^JIq^;W)HPJNeGzT^_royeFJZ!jKsT@EkGlko?(JU2izgK0mcRoZXp?B7ByeB+&OXAxz7m(?l zz0lsdXBseCa}{n~COqRcG2w>J0)`!leBS6=08qnEZ)nPco#OeiEIwM(XGm{ zS~!=>-gXE43etI&but)Zr=7<$v%gi^)OW7Z_c-ZY&uOZYhq?6~PJQ1xA3PG9wty=_ zHJLwg3fS&ayxWp<=>*CUn-$}C;Y2XRTgnj6Dno29r7AiMJ_*}731R%WVw~XpNr>s> z?o%BL&5Qj_MKWxxcYw(*JO!R+vHkaSoD*P)$WDG*>phs0kh!26lgMgcJb#GNrd(;$ zrL>u>v^mGEO#!rNaXy{O{Rlbm<{LJ#%*n)kh&ko1FTus(NyNBl6GCCC$kZ*{6q&k5 zfxD*N?9Wncae8w^S6{&%;}|6=x?gIDs|zB7s~0N;^~yp~9@f>HSs|{3XjfkVI1fTo zD~nHF6<3E%w^i75o#Wy0=;7NK5fYxCNa8y<+toe4gZ3@XLsNj#s(bIq>xL+`l|`=G z`NB9UrT5}Kj#ovEF@Pe;ECjB2f*zk591@vddWF57OPZbhw7j?f>Ku!=w_)%Q?rS3{ zGJ6a;Zc+O;j12DIzKDik6l8N0bS$&kl$r1nH05MYBneN6B);R_O&N-&IPH@uh3t#b znV7kGW)7$a`@QfI6DMrmYuum8YknnYs+f*ny z`w$4RHzIvxClib7$yOF)|D)QnRJG+@Mhe_JDM4b|@+J)6JU$7SxMGsFzf-K;l~^;L>BYg7M-2syDMSLg5;ieT`7%XB+D)CC>a zbT*(d6i&}FM(|G+`p|B&_!qJz3#Mrn209L*SC#pvNv$>iYFNe-B*MZ*%sKW&f9!ex zSUfY?HF^!KchgsFTJL2z1e!QgjhROnfD*n3W+E}x`yrg$S>ppn%Srg8E;{C0=vwX+ zM2txHkA2$D+oFD)ovGlgO|EA6pRfYLDO?oclri>+QShr>!|}nMZlNVr)P7^BP5mY- z{azH?d%Q}&Y`1MptHJCMm{=)R4$5hzybWn`&(7$gC(28=2o?1)R83NT$fV9IBufVbhV<*zCox z!r5Tc_c(MC9xnq;!Z&?}%cd8CZHtpP0Vu7yip=|DZbuCL2Am}wLuJB;(^xxn41FCv zYGUY*ueK@l6Z9v9{+3ZQ;rXW|zBaBxjT<}> z*=*4{^ibR%Rz7(f6VsG#QE=xLICR8IW3ly7$rX{Kvj5209MzhA#iM>XRRU6a2%UIDx@73EJ3u`lRLMmw@M}Yo-Ar zJDFHFo{St>Vym_P49IGeV*0Bo;B%cK6@y5s;vqU#AW2+dva}n>YzyD`99#iqqT00R zgP`^%SOMJqM~n`S{6_0ZQYaxcBsvHpLv&d8?;uWXQ55B2qr-kyKsa+nhdw7lbD)Zf z@t&utJGVz?BT`8SZGLr`K%$LNsjw@+nr|Y4d zb1C+IXw5Qgoi|4L^rbd?q}{twu+2M(&0ga{0Uz1iC{Ju^s@CM-je-RnL1zI*f1$-! zq9;>F)so~R_gUvSxF?DY5nbmJ}91nEAI z0dphN-3WUF$F9XaRPQbK9f#dZtLrJz_$l!qn2yT9(;jLZiT zC7?Yv2fw9{lwQV4rk4QT=oMe!sWT6x=w6N=U@srL?@MCBPP$sc<8UdE@Lk5l*zVbX z`(N>~^-4x^8`$^k{mf;~`ce;LHZ=@i8wL1)GDu&U}`r@E>xN7cg&<8eJa^XM0+m#8H| zauL`ct%-m9t7A3syQtYy%F&wm^9x~_cupN$5hm{DoSEou; z$NNY*S_?nEKnyelo(K!y&r_>}=Rrw)zda9-apFVI!NLz=1BMm3jvDE2hZw6zM-HUb zmno)?v-c97xzcUE&y%U)b}P6Y#_AJ~(dxffGLl~d`<@<`+3GLsKUS;f_mOh6R=;pQ zEQ9+$@J?9$Y%U@ap7E0S%DGrKc3KYQoQYpFR!68#d3f|uEYRcmA6z1==F!m;o>vr) zpYqVj@HmYP%sO#s!GVz?>!1-X*Y92l%JYSW>_Z{8_FwXbAXJOPv8zpN-ssZ`lD%4Z@b+AQcmNv2R!pel@(IICm4l5NhNloQ*}gm14U?V72DNqf z?`g*)>Zj&RGpGBE)#0gY}PJ|BQ1;?KZjOxzQUPqaf3sp{hMYj=nt)<@IXM zl%DO`(_VxVCp7TpREx(b#4Ra=tq>bih!F}AOCjgYAkhGjUcSXgjRb1+)R(4-?30+zjL%4WxwC#1p}NjvuAsDaClx@^al_F zUKTr-`5tO!yVYc=OHG1MlL^#JIh*pn^%W_EJ^~yow=;15fboI3}53BU*~@8{qFsVDYGA#0-%Yn7MMS*`Vhw__t?#sUf%Oc+R5hzr?es3qBq&mFw7 z8+8~{ND6C94T&j&$PiP0&??%rQy}zbRrS;hbl*=T9z5ysnAiKP5LZGn#grgaQHOeC z=hzIY>*q>L>E@<1;Tg$blJL#L+BOwa=7X(s8n$dk^Byr}fH(TmV3eun3d9b0Du?Qg zi?%Sq7dJ$8b^lBwBgtHpi=oFwrE6l*8bv15(**L~t`3dX&$cE0mm*1c9#(PB$EO!e z;D3-7&b;ad;Y^pEnxUL?*~!4*t0}Oz9HQ%?A-yG!EJ(jd&d(lH!zGDq>NtJllfd_T zoX)y9PQU(m$mnsLE(LO1Rk2rx=SmIS6K$rw62D{ER_E#%nW~WI{dhJMksz{@pF@pX ztSGVR7EjST5}s#Ob5C=x2fnfMux?RyjBat2TF51D0{gyw-2ajA!KaSZEk^c{a<{78gV zH;uz=p{7#zSu{8LNdp}zzKOjI8s%zS5wt3V;HQa%>%Ry(9Wr{+sihnjCEy|I-Rs}; z&I}x1V!42O=JL>@zO}ul>q7jP2Fjj}yn!;#58iG2(-VOR=JeW@#-+sZ&NLSp5W}XH z$Rm;d*vE_o|HSCQ{UbOc+K@n$lME1X`Q&$<$0B&K z%{UPJDp;z+G2AMt0-&gPC**QM`3*CqSD&4UB;h$tb?4b*T;2Jt9T1i|8XoI8&ZC?o zV1YQ;AFbQJ{sh}Cw;lUdIczi>-ABsNy8Xfn#6Ul|P~3hpkHQk3S(5mIqg-|hLAe%Z z_HdwZx`2+>)0g-L9-X3Z;(j0dg+7;Ye*gHFAQUg>6oy-htReF|bMS-xS>$A#9_cF8Jk3%EIfZBYr-F*s;-lI?rtqR_OPz-*T`}w1O}HY^m*=X15{B@cg#57rYqb9kAN!- zx6wHXPq8GvuZ#p_T%iJbv^e9jl{2cVr#qYybf`J8-@q3_K)8J8X{y9H@1|C4U+@K( zR96S_3UT?|lN_;89)vJp-bSftz}$_( zK!F>hRK10S)gV8EZhX*%x5jbr{7KW5n z(h=Qi^P^M+BAS(3F;Njl5Ep2Sd^E-yMba+}ihNREES`aE{z;E?;L{2y3Bhd-u z4!W>1^ zzXv1Vjd(m&EWcpbM;bM`U{FEX#$+&x50Pfj#_vM3fEiB62mXDY&z9;TOY z?Tv26RA2~t#cSt!lqnSlxm9dPtB4hWH zm8xK?HvNtS3D4=OO&!Bs88A;y z4DU8zu7H~qh-F$UVi>vTJqr%`zTPdrZgCTmjrt+Nous&7O@sN0mr%M+j~m{eU?&S`+7h)iu_cxnn^ibhOr2;g7p?az?rH2a83F{m#ZPH4P zQEQYm!d45FGy-d!4|`O>D(9mf2sE@Sjo29dPX*|P4(Zv@@WT+_@4A@>IO+k;9T+@_ zvGNq>Uec>o^bqsRR20XfIjyVR_Gci-+wsI=75+9H^-PAtLFYo z)ZBmkiIkzgtTPl+oOX;cDBFJrJ@wN}dS^Wymxs^O9n=x<0J?>^zhZ-9Km2E{3bJ9afD{LZoNhjILrYaj8gBF05EXb|8Yt zuvDC^)YMBwMS0kz;?1lOS3)vvMzf)cS}Hz;s1A1-RwV9nCpVk;%>zk%uksFpahKOo z#rZimoLY0WR8;#%9Q(6#BCUmLIA;DX&Dsaq_O5L2hApsGJb${9{|$I8-1Qe+oF+VX za&AfZu5rsRr~G2bkLTyAI!o#4ePjMT?gj2x)Qac7d@d;CIp-=NTa}OkCFG-_kYyVA zF|=!OeuYJ#zDXI8H4U7QE1fLw?IS)C<0k2eaI=elNIQBr+=&{_8Tim+DTX6bbB3a5 zc(M@#MGC%(zULnVAIi~M*Cj}E(fatfh6cr1pSkW}KFOodQ6?NiU?ew9_ie!#eGY ztbqGN6q3nl7eE!|w1dYdeOmWlVnZHL`Or1CBTP2@TU0-Eu@5-Tgx_~NV8Wf~!E?3Zg#b+@f*UGjOV2g91d`{)3>*rb2FE*(! zr|%lJn41o@fc@(CARFIQCP!Qy6L8La0EX^|bS_E1E~H#w@8(DH=MRvUHKMKSJy<-E z(=7<2>$hnZa6)czl3Sz`O#nIW`%qAS=TH$oLHRo@1UgV(MOpYwkJm82z$sbOf$#7z z@A?6swI}sLp=Jx@M1&B*i}VB5x3$1~WJsdCjGWDK48P|CQ2%7sT72QQ6%{~q379CY z0L&18TQ>m30&r6a;7kBwMt{7itI9uNqWlVUI48LhR(VeSUUck+L-_G(oa!&Zj~Fay z^~XzF{nJVhG!Bu9&V|`i|Xr(!Q2 zhg$)?S=`ctg{~lMvLo%&Q*0m~S`5m_W@c|TtN`jr`-QQ7qz+4yM@l;fz!YqMr@m#v z`SOmeZuXeQPkq|EL4Ha1@b=h&4UMr!FOFGF7xuAHKZ>n&aY&nLpLIb;Q=jTbYvMEA zJWZdZP27(b?_ngFcJ$DK{kKotV!!@j@?MTMC)T>L^eK?hr;?IJ+Pl!LcnU`Q7GdRY zop8x6Z{%!@)ecxk+%XAv^lmHiZd*bwgm_C!{Ge5ea$SVQzmB~J-;0zcvxInBX=mt{ z%yDB_`jph&So(Iycvl^NJQvL01{L}F`HNvq90w59=`ViXk6&d6KS%)v>V@|HPpP&) z^vjNUE=B$utpux7tU@Kj+0hoImzY@(z+c z{~VAE-pA1PZrCpVz4`t8`LU1Z$3C1B`)f(}@K^njXR`cLUu$}Y?i>tYtX|h~y{^GWxN>u|4|@$6;JOA57;yZc zL4$H~)0dT%Js>A{FtivrsFx}Vv?M{C(WQ79J+5s0b6;u5ol24oG;qG*G}52D%SED znnt@0Z$Gt`)bnMxbV>DEQ!P+gpfcE0S7p_|&9|JUEC)9%k+;m+b-`eREiXUC$YF0cgo2fU#zwoU zHh?$03a(t<3ri7w$Ie9y7ca!fIoHPPc^6)$22r}{6(Xd+dUI1F7B5=0FI_mNbbiT~ zrGcsrx=Hm=p)Id)J%4l+MrU97X;1HAX7;Oa!z*xY#qnI5Z)bINBi;vV27F(>y)0>5 zj$Lb4R~goDN9_~PR}U94^r{NJ_Bds4@d((j7gxNxwr9Tx-Fa1k)#o*ZFpPq$Ts7#t zc8@X(FW0?JJY6eM0B@_nD{$F=fwh%^Kov%jhL<;0R|i7cPx`hu-DjHDx!UeF4Uh4D zHhfjR0=Zv5$lRaWEPZ>EW2HPC#G92vf%0Oj2RI%L2NtD*F764AcLg}z?eYd;D4H5r zskar|x+v`_e08wFDubk}$+%G}m(z4IfOWjl!MkriD%-*@Fl^kztp7lOw?WgC;lSFk zOJC!@jdlGLa*W{J(Kg>$W6KMw-5RShytPIZK|i5QnIcgC)uuob9Hl9|rXo~jFJFgu zW|Kz0gPOzN4h6zZAx6jgz>11+E#6H`w5A}x)N7~haP2D4fm=W0QFSwYCm2FBsB(MJ zn!xgOv0a0}RKFUphSt`WcPH!lyeSnt z80zaQG5cMFNoaJV?O(jO>^!_R-800GnU6u!Gh|7_icm#W;Cwq}CC`xAYQUj6=i9p6 zGh|Vqvfzrq@U9=SDkcm2NqyI|%;-xNiYaj^f zamK6)gz+ABOipN4+D{;=B8(;8irPv{dw2;rX1pdzFd?UU%j(fB>ABN%h+SM%Y%jw4 zpt6P>%0fZRyurF-?;bVFsvd&A>Gtd4j!@kYtjCH~+fe%v8UUw(my7c)53CDfHZPv2 zeHTj`WsiQw-xZ6>%Pzy5JbU5%a+#9tWuqIH4Y6Pa%uUBwE!zP1jof%Tv)54pa;;s>iSStTmP;jpqYs}5D5 zLFkTk(q&)<%nGi=(QmWX1y0xP%uHX0rMvOR;_*I1uh*RvPUotvZ$hjJ)|34z2tmXO zu3^v3#n;*dS12`SyXtX8z}R~pmR7o6Q+*w$%!W{4wY>(5NYu9GF*ad-4q|0jWfrat z73=DP2nWXF)g+6q!rHrbRnsa{ze4xz^8>k(Hfm8?g;d^%F z*~X@57~*uAD{J&!#~6izscI>nQop9*`~qG>Mi-vbI7G4v76nqYhRxZ*!^%(huMAW( zKt)oIrJ6B*%G4&Z%6bEC(f>N5Z~L0+SJu0i7{=f!&6kZnr;r0a1EI3BJ8rIOs6#~M zbZ)N*a_mTGx2#Qfb$5RcfB%BNQ}6EXz7Bu)s^yRk6IXDp0+mruN#E zb*t)w4POm4hMQKeS-bAKqT=xrCi*5#o-%dZd9+T}7Y0l#LyG&s*156n?r$S~IMLm` z7wJ<->yYldy}SEWq>Jw8?&eLOh|}FY5@&#a#!da{NbBD1?!E@;t4M!<6vvlxWnFif zm9;i6>kB6g&TYvWoJagf{Q1EbUv@}c{nK04nV=y3#*KJfcOWuD^X3j6ak2OKHMz~! zg{MqAXTq6y#!JbYgS7p&?ruCtmAY8>8vMNhc?n`mhUVRzJ=-%Vdm}Vbysrm(A=+1T zm8*QfxT{FO`rpQ1d9u5^mPci#{(!1%Q~ngt555Ea(C&2k*RohnonFOX?w`B6B|y#^ zns-O`tf3=rA5c2fP7Itibj)oz{-H%T5AqM45*fT;=*-a2DJ4UTN`{V^HPi+j=w}Vh z<5$uqgmyg{WWYq;z_TXJ~FTrRVQ$8(5sU8&2t9_MAs z|Nd!rknm=`D9#~(t45yy;@Mrf4(M`oF!;y-g~wx9a%uG}JY&-1(qvZi{kGuE+WSGx z*z8;IRROv1ke6H!;%jKU489DHeep7SU#S^!&&A8g*XB>r{i9p859XArc%od|+logr zd11#Xu+g(q!y|P0e`=WfhyRVMLHA2<-Z$$8-J{bV>2#Y;_v!SYPT$h$hdLdk9pfaO zo~6@CI-RT2%XM0<)73h?S*Q2t^hY|~rqg{oJ*d;Sbo!xA2WiJUNvCJ&bdpZz>hyA* zR_k=NPH)!fJv#l7PPgfFpH2_z^evr!sMA4uAe^MrvvfL1r*n0BxlXHfx>~0<>+~L- z{z#|Wbh=Nc2X*?EPCwM?AU)7d(&uSn8d_1`RJgpUwyvt6w#q6b zgrnjbtFUTaedD@SDh-EJ$!c{=s|QeyvQVI|f)#YGp)O33wfID~a77Rqxz$=&4J?R) zRTbe1t1wVgE;k6uYpTG+K&onaMJQCU&XO7?&svlP2BKHRs@h6qgxB)MMyn9_4|#(l zlj;6Tc+SJ@Q!~F#(HWEW4iBdPjSxKiD08i;zgTBXY8yhG@+i@LoeqF=wW%N08Izvr zCPuz@{fWSRh`H9(H|r6TvYeM$pO|%k!dli0WM~6Z->g?mx>+wIxa?rtO?^FGT62Np z8o<;y>mQSv^$tkn>TSPEfkmK9>2KB#CVfEHGx{4jCcPTvT$>o!tWQkZqU%$C>S^@X zXzN;JXj4<)td~rBr7ke?O?}gTdLir2!XGcQ{z|2Ky~g@nqxG)89yr=WlUZiHYErYV z1BqO{`+psU_K>mC9qkL0;&K-)tq>B}p z`s^+L5tO*=Z`SoqTEjxuW$b29ParcCe`XzR_9JEYDT?0o1~%!lAoIHFoBhg_M_K{<8aQK$f*k*YDk5s#Hki;}7^WV~Ys0E^jdPwPB5a8@&zRji}?6Z}vOQ_mi7t ztu-B^z|{CLeBK7$UEkVbE6Ob@S=Ot*E>(s2d>4P@m1_S3HXyYB0ZFVPm83rHR8D2y iLqhqz=}i5xz355H%bgn8sziOpwETa#k*<0M{r>=rrdQ(t diff --git a/dev-setup.sh b/dev-setup.sh index fd01517..4dd3e6b 100755 --- a/dev-setup.sh +++ b/dev-setup.sh @@ -85,6 +85,10 @@ cgroupsuffix="-cg" ! sudo groupadd $group && echo "Group creation failed." ! sudo echo "@$group cpu,memory %u$cgroupsuffix" >>/etc/cgrules.conf && echo "Cgroup rule creation failed." +# Setting up Sashimono admin group. +admin_group="sashiadmin" +! sudo groupadd $admin_group && echo "Admin group creation failed." + # Build sagent cmake . make \ No newline at end of file diff --git a/examples/message-board/message-board.js b/examples/message-board/message-board.js index e24d2c7..f4eaa3a 100644 --- a/examples/message-board/message-board.js +++ b/examples/message-board/message-board.js @@ -118,7 +118,6 @@ const interatctiveInterface = async () => { sendToAllAgents(JSON.stringify({ id: uuidv4(), type: 'initiate', - owner_pubkey: 'ed5cb83404120ac759609819591ef839b7d222c84f1f08b3012f490586159d2b50', container_name: containerName, peers: peers ? peers.split(',') : [], unl: unl ? unl.split(',') : [], @@ -133,7 +132,6 @@ const interatctiveInterface = async () => { sendToAllAgents(JSON.stringify({ id: uuidv4(), type: 'destroy', - owner_pubkey: 'ed5cb83404120ac759609819591ef839b7d222c84f1f08b3012f490586159d2b50', container_name: containerName })) break; @@ -142,7 +140,6 @@ const interatctiveInterface = async () => { sendToAllAgents(JSON.stringify({ id: uuidv4(), type: 'start', - owner_pubkey: 'ed5cb83404120ac759609819591ef839b7d222c84f1f08b3012f490586159d2b50', container_name: containerName })) break; @@ -151,7 +148,6 @@ const interatctiveInterface = async () => { sendToAllAgents(JSON.stringify({ id: uuidv4(), type: 'stop', - owner_pubkey: 'ed5cb83404120ac759609819591ef839b7d222c84f1f08b3012f490586159d2b50', container_name: containerName })) break; diff --git a/installer/sashimono-install.sh b/installer/sashimono-install.sh index 0e480ca..df614f9 100755 --- a/installer/sashimono-install.sh +++ b/installer/sashimono-install.sh @@ -8,6 +8,7 @@ sashimono_data=/etc/sashimono sashimono_service="sashimono-agent" cgcreate_service="sashimono-cgcreate" group="sashimonousers" +admin_group="sashiadmin" cgroupsuffix="-cg" registryuser="sashidockerreg" registryport=4444 @@ -58,7 +59,7 @@ function rollback() { } # Install Sashimono agent binaries into sashimono bin dir. -cp "$script_dir"/{sagent,hpfs,hpws,user-cgcreate.sh,user-install.sh,user-uninstall.sh} $sashimono_bin +cp "$script_dir"/{sagent,hpfs,user-cgcreate.sh,user-install.sh,user-uninstall.sh} $sashimono_bin chmod -R +x $sashimono_bin # Download and install rootless dockerd. @@ -79,6 +80,9 @@ selfip=$(ip -4 a l ens3 | awk '/inet/ {print $2}' | cut -d/ -f1) ! groupadd $group && echo "Group creation failed." && rollback ! echo "@$group cpu,memory %u$cgroupsuffix" >>/etc/cgrules.conf && echo "Cgroup rule creation failed." && rollback +# Setting up Sashimono admin group. +! groupadd $admin_group && echo "Admin group creation failed." && rollback + # Setup Sashimono data dir. cp -r "$script_dir"/contract_template $sashimono_data $sashimono_bin/sagent new $sashimono_data $selfip $registry_addr diff --git a/installer/sashimono-uninstall.sh b/installer/sashimono-uninstall.sh index 5cb8099..c987fbb 100755 --- a/installer/sashimono-uninstall.sh +++ b/installer/sashimono-uninstall.sh @@ -9,6 +9,7 @@ sashimono_service="sashimono-agent" cgcreate_service="sashimono-cgcreate" registryuser="sashidockerreg" group="sashimonousers" +admin_group="sashiadmin" cgroupsuffix="-cg" quiet=$1 @@ -80,5 +81,7 @@ echo "Deleting cgroup rules..." groupdel $group sed -i -r "/^@$group\s+cpu,memory\s+%u$cgroupsuffix/d" /etc/cgrules.conf +groupdel $admin_group + echo "Sashimono uninstalled successfully." exit 0 diff --git a/src/comm/comm_handler.cpp b/src/comm/comm_handler.cpp index 43105e8..ef2a88f 100644 --- a/src/comm/comm_handler.cpp +++ b/src/comm/comm_handler.cpp @@ -1,19 +1,60 @@ #include "comm_handler.hpp" #include "../util/util.hpp" #include "../conf.hpp" -#include "hpws.hpp" + +#define __HANDLE_RESPONSE(id, type, content, ret) \ + { \ + std::string res; \ + msg_parser.build_response(res, type, id, content, type == msg::MSGTYPE_CREATE_RES && ret == 0); \ + send(res); \ + return ret; \ + } namespace comm { constexpr uint32_t DEFAULT_MAX_MSG_SIZE = 1 * 1024 * 1024; // 1MB; bool init_success; + constexpr const int POLL_TIMEOUT = 10; + constexpr const int BUFFER_SIZE = 1024; + constexpr const int EMPTY_READ_TRESHOLD = 5; + msg::msg_parser msg_parser; comm_ctx ctx; int init() { - ctx.comm_handler_thread = std::thread(comm_handler_loop); + ctx.connection_socket = socket(AF_UNIX, SOCK_SEQPACKET, 0); + if (ctx.connection_socket == -1) + { + LOG_ERROR << errno << ": Error creating the socket."; + return -1; + } + struct sockaddr_un sock_name; + memset(&sock_name, 0, sizeof(struct sockaddr_un)); + sock_name.sun_family = AF_UNIX; + strncpy(sock_name.sun_path, conf::ctx.socket_path.c_str(), sizeof(sock_name.sun_path) - 1); + + // Remove the socket if it already exists. + unlink(conf::ctx.socket_path.c_str()); + + const std::string command = "chown :sashiadmin " + conf::ctx.socket_path; + + char mode[] = "0660"; // rw-rw---- + const mode_t permission_mode = strtol(mode, 0, 8); // Char to octal conversion. + + if (bind(ctx.connection_socket, (const struct sockaddr *)&sock_name, sizeof(struct sockaddr_un)) == -1 || + chmod(conf::ctx.socket_path.c_str(), permission_mode) == -1 || + system(command.data()) == -1 || + listen(ctx.connection_socket, 20) == -1) + { + LOG_ERROR << errno << ": Error binding the socket for " << conf::ctx.socket_path; + close(ctx.connection_socket); + return -1; + } + + msg_parser = msg::msg_parser(); + ctx.comm_handler_thread = std::thread(comm_handler_loop); init_success = true; return 0; @@ -27,48 +68,25 @@ namespace comm if (ctx.comm_handler_thread.joinable()) ctx.comm_handler_thread.join(); + + close(ctx.connection_socket); + unlink(conf::ctx.socket_path.c_str()); } } /** - * Make a connection and session to the given host. + * This accepts connections to the socket. * This only gets called whithin the comm handler thread. - * @param ip_port Ip and port of the host. * @return 0 on success -1 on error. */ - int connect(const conf::host_ip_port &ip_port) + int connect() { - std::string_view host = ip_port.host_address; - const uint16_t port = ip_port.port; - - LOG_DEBUG << "Trying to connect " << host << ":" << std::to_string(port); - - std::variant client_result = hpws::client::connect(conf::ctx.hpws_exe_path, DEFAULT_MAX_MSG_SIZE, host, port, "/", {}, util::fork_detach); - - if (std::holds_alternative(client_result)) + ctx.data_socket = accept(ctx.connection_socket, NULL, NULL); + if (ctx.data_socket == -1) { - const hpws::error error = std::get(client_result); - if (error.first != 202) - LOG_ERROR << "Connection hpws error:" << error.first << " " << error.second; + LOG_ERROR << errno << ": Error accepting the new connection."; return -1; } - else - { - hpws::client client = std::move(std::get(client_result)); - const std::variant host_result = client.host_address(); - if (std::holds_alternative(host_result)) - { - const hpws::error error = std::get(host_result); - LOG_ERROR << "Error getting ip from hpws:" << error.first << " " << error.second; - return -1; - } - else - { - const std::string &host_address = std::get(host_result); - ctx.session.emplace(host_address, std::move(client)); - ctx.session->init(); - } - } return 0; } @@ -78,11 +96,8 @@ namespace comm */ void disconnect() { - if (ctx.session.has_value()) - { - ctx.session->close(); - ctx.session.reset(); - } + close(ctx.data_socket); + ctx.data_socket = -1; } void comm_handler_loop() @@ -90,29 +105,45 @@ namespace comm LOG_INFO << "Message processor started."; util::mask_signal(); + struct pollfd pfd; + int empty_read_count = 0; // Helps to detect when the client is disconnected. while (!ctx.is_shutting_down) { - // Process queued messaged only if there's a session. - if (ctx.session.has_value()) + // Process queued messaged only if there's a socket connection. + if (ctx.data_socket != -1) { - // If no messages were processed in this cycle, wait for some time. - if (ctx.session->process_inbound_msg_queue() <= 0) - util::sleep(10); - - // If session is marked for closure since there's an issue, We disconnect the current session. - // And try to create a new session in the next round - if (ctx.session->state == SESSION_STATE::MUST_CLOSE) - { - LOG_DEBUG << "Closing the session due to a failure: " << ctx.session->display_name(); + std::string message; + const int ret = read_socket(message); + if (ret == -1) disconnect(); + else if (ret > 0) + handle_message(message); + else + { + empty_read_count++; + // Empty reads happens when client closed the connection. + // Disconnect connection after 5 consecutive empty reads. + if (empty_read_count == EMPTY_READ_TRESHOLD) + { + disconnect(); + empty_read_count = 0; + } util::sleep(1000); } } else { - // If host connection failed wait for some time. - if (connect(conf::cfg.server.ip_port) == -1) + pfd.fd = ctx.connection_socket; + pfd.events = POLLIN; + + // Wait for some time if no connections are available. + if (poll(&pfd, 1, POLL_TIMEOUT) > 0) + { + connect(); + empty_read_count = 0; + } + else util::sleep(1000); } } @@ -130,4 +161,113 @@ namespace comm { ctx.comm_handler_thread.join(); } + + /** + * Handles the received message. + * @param msg Received message. + * @return 0 on success -1 on error. + */ + int handle_message(std::string_view msg) + { + std::string id, type; + if (msg_parser.parse(msg) == -1 || msg_parser.extract_type_and_id(type, id) == -1) + __HANDLE_RESPONSE("", "error", "format_error", -1); + + if (type == msg::MSGTYPE_CREATE) + { + msg::create_msg msg; + if (msg_parser.extract_create_message(msg) == -1) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_CREATE_RES, "format_error", -1); + + hp::instance_info info; + if (hp::create_new_instance(info, msg.pubkey, msg.contract_id, msg.image) == -1) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_CREATE_RES, "create_error", -1); + + std::string create_res; + msg_parser.build_create_response(create_res, info); + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_CREATE_RES, create_res, 0); + } + else if (type == msg::MSGTYPE_INITIATE) + { + msg::initiate_msg msg; + if (msg_parser.extract_initiate_message(msg) == -1) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_INITIATE_RES, "format_error", -1); + + if (hp::initiate_instance(msg.container_name, msg) == -1) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_INITIATE_RES, "init_error", -1); + + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_INITIATE_RES, "initiated", 0); + } + else if (type == msg::MSGTYPE_DESTROY) + { + msg::destroy_msg msg; + if (msg_parser.extract_destroy_message(msg)) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_DESTROY_RES, "format_error", -1); + + if (hp::destroy_container(msg.container_name) == -1) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_DESTROY_RES, "destroy_error", -1); + + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_DESTROY_RES, "destroyed", 0); + } + else if (type == msg::MSGTYPE_START) + { + msg::start_msg msg; + if (msg_parser.extract_start_message(msg)) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_START_RES, "format_error", -1); + + if (hp::start_container(msg.container_name) == -1) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_START_RES, "start_error", -1); + + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_START_RES, "started", 0); + } + else if (type == msg::MSGTYPE_STOP) + { + msg::stop_msg msg; + if (msg_parser.extract_stop_message(msg)) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_STOP_RES, "format_error", -1); + + if (hp::stop_container(msg.container_name) == -1) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_STOP_RES, "stop_error", -1); + + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_STOP_RES, "stopped", 0); + } + else + __HANDLE_RESPONSE(id, "error", "type_error", -1); + + return 0; + } + + /** + * Sends the given message to the connected client. + * @param message Message to send. + * @return 0 on success -1 on error. + **/ + int send(std::string_view message) + { + if (ctx.data_socket == -1) + return -1; + + const int ret = write(ctx.data_socket, message.data(), message.length() + 1); + // Close connection after sending the response to the client. + disconnect(); + return ret == -1 ? -1 : 0; + } + + /** + * Reads the message from the connected client. + * @param message Placeholder to store the message. + * @return Number of bytes read on success -1 on error. + **/ + int read_socket(std::string &message) + { + char buffer[BUFFER_SIZE]; + const int ret = read(ctx.data_socket, buffer, BUFFER_SIZE); + if (ret == -1) + { + LOG_ERROR << errno << ": Error receiving data."; + return -1; + } + message = std::string(buffer); + return ret; + } } // namespace comm diff --git a/src/comm/comm_handler.hpp b/src/comm/comm_handler.hpp index 7f6b081..6ec2243 100644 --- a/src/comm/comm_handler.hpp +++ b/src/comm/comm_handler.hpp @@ -2,15 +2,16 @@ #define _SA_COMM_COMM_SERVER_ #include "../pchheader.hpp" -#include "comm_session.hpp" +#include "../msg/msg_parser.hpp" namespace comm { struct comm_ctx { - std::optional session; bool is_shutting_down = false; std::thread comm_handler_thread; // Incoming message processor thread. + int connection_socket = -1; + int data_socket = -1; }; extern comm_ctx ctx; @@ -19,14 +20,20 @@ namespace comm void deinit(); - int connect(const conf::host_ip_port &ip_port); + int connect(); void disconnect(); void comm_handler_loop(); + int handle_message(std::string_view msg); + + int send(std::string_view message); + void wait(); + int read_socket(std::string &message); + } // namespace comm #endif diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp deleted file mode 100644 index 607a0cb..0000000 --- a/src/comm/comm_session.cpp +++ /dev/null @@ -1,302 +0,0 @@ -#include "comm_session.hpp" -#include "../util/util.hpp" -#include "../hp_manager.hpp" - -#define __HANDLE_RESPONSE(id, type, content, ret) \ - { \ - std::string res; \ - msg_parser.build_response(res, type, id, content, type == msg::MSGTYPE_CREATE_RES && ret == 0); \ - send(res); \ - return ret; \ - } - -namespace comm -{ - constexpr uint16_t MAX_IN_MSG_QUEUE_SIZE = 64; // Maximum in message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31).... - - comm_session::comm_session( - std::string_view host_address, hpws::client &&hpws_client) - : uniqueid(host_address), - host_address(host_address), - hpws_client(std::move(hpws_client)), - msg_parser(msg::msg_parser()), - in_msg_queue(MAX_IN_MSG_QUEUE_SIZE) - { - } - - /** - * Init() should be called to activate the session. - * Because we are starting threads here, after init() is called, the session object must not be "std::moved". - * @return returns 0 on successful init, otherwise -1; - */ - int comm_session::init() - { - if (state == SESSION_STATE::NONE) - { - reader_thread = std::thread(&comm_session::reader_loop, this); - writer_thread = std::thread(&comm_session::outbound_msg_queue_processor, this); - state = SESSION_STATE::ACTIVE; - - // Send an initial message to the host. - std::string res; - msg_parser.build_response(res, msg::MSGTYPE_INIT, {}, "Connection initiated."); - send(res); - LOG_DEBUG << "Session started: " << uniqueid; - } - - return 0; - } - - /** - * Listening for receiving messages and process them. - */ - void comm_session::reader_loop() - { - util::mask_signal(); - - while (state != SESSION_STATE::CLOSED && hpws_client) - { - // If reading from the hpws_client failed we'll mark this session to closure. - bool should_disconnect = false; - - const std::variant read_result = hpws_client->read(); - if (std::holds_alternative(read_result)) - { - should_disconnect = true; - const hpws::error error = std::get(read_result); - if (error.first != 1) // 1 indicates channel has closed. - LOG_DEBUG << "hpws client read failed:" << error.first << " " << error.second; - } - else - { - // Enqueue the message for processing. - std::string_view data = std::get(read_result); - in_msg_queue.try_enqueue(std::string(data)); - - // Signal the hpws client that we are ready for next message. - const std::optional error = hpws_client->ack(data); - if (error.has_value()) - { - should_disconnect = true; - LOG_DEBUG << "hpws client ack failed:" << error->first << " " << error->second; - } - } - - if (should_disconnect) - { - // Here we mark the session as needing to close. - // The session will be properly "closed" and cleared from comm_handler. - // Then comm_handler will try to initiate a new session with the host. - mark_for_closure(); - break; - } - } - } - - /** - * Processes the unprocessed queued inbound messages (if any). - * @return 0 if no messages in queue. 1 if messages were processed. -1 error occured - */ - int comm_session::process_inbound_msg_queue() - { - if (state == SESSION_STATE::CLOSED) - return -1; - - bool messages_processed = false; - std::string msg_to_process; - - // Process all messages in queue. - while (in_msg_queue.try_dequeue(msg_to_process)) - { - handle_message(msg_to_process); - msg_to_process.clear(); - messages_processed = true; - } - - return messages_processed ? 1 : 0; - } - - /** - * This function constructs and sends the message to the target from the given message. - * @param message Message to be sent via the pipe. - * @return 0 on successful message sent and -1 on error. - */ - int comm_session::process_outbound_message(std::string_view message) - { - if (state == SESSION_STATE::CLOSED || !hpws_client) - return -1; - - const std::optional error = hpws_client->write(message); - if (error.has_value()) - { - LOG_ERROR << "hpws client write failed:" << error->first << " " << error->second; - return -1; - } - return 0; - } - - /** - * Loop to keep processing outbound messages in the queue. - */ - void comm_session::outbound_msg_queue_processor() - { - // Appling a signal mask to prevent receiving control signals from linux kernel. - util::mask_signal(); - - // Keep checking until the session is terminated. - while (state != SESSION_STATE::CLOSED) - { - bool messages_sent = false; - std::string msg_to_send; - - // Send all messages in queue. - while (out_msg_queue.try_dequeue(msg_to_send)) - { - process_outbound_message(msg_to_send); - msg_to_send.clear(); - messages_sent = true; - } - - // Wait for small delay if there were no outbound messages. - if (!messages_sent) - util::sleep(10); - } - } - - /** - * Handles the received message. - * @param msg Received message. - * @return 0 on success -1 on error. - */ - int comm_session::handle_message(std::string_view msg) - { - std::string id, type; - if (msg_parser.parse(msg) == -1 || msg_parser.extract_type_and_id(type, id) == -1) - __HANDLE_RESPONSE("", "error", "format_error", -1); - - if (type == msg::MSGTYPE_CREATE) - { - msg::create_msg msg; - if (msg_parser.extract_create_message(msg) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_CREATE_RES, "format_error", -1); - - hp::instance_info info; - if (hp::create_new_instance(info, msg.pubkey, msg.contract_id, msg.image) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_CREATE_RES, "create_error", -1); - - std::string create_res; - msg_parser.build_create_response(create_res, info); - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_CREATE_RES, create_res, 0); - } - else if (type == msg::MSGTYPE_INITIATE) - { - msg::initiate_msg msg; - if (msg_parser.extract_initiate_message(msg) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_INITIATE_RES, "format_error", -1); - - if (hp::initiate_instance(msg.container_name, msg) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_INITIATE_RES, "init_error", -1); - - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_INITIATE_RES, "initiated", 0); - } - else if (type == msg::MSGTYPE_DESTROY) - { - msg::destroy_msg msg; - if (msg_parser.extract_destroy_message(msg)) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_DESTROY_RES, "format_error", -1); - - if (hp::destroy_container(msg.container_name) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_DESTROY_RES, "destroy_error", -1); - - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_DESTROY_RES, "destroyed", 0); - } - else if (type == msg::MSGTYPE_START) - { - msg::start_msg msg; - if (msg_parser.extract_start_message(msg)) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_START_RES, "format_error", -1); - - if (hp::start_container(msg.container_name) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_START_RES, "start_error", -1); - - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_START_RES, "started", 0); - } - else if (type == msg::MSGTYPE_STOP) - { - msg::stop_msg msg; - if (msg_parser.extract_stop_message(msg)) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_STOP_RES, "format_error", -1); - - if (hp::stop_container(msg.container_name) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_STOP_RES, "stop_error", -1); - - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_STOP_RES, "stopped", 0); - } - else - __HANDLE_RESPONSE(id, "error", "type_error", -1); - - return 0; - } - - /** - * Adds the given message to the outbound message queue. - * @param message Message to be added to the outbound queue. - * @return 0 on successful addition and -1 if the session is already closed. - */ - int comm_session::send(std::string_view message) - { - if (state == SESSION_STATE::CLOSED) - return -1; - - // Passing the ownership of message to the queue. - out_msg_queue.enqueue(std::string(message)); - - return 0; - } - - /** - * Mark the session as needing to close. - * The session will be properly "closed" by comm_handler. - */ - void comm_session::mark_for_closure() - { - if (state == SESSION_STATE::CLOSED) - return; - - state = SESSION_STATE::MUST_CLOSE; - } - - /** - * Close the connection and wrap up any session processing threads. - * This will be only called by the global comm_handler. - */ - void comm_session::close() - { - if (state == SESSION_STATE::CLOSED) - return; - - state = SESSION_STATE::CLOSED; - - // Destruct the hpws client instance so it will close the sockets and related processes. - hpws_client.reset(); - - // Wait untill reader/writer threads gracefully stop. - if (writer_thread.joinable()) - writer_thread.join(); - - if (reader_thread.joinable()) - reader_thread.join(); - - LOG_DEBUG << "Session closed: " << uniqueid; - } - - /** - * Returns printable name for the session based on uniqueid (used for logging). - * @return The display name as a string. - */ - const std::string comm_session::display_name() const - { - return uniqueid; - } - -} // namespace comm \ No newline at end of file diff --git a/src/comm/comm_session.hpp b/src/comm/comm_session.hpp deleted file mode 100644 index 8d7c7af..0000000 --- a/src/comm/comm_session.hpp +++ /dev/null @@ -1,54 +0,0 @@ -#ifndef _HP_COMM_COMM_SESSION_ -#define _HP_COMM_COMM_SESSION_ - -#include "../pchheader.hpp" -#include "../conf.hpp" -#include "hpws.hpp" -#include "../msg/msg_parser.hpp" - -namespace comm -{ - enum SESSION_STATE - { - NONE, // Session is not yet initialized properly. - ACTIVE, // Session is active and functioning. - MUST_CLOSE, // Session socket is in unusable state and must be closed. - CLOSED // Session is fully closed. - }; - - /** - * Represents an active WebSocket connection - */ - class comm_session - { - private: - std::optional hpws_client; - msg::msg_parser msg_parser; // Message parser. - const std::string uniqueid; // IP address. - const std::string host_address; // Connection host address of the remote party. - - std::thread reader_thread; // The thread responsible for reading messages from the read fd. - std::thread writer_thread; // The thread responsible for writing messages to the write fd. - moodycamel::ReaderWriterQueue in_msg_queue; // Holds incoming messages waiting to be processed. - moodycamel::ConcurrentQueue out_msg_queue; // Holds outgoing messages waiting to be processed. - - void reader_loop(); - int handle_message(std::string_view msg); - int process_outbound_message(std::string_view message); - void outbound_msg_queue_processor(); - void mark_for_closure(); - - public: - SESSION_STATE state = SESSION_STATE::NONE; - comm_session( - std::string_view host_address, hpws::client &&hpws_client); - int init(); - int send(std::string_view message); - int process_inbound_msg_queue(); - void close(); - const std::string display_name() const; - }; - -} // namespace comm - -#endif diff --git a/src/comm/hpws.hpp b/src/comm/hpws.hpp deleted file mode 100644 index 737f538..0000000 --- a/src/comm/hpws.hpp +++ /dev/null @@ -1,995 +0,0 @@ -#ifndef HPWS_INCLUDE -#define HPWS_INCLUDE -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define DECODE_O_SIZE(control_msg, into) \ - { \ - into = ((uint32_t)control_msg[2] << 24) + ((uint32_t)control_msg[3] << 16) + \ - ((uint32_t)control_msg[4] << 8) + ((uint32_t)control_msg[5] << 0); \ - } - -#define ENCODE_O_SIZE(control_msg, from) \ - { \ - uint32_t f = from; \ - control_msg[2] = (unsigned char)((f >> 24) & 0xff); \ - control_msg[3] = (unsigned char)((f >> 16) & 0xff); \ - control_msg[4] = (unsigned char)((f >> 8) & 0xff); \ - control_msg[5] = (unsigned char)((f >> 0) & 0xff); \ - } - -#define HPWS_DEBUG 0 - -namespace hpws -{ - /*typedef enum e_retcode { - SUCCESS - } retcode; - */ - using error = std::pair; - -// used when waiting for messages that should already be on the pipe -#define HPWS_SMALL_TIMEOUT 10 -// used when waiting for server process to spawn -#define HPWS_LONG_TIMEOUT 1500 // This timeout has to account the possible delays in communication via internet. - - typedef union - { - struct sockaddr sa; - struct sockaddr_in sin; - struct sockaddr_in6 sin6; - struct sockaddr_storage ss; - } addr_t; - - class server; - - class client - { - - private: - pid_t child_pid = 0; // if this client was created by a connect this is set - // this value can't be changed once it's established between the processes - uint32_t max_buffer_size; - bool moved = false; - addr_t endpoint; - std::string get; // the get req this websocket was opened with - int control_line_fd[2]; // see below in client constructor - int buffer_fd[4]; // 0 1 - in buffers, 2 3 - out buffers - int buffer_lock[2] = {0, 0}; // this records if buffers 2 and 3 have been sent out awaiting an ack or not - void *buffer[4]; - - // private constructor - client( - std::string_view get, - addr_t endpoint, - int control_line_fd_0, // hpws -> sagent [ hpws sends bufs to us over this line ] - int control_line_fd_1, // sagent -> hpws [ we send bufs to hpws over this line ] - uint32_t max_buffer_size, - pid_t child_pid, - int buffer_fd[4], - void *buffer[4]) : endpoint(endpoint), - max_buffer_size(max_buffer_size), - child_pid(child_pid), get(get) - { - control_line_fd[0] = control_line_fd_0; - control_line_fd[1] = control_line_fd_1; - for (int i = 0; i < 4; ++i) - { - this->buffer[i] = buffer[i]; - this->buffer_fd[i] = buffer_fd[i]; - } - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] child constructed pid = %d\n", child_pid); - } - - public: - // No copy constructor - client(const client &) = delete; - - // only a move constructor - client(client &&old) : child_pid(old.child_pid), - max_buffer_size(old.max_buffer_size), - endpoint(old.endpoint), - get(old.get) - { - old.moved = true; - for (int i = 0; i <= 1; ++i) - { - this->control_line_fd[i] = old.control_line_fd[i]; - buffer_lock[i] = old.buffer_lock[i]; - } - for (int i = 0; i < 4; ++i) - { - this->buffer[i] = old.buffer[i]; - this->buffer_fd[i] = old.buffer_fd[i]; - } - } - - ~client() - { - if (!moved) - { - - // RH TODO ensure this pid terminates by following up with a SIGKILL - if (child_pid > 0) - { - kill(child_pid, SIGTERM); - int status; - waitpid(child_pid, &status, 0 /* should we use WNOHANG? */); - } - - for (int i = 0; i < 4; ++i) - { - munmap(buffer[i], max_buffer_size); - ::close(buffer_fd[i]); - } - - ::close(control_line_fd[0]); - ::close(control_line_fd[1]); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] child destructed pid = %d\n", child_pid); - } - } - - const std::variant host_address() - { - char hostname[NI_MAXHOST]; - const int ret = getnameinfo((sockaddr *)&endpoint, sizeof(sockaddr), hostname, sizeof(hostname), NULL, 0, NI_NUMERICHOST); - if (ret != 0) - return error{10, gai_strerror(ret)}; - - return hostname; - } - - std::variant read() - { - - unsigned char buf[32]; - int bytes_read = recv(control_line_fd[0], buf, sizeof(buf), 0); - if (bytes_read < 1) - { - if (HPWS_DEBUG) - { - perror("recv"); - fprintf(stderr, "[HPWS.HPP] bytes received %d\n", bytes_read); - } - return error{1, "[read] control line could not be read"}; // todo clean up somehow? - } - - switch (buf[0]) - { - case 'o': - { - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] o message received\n"); - - if (bytes_read != 6) - return error{3, "invalid buffer in 'o' command sent by hpws"}; - - uint32_t len = 0; - DECODE_O_SIZE(buf, len); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] o message len: %u\n", len); - - int bufno = buf[1] - '0'; - if (bufno != 0 && bufno != 1) - return error{3, "invalid buffer in 'o' command sent by hpws"}; - - if (HPWS_DEBUG) - { - fprintf(stderr, "[HPWS.HPP] read %d\n", len); - for (uint32_t i = 0; i < len; ++i) - putc(((char *)(buffer[bufno]))[i], stderr); - fprintf(stderr, "\n---\n"); - } - return std::string_view{(const char *)(buffer[bufno]), len}; - } - case 'c': - { - return error{1000, "ws closed"}; - } - default: - { - fprintf(stderr, "[HPWS.HPP] read unknown control message 1: `%.*s`\n", bytes_read, buf); - return error{2, "unknown control line command was sent by hpws"}; - } - } - } - - void close() - { - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] close called\n"); - - // send the control message informing hpws that we wish to close - char buf[1] = {'c'}; - - ::write(control_line_fd[1], buf, 1); - - // wait for the process to end gracefully - int status; - printf("waitpid result: %d\n", waitpid(child_pid, &status, 0)); // add timeout here? - } - - std::optional write(std::string_view to_write) - { - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] write called for message: `%.*s`\n", - (int)to_write.size(), to_write.data()); - - // check if we have any free buffers - if (buffer_lock[0] && buffer_lock[1]) - { - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] no free buffers while writing, waiting for an ack on control_line_fd[1]=%d\n", control_line_fd[1]); - - // no free buffers, wait for a ack - unsigned char buf[32]; - int bytes_read = 0; - - bytes_read = recv(control_line_fd[1], buf, sizeof(buf), 0); - if (bytes_read < 1) - { - perror("recv"); - return error{1, "[write] control line could not be read"}; // todo clean up somehow? - } - - switch (buf[0]) - { - case 'a': - { - if (bytes_read != 2) - return error{4, "received an ack longer than 2 bytes"}; - int bufno = buf[1] - '0'; - if (!(bufno == 0 || bufno == 1)) - return error{5, "received an ack with an invalid buffer, expecting 0 or 1"}; - // unlock the buffer - buffer_lock[bufno] = 0; - break; - } - case 'c': - return error{1000, "ws closed"}; - default: - fprintf(stderr, "[HPWS.HPP] read unknown control message 2: `%.*s`\n", bytes_read, buf); - return error{2, "unknown control line command was sent by hpws"}; - } - } - - // execution to here ensures at least one buffer is free - int bufno = (buffer_lock[0] == 0 ? 2 : 3); - - // update the selected buffer lock - buffer_lock[bufno - 2] = 1; - - // write into the buffer - memcpy(buffer[bufno], to_write.data(), to_write.size()); - - // send the control message informing hpws that a message is ready on this buffer - uint32_t len = to_write.size(); - char buf[6] = {'o', (char)('0' + (bufno - 2)), 0, 0, 0, 0}; - ENCODE_O_SIZE(buf, len); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] writing 'o' to control_line_fd[1]=%d\n", - control_line_fd[1]); - - if (::write(control_line_fd[1], buf, 6) != 6) - return error{6, "could not write o message to control line"}; - - return std::nullopt; - } - - std::optional ack(std::string_view from_read) - { - char msg[2] = {'a', '0'}; - if (from_read.data() == buffer[1]) - msg[1]++; - if (send(control_line_fd[0], msg, 2, 0) < 2) - return error{10, "could not send ack down control line"}; - return std::nullopt; - } - - static std::variant connect( - std::string_view bin_path, - uint32_t max_buffer_size, - std::string_view host, - uint16_t port, - std::string_view get, - std::vector argv, - std::function fork_child_init = NULL) - { - -#define HPWS_CONNECT_ERROR(code, msg) \ - { \ - error_code = code; \ - error_msg = msg; \ - goto connect_error; \ - } - - int error_code = -1; - const char *error_msg = NULL; - int fd[4] = {-1, -1, -1, -1}; // 0,1 are hpws->sagent, 2,3 are sagent->hpws - int buffer_fd[4] = {-1, -1, -1, -1}; - void *mapping[4] = {NULL, NULL, NULL, NULL}; - int pid = -1; - int count_args = 14 + argv.size(); - char const **argv_pass = NULL; - - if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, fd)) - HPWS_CONNECT_ERROR(100, "could not create unix domain socket pair"); - - if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, fd + 2)) - HPWS_CONNECT_ERROR(101, "could not create unix domain socket pair"); - - // construct the arguments - char shm_size[32]; - - if (snprintf(shm_size, 32, "%d", max_buffer_size) <= 0) - HPWS_CONNECT_ERROR(90, "couldn't write shm size to string"); - - char port_str[6]; - if (snprintf(port_str, 6, "%d", port) <= 0) - HPWS_CONNECT_ERROR(91, "couldn't write port to string"); - - char cfd1[10]; - char cfd2[10]; - - snprintf(cfd1, 10, "%d", fd[1]); - snprintf(cfd2, 10, "%d", fd[3]); - - argv_pass = - reinterpret_cast(alloca(sizeof(char *) * count_args)); - { - int upto = 0; - argv_pass[upto++] = bin_path.data(); - argv_pass[upto++] = "--client"; - argv_pass[upto++] = "--maxmsg"; - argv_pass[upto++] = shm_size; - argv_pass[upto++] = "--host"; - argv_pass[upto++] = host.data(); - argv_pass[upto++] = "--port"; - argv_pass[upto++] = port_str; - argv_pass[upto++] = "--cntlfd"; - argv_pass[upto++] = cfd1; - argv_pass[upto++] = "--cntlfd2"; - argv_pass[upto++] = cfd2; - argv_pass[upto++] = "--get"; - argv_pass[upto++] = get.data(); - for (std::string_view &arg : argv) - argv_pass[upto++] = arg.data(); - argv_pass[upto] = NULL; - } - - pid = vfork(); - - if (pid) - { - - // --- PARENT - - // Fds are set to -1, so when error occurred these fds won't get closed again. - ::close(fd[1]); - fd[1] = -1; - ::close(fd[3]); - fd[3] = -1; - - int child_fd[2] = {fd[0], fd[2]}; - - int flags[2] = { - fcntl(child_fd[0], F_GETFD, NULL), - fcntl(child_fd[1], F_GETFD, NULL)}; - - if (flags[0] < 0 || flags[1] < 0) - HPWS_CONNECT_ERROR(101, "could not get flags from unix domain socket"); - - flags[0] |= FD_CLOEXEC; - flags[1] |= FD_CLOEXEC; - if (fcntl(child_fd[0], F_SETFD, flags[0]) || fcntl(child_fd[1], F_SETFD, flags[1])) - HPWS_CONNECT_ERROR(102, "could notset flags for unix domain socket"); - - // we will set a timeout and wait for the initial startup message from hpws client mode - struct pollfd pfd; - int ret; - - pfd.fd = child_fd[0]; // we receive setup events on control line 0 (hpws->sagent) - pfd.events = POLLIN; - ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout - - // timeout or error - if (ret < 1) - HPWS_CONNECT_ERROR(1, "timeout waiting for hpws connect message"); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] waiting for addr_t\n"); - // first thing we'll receive is the sockaddr union - addr_t child_addr; - - int bytes_read = - recv(child_fd[0], (unsigned char *)(&child_addr), sizeof(child_addr), 0); - - if (bytes_read < sizeof(child_addr)) - HPWS_CONNECT_ERROR(202, "received message on control line was not sizeof(addr_t)"); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] waiting for buffer fds\n"); - - // second thing we will receive is the four fds for the buffers - { - struct msghdr child_msg = {0}; - memset(&child_msg, 0, sizeof(child_msg)); - char cmsgbuf[CMSG_SPACE(sizeof(int) * 4)]; - child_msg.msg_control = cmsgbuf; - child_msg.msg_controllen = sizeof(cmsgbuf); - - int bytes_read = - recvmsg(child_fd[0], &child_msg, 0); - struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg); - if (cmsg == NULL || cmsg->cmsg_type != SCM_RIGHTS) - HPWS_CONNECT_ERROR(203, "non-scm_rights message sent on accept child control line"); - memcpy(&buffer_fd, CMSG_DATA(cmsg), sizeof(buffer_fd)); - for (int i = 0; i < 4; ++i) - { - //fprintf(stderr, "scm passed buffer_fd[%d] = %d\n", i, buffer_fd[i]); - if (buffer_fd[i] < 0) - HPWS_CONNECT_ERROR(203, "child accept scm_rights a passed buffer fd was negative"); - mapping[i] = - mmap(0, max_buffer_size, PROT_READ | PROT_WRITE, MAP_SHARED, buffer_fd[i], 0); - if (mapping[i] == (void *)(-1)) - HPWS_CONNECT_ERROR(204, "could not mmap scm_rights passed buffer fd"); - } - } - - for (int i = 0; i <= 1; ++i) - { - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] waiting for 'r' on child_fd[%d]=%d\n", i, child_fd[i]); - - pfd.fd = child_fd[i]; - // now we wait for a 'r' ready message or for the socket/client to die - ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout - - char rbuf[2]; - bytes_read = recv(child_fd[i], rbuf, sizeof(rbuf), 0); - if (bytes_read < 1) - HPWS_CONNECT_ERROR(2, "nil message sent by hpws on startup"); - - if (rbuf[0] != 'r') - HPWS_CONNECT_ERROR(3, "unexpected content in message sent by hpws client mode on startup"); - - if (rbuf[1] != '0' + i) - HPWS_CONNECT_ERROR(4, "received wrong r message on control fd"); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] received 'r%c' on child_fd[%d]=%d\n", rbuf[1], i, child_fd[i]); - } - - return client{ - get, - child_addr, - child_fd[0], - child_fd[1], - max_buffer_size, - pid, - buffer_fd, - mapping}; - } - else - { - - // --- CHILD - if (fork_child_init) - fork_child_init(); - - ::close(fd[0]); - ::close(fd[2]); - - // dup fd[1] into fd 3 - /*if (dup2(fd[1], 3) == -1) - perror("dup2 fd[1]"); - if (dup2(fd[3], 4) == -1) - perror("dup2 fd[3]"); - */ - // ::close(fd[1]); - // ::close(fd[3]); - - // we're assuming all fds above 3 will have close_exec flag - execv(bin_path.data(), (char *const *)argv_pass); - // we will send a nil message down the pipe to help the parent know somethings gone wrong - char nil[1]; - nil[0] = 0; - send(3, nil, 1, 0); - exit(1); // execl failure as child will always result in exit here - } - - connect_error:; - - // NB: execution to here can only happen in parent process - // clean up any mess after error - if (pid > 0) - { - kill((pid_t)pid, SIGKILL); /* RH TODO change this to SIGTERM and set a timeout? */ - int status; - waitpid(pid, &status, 0 /* should we use WNOHANG? */); - } - for (int i = 0; i < 4; ++i) - { - if (fd[i] > 0) - ::close(fd[i]); - if (mapping[i] != MAP_FAILED && mapping[i] != NULL) - munmap(mapping[i], max_buffer_size); - if (buffer_fd[i] > -1) - ::close(buffer_fd[i]); - } - - return error{error_code, std::string{error_msg}}; - } - friend class server; - }; - - class server - { - - private: - pid_t server_pid_; - int master_control_fd_; - uint32_t max_buffer_size_; - bool moved = false; - - // private constructor - server(pid_t server_pid, int master_control_fd, uint32_t max_buffer_size) - : server_pid_(server_pid), master_control_fd_(master_control_fd), max_buffer_size_(max_buffer_size) {} - - void accept_cleanup(void *mapping[4], int child_fd[2], int buffer_fd[4], uint32_t pid_child) - { - for (int i = 0; i < 4; i++) - { - if (mapping[i] != MAP_FAILED && mapping[i] != NULL) - munmap(mapping[i], max_buffer_size_); - if (i < 2 && child_fd[i] > -1) - ::close(child_fd[i]); - if (buffer_fd[i] > -1) - ::close(buffer_fd[i]); - } - - if (pid_child > 0) - { - int ret1 = kill(pid_child, SIGTERM); - int wstat; - int ret2 = waitpid(pid_child, &wstat, 0); - } - } - - public: - // No copy constructor - server(const server &) = delete; - - // only a move constructor - server(server &&old) : server_pid_(old.server_pid_), - master_control_fd_(old.master_control_fd_), - max_buffer_size_(old.max_buffer_size_) - { - old.moved = true; - } - - pid_t server_pid() - { - return server_pid_; - } - - int master_control_fd() - { - return master_control_fd_; - } - - uint32_t max_buffer_size() - { - return max_buffer_size_; - } - - std::variant accept(const bool no_block = false) - { - - static int calls = 0; - ++calls; - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[0] called %d\n", calls); - -#define HPWS_ACCEPT_ERROR(code, msg) \ - { \ - accept_cleanup(mapping, child_fd, buffer_fd, pid); \ - return error{code, msg}; \ - } - - int child_fd[2] = {-1, -1}; - int buffer_fd[4] = {-1, -1, -1, -1}; - void *mapping[4] = {NULL, NULL, NULL, NULL}; - // must not use pid_t here since we transfer across IPC channel as a uint32. - uint32_t pid = 0; - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[1] called %d\n", calls); - { - struct msghdr child_msg = {0}; - memset(&child_msg, 0, sizeof(child_msg)); - char cmsgbuf[CMSG_SPACE(sizeof(int) * 2)]; - child_msg.msg_control = cmsgbuf; - child_msg.msg_controllen = sizeof(cmsgbuf); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[2] called %d\n", calls); - - // If no-block is specified, we first check any bytes available on control fd - // before attempting to do a blocking a read. - if (no_block) - { - struct pollfd master_pfd; - master_pfd.fd = this->master_control_fd_; - master_pfd.events = POLLERR | POLLHUP | POLLNVAL | POLLIN; - const int master_poll_result = poll(&master_pfd, 1, HPWS_SMALL_TIMEOUT); - - if (master_poll_result == -1) // 1 ms timeout - HPWS_ACCEPT_ERROR(200, "poll failed on master control line"); - - if (master_poll_result == 0) // No data available - HPWS_ACCEPT_ERROR(199, "no new client available"); - } - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[3] called %d\n", calls); - - int bytes_read = - recvmsg(this->master_control_fd_, &child_msg, 0); - struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg); - if (cmsg == NULL || cmsg->cmsg_type != SCM_RIGHTS) - HPWS_ACCEPT_ERROR(200, "non-scm_rights message sent on master control line"); - memcpy(&child_fd, CMSG_DATA(cmsg), sizeof(child_fd)); - if (child_fd[0] < 0 || child_fd[1] < 0) - HPWS_ACCEPT_ERROR(201, "scm_rights passed fd/s were negative"); - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] On accept received SCM: child_fd[0] = %d, child_fd[1] = %d\n", - child_fd[0], child_fd[1]); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[4] called %d\n", calls); - } - - // read info from child control line with a timeout - struct pollfd pfd; - int ret; - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[5] called %d\n", calls); - - pfd.fd = child_fd[0]; // expect all setup messages on the hpws->sagent controlfd (0) - pfd.events = POLLIN; - ret = poll(&pfd, 1, HPWS_SMALL_TIMEOUT); // 1 ms timeout - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[6] called %d\n", calls); - - // timeout or error - if (ret < 1) - HPWS_ACCEPT_ERROR(202, "timeout waiting for hpws accept child message"); - - // first thing we'll receive is the pid of the client - if (recv(child_fd[0], (unsigned char *)(&pid), sizeof(pid), 0) < sizeof(pid)) - HPWS_ACCEPT_ERROR(212, "did not receive expected 4 byte pid of child process on accept"); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[7] called %d\n", calls); - - // second thing we'll receive is IP address structure of the client - addr_t buf; - int bytes_read = - recv(child_fd[0], (unsigned char *)(&buf), sizeof(buf), 0); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[8] called %d\n", calls); - - if (bytes_read < sizeof(buf)) - HPWS_ACCEPT_ERROR(202, "received message on master control line was not sizeof(sockaddr_in6)"); - - // third thing we will receive is the four fds for the buffers - { - struct msghdr child_msg = {0}; - memset(&child_msg, 0, sizeof(child_msg)); - char cmsgbuf[CMSG_SPACE(sizeof(int) * 4)]; - child_msg.msg_control = cmsgbuf; - child_msg.msg_controllen = sizeof(cmsgbuf); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[9] called %d\n", calls); - - int bytes_read = - recvmsg(child_fd[0], &child_msg, 0); - struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg); - if (cmsg == NULL || cmsg->cmsg_type != SCM_RIGHTS) - HPWS_ACCEPT_ERROR(203, "non-scm_rights message sent on accept child control line"); - memcpy(&buffer_fd, CMSG_DATA(cmsg), sizeof(buffer_fd)); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[10] called %d\n", calls); - - for (int i = 0; i < 4; ++i) - { - //fprintf(stderr, "scm passed buffer_fd[%d] = %d\n", i, buffer_fd[i]); - if (buffer_fd[i] < 0) - HPWS_ACCEPT_ERROR(203, "child accept scm_rights a passed buffer fd was negative"); - mapping[i] = - mmap(0, max_buffer_size_, PROT_READ | PROT_WRITE, MAP_SHARED, buffer_fd[i], 0); - if (mapping[i] == MAP_FAILED) - HPWS_ACCEPT_ERROR(204, "could not mmap scm_rights passed buffer fd"); - } - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[11] called %d\n", calls); - } - { - struct pollfd pfd; - int ret; - - for (int i = 0; i <= 1; ++i) - { - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] waiting for 'r' on child_fd[%d]=%d accept\n", i, child_fd[i]); - pfd.fd = child_fd[i]; - pfd.events = POLLERR | POLLHUP | POLLNVAL | POLLIN; - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[12] called %d\n", calls); - - // now we wait for a 'r' ready message or for the socket/client to die - ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout - - if (!(pfd.revents & POLLIN)) - HPWS_ACCEPT_ERROR(5, "could not read from client_fd"); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[12a] called %d - ret %d\n", calls, ret); - char rbuf[2]; - bytes_read = recv(child_fd[i], rbuf, sizeof(rbuf), 0); - if (bytes_read < 1) - HPWS_ACCEPT_ERROR(2, "nil message sent by hpws on startup on accept"); - - if (rbuf[0] != 'r') - HPWS_ACCEPT_ERROR(3, "unexpected content in message sent by hpws client mode on startup"); - - if (rbuf[1] != '0' + i) - HPWS_ACCEPT_ERROR(4, "received wrong r message on control line fd"); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] 'r%c' received on child_fd[%d]=%d\n", rbuf[1], i, child_fd[i]); - } - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[13] called %d\n", calls); - } - - return client{ - "", - buf, - child_fd[0], - child_fd[1], - max_buffer_size_, - (pid_t)pid, - buffer_fd, - mapping}; - } - - ~server() - { - if (!moved) - { - - // RH TODO ensure this pid terminates by following up with a SIGKILL - if (server_pid_ > 0) - { - kill(server_pid_, SIGTERM); - int status; - waitpid(server_pid_, &status, 0 /* should we use WNOHANG? */); - } - - ::close(master_control_fd_); - } - } - - static std::variant create( - std::string_view bin_path, - uint32_t max_buffer_size, - uint16_t port, - uint32_t max_con, - uint16_t max_con_per_ip, - std::string_view cert_path, - std::string_view key_path, - std::vector argv, //additional_arguments - std::function fork_child_init = NULL) - { -#define HPWS_SERVER_ERROR(code, msg) \ - { \ - error_code = code; \ - error_msg = msg; \ - goto server_error; \ - } - - int error_code = -1; - const char *error_msg = NULL; - int fd[2] = {-1, -1}; - pid_t pid = -1; - int count_args = 17 + argv.size(); - char const **argv_pass = NULL; - - if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, fd)) - HPWS_SERVER_ERROR(100, "could not create unix domain socket pair"); - - // construct the arguments - char shm_size[32]; - - if (snprintf(shm_size, 32, "%d", max_buffer_size) <= 0) - HPWS_SERVER_ERROR(90, "couldn't write shm size to string"); - - char port_str[6]; - if (snprintf(port_str, 6, "%d", port) <= 0) - HPWS_SERVER_ERROR(91, "couldn't write port to string"); - - char max_con_str[11]; - if (snprintf(max_con_str, 11, "%d", max_con) <= 0) - HPWS_SERVER_ERROR(92, "couldn't write max_con to string"); - - char max_con_per_ip_str[6]; - if (snprintf(max_con_per_ip_str, 6, "%d", max_con_per_ip) <= 0) - HPWS_SERVER_ERROR(93, "couldn't write max_con_per_ip to string"); - - argv_pass = - reinterpret_cast(alloca(sizeof(char *) * count_args)); - { - int upto = 0; - argv_pass[upto++] = bin_path.data(); - argv_pass[upto++] = "--server"; - argv_pass[upto++] = "--maxmsg"; - argv_pass[upto++] = shm_size; - argv_pass[upto++] = "--port"; - argv_pass[upto++] = port_str; - argv_pass[upto++] = "--cert"; - argv_pass[upto++] = cert_path.data(); - argv_pass[upto++] = "--key"; - argv_pass[upto++] = key_path.data(); - argv_pass[upto++] = "--cntlfd"; - argv_pass[upto++] = "3"; - argv_pass[upto++] = "--maxcon"; - argv_pass[upto++] = max_con_str; - argv_pass[upto++] = "--maxconip"; - argv_pass[upto++] = max_con_per_ip_str; - for (std::string_view &arg : argv) - argv_pass[upto++] = arg.data(); - argv_pass[upto] = NULL; - } - - pid = vfork(); - if (pid) - { - - // --- PARENT - - // Fds are set to -1, so when error occurred these fds won't get closed again. - ::close(fd[1]); - fd[1] = -1; - - int flags = fcntl(fd[0], F_GETFD, NULL); - if (flags < 0) - HPWS_SERVER_ERROR(101, "could not get flags from unix domain socket"); - - flags |= FD_CLOEXEC; - if (fcntl(fd[0], F_SETFD, flags)) - HPWS_SERVER_ERROR(102, "could notset flags for unix domain socket"); - - // we will set a timeout and wait for the initial startup message from hpws server mode - struct pollfd pfd; - int ret; - - pfd.fd = fd[0]; - pfd.events = POLLIN; - ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout - - // timeout or error - if (ret < 1) - HPWS_SERVER_ERROR(1, "timeout waiting for hpws startup message"); - - char buf[1024]; - int bytes_read = recv(fd[0], buf, sizeof(buf) - 1, 0); - if (bytes_read < 1) - { - int status; - // Wait and obtain exit status code of hpws. - if (waitpid(pid, &status, 0) > 0) - { - switch (WEXITSTATUS(status)) - { - case 70: - HPWS_SERVER_ERROR(31, "Could not create listen socket."); - - case 72: - HPWS_SERVER_ERROR(32, "Could not bind socket for listen."); - - case 74: - HPWS_SERVER_ERROR(33, "Listen() failed."); - - default: - break; - } - } - HPWS_SERVER_ERROR(2, "nil message sent by hpws on startup"); - } - - buf[bytes_read] = '\0'; - if (strncmp(buf, "startup", 7) != 0) - { - fprintf(stderr, "startup message: `%.*s`\n", bytes_read, buf); - HPWS_SERVER_ERROR(3, "unexpected content in message sent by hpws on startup"); - } - return server{ - pid, - fd[0], - max_buffer_size}; - } - else - { - - // --- CHILD - if (fork_child_init) - fork_child_init(); - - ::close(fd[0]); - - // dup fd[1] into fd 3 - dup2(fd[1], 3); - ::close(fd[1]); - - // we're assuming all fds above 3 will have close_exec flag - execv(bin_path.data(), (char *const *)argv_pass); - // we will send a nil message down the pipe to help the parent know somethings gone wrong - char nil[1]; - nil[0] = 0; - send(3, nil, 1, 0); - exit(1); // execl failure as child will always result in exit here - } - - server_error:; - - // NB: execution to here can only happen in parent process - // clean up any mess after error - if (pid > 0) - { - kill(pid, SIGKILL); /* RH TODO change this to SIGTERM and set a timeout? */ - int status; - waitpid(pid, &status, 0 /* should we use WNOHANG? */); - } - if (fd[0] > 0) - ::close(fd[0]); - if (fd[1] > 0) - ::close(fd[1]); - - return error{error_code, std::string{error_msg}}; - } - }; -} // namespace hpws - -#endif diff --git a/src/conf.cpp b/src/conf.cpp index 23225c5..f68e991 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -61,7 +61,6 @@ namespace conf cfg.hp.host_address = host_addr.empty() ? "127.0.0.1" : std::string(host_addr); cfg.hp.init_peer_port = 22861; cfg.hp.init_user_port = 8081; - cfg.server.ip_port = {"127.0.0.1", 5000}; cfg.system.max_instance_count = 5; cfg.system.max_mem_kbytes = 1024000; // Total 1GB RAM @@ -109,11 +108,12 @@ namespace conf // If data dir is not specified, use the same dir as executables. ctx.data_dir = datadir.empty() ? ctx.exe_dir : util::realpath(datadir); - ctx.hpws_exe_path = ctx.exe_dir + "/hpws"; ctx.hpfs_exe_path = ctx.exe_dir + "/hpfs"; ctx.user_install_sh = ctx.exe_dir + "/user-install.sh"; ctx.user_uninstall_sh = ctx.exe_dir + "/user-uninstall.sh"; + ctx.socket_path = ctx.data_dir + "/sa.sock"; + ctx.contract_template_path = ctx.data_dir + "/contract_template"; ctx.config_file = ctx.data_dir + "/sa.cfg"; ctx.log_dir = ctx.data_dir + "/log"; @@ -125,11 +125,10 @@ namespace conf */ int validate_dir_paths() { - const std::string paths[7] = { + const std::string paths[6] = { ctx.config_file, ctx.log_dir, ctx.data_dir, - ctx.hpws_exe_path, ctx.contract_template_path, ctx.user_install_sh, ctx.user_uninstall_sh}; @@ -140,8 +139,6 @@ namespace conf { if (path == ctx.config_file) std::cerr << path << " does not exist. Initialize with command.\n"; - else if (path == ctx.hpws_exe_path) - std::cerr << path << " binary does not exist.\n"; else std::cerr << path << " does not exist.\n"; return -1; @@ -236,35 +233,6 @@ namespace conf } } - // server - { - jpath = "server"; - - try - { - const jsoncons::ojson &server = d["server"]; - - cfg.server.ip_port.host_address = server["host"].as(); - cfg.server.ip_port.port = server["port"].as(); - - if (cfg.server.ip_port.host_address.empty()) - { - std::cerr << "Configured server host_address is empty.\n"; - return -1; - } - else if (cfg.server.ip_port.port <= 0) - { - std::cerr << "Configured server port invalid.\n"; - return -1; - } - } - catch (const std::exception &e) - { - print_missing_field_error(jpath, e); - return -1; - } - } - // system { jpath = "system"; @@ -351,16 +319,6 @@ namespace conf d.insert_or_assign("hp", hp_config); } - // Server configs. - { - jsoncons::ojson server_config; - - server_config.insert_or_assign("host", cfg.server.ip_port.host_address); - server_config.insert_or_assign("port", cfg.server.ip_port.port); - - d.insert_or_assign("server", server_config); - } - // System configs. { jsoncons::ojson system_config; diff --git a/src/conf.hpp b/src/conf.hpp index 7a843de..8ba375d 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -66,11 +66,6 @@ namespace conf size_t max_file_count = 0; // Max no. of log files to keep. }; - struct server_config - { - host_ip_port ip_port; - }; - struct hp_config { std::string host_address; @@ -95,7 +90,6 @@ namespace conf { std::string version; hp_config hp; - server_config server; system_config system; docker_config docker; log_config log; @@ -105,10 +99,11 @@ namespace conf { std::string command; // The CLI command issued to launch Sashimono agent std::string exe_dir; // Sashimono Agent executable dir. - std::string hpws_exe_path; // hpws executable file path. std::string hpfs_exe_path; // hpfs executable file path. std::string contract_template_path; // Path to default contract. + std::string socket_path; // Path to the unix socket file. + std::string user_install_sh; std::string user_uninstall_sh; diff --git a/src/main.cpp b/src/main.cpp index e8527f7..7a6fef2 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -107,9 +107,6 @@ int main(int argc, char **argv) signal(SIGSEGV, &segfault_handler); signal(SIGABRT, &segfault_handler); - // Become a sub-reaper so we can gracefully reap hpws child processes via hpws.hpp. - // (Otherwise they will get reaped by OS init process and we'll end up with race conditions with gracefull kills) - prctl(PR_SET_CHILD_SUBREAPER, 1); // Disable SIGPIPE to avoid crashing on broken pipe IO. { diff --git a/src/msg/json/msg_json.cpp b/src/msg/json/msg_json.cpp index de5da5e..6775898 100644 --- a/src/msg/json/msg_json.cpp +++ b/src/msg/json/msg_json.cpp @@ -77,43 +77,6 @@ namespace msg::json return 0; } - /** - * Extracts type, id and pubkey in the msg. - * @param type Type in the message. - * @param id id in the message. - * @param pubkey Pubkey in the message. - * @param d The json document holding the read request message. - * Accepted signed input container format: - * { - * ... - * "type": "", - * "id": "", - * "owner_pubkey": "", - * ... - * } - * @return 0 on successful extraction. -1 for failure. - */ - int extract_commons(std::string &type, std::string &id, std::string &pubkey, const jsoncons::json &d) - { - if (extract_type_and_id(type, id, d) == -1) - return -1; - - if (!d.contains(msg::FLD_PUBKEY)) - { - LOG_ERROR << "Field owner_pubkey is missing."; - return -1; - } - - if (!d[msg::FLD_PUBKEY].is()) - { - LOG_ERROR << "Invalid owner_pubkey value."; - return -1; - } - - pubkey = d[msg::FLD_PUBKEY].as(); - return 0; - } - /** * Extracts create message from msg. * @param msg Populated msg object. @@ -129,9 +92,16 @@ namespace msg::json */ int extract_create_message(create_msg &msg, const jsoncons::json &d) { - if (extract_commons(msg.type, msg.id, msg.pubkey, d) == -1) + if (extract_type_and_id(msg.type, msg.id, d) == -1) return -1; + + if (!d.contains(msg::FLD_PUBKEY)) + { + LOG_ERROR << "Field owner_pubkey is missing."; + return -1; + } + if (!d.contains(msg::FLD_CONTRACT_ID)) { LOG_ERROR << "Field contract_id is missing."; @@ -144,6 +114,12 @@ namespace msg::json return -1; } + if (!d[msg::FLD_PUBKEY].is()) + { + LOG_ERROR << "Invalid owner_pubkey value."; + return -1; + } + if (!d[msg::FLD_CONTRACT_ID].is()) { LOG_ERROR << "Invalid contract_id value."; @@ -156,6 +132,7 @@ namespace msg::json return -1; } + msg.pubkey = d[msg::FLD_PUBKEY].as(); msg.contract_id = d[msg::FLD_CONTRACT_ID].as(); msg.image = d[msg::FLD_IMAGE].as(); return 0; @@ -181,7 +158,7 @@ namespace msg::json */ int extract_initiate_message(initiate_msg &msg, const jsoncons::json &d) { - if (extract_commons(msg.type, msg.id, msg.pubkey, d) == -1) + if (extract_type_and_id(msg.type, msg.id, d) == -1) return -1; if (!d.contains(msg::FLD_CONTAINER_NAME)) @@ -327,7 +304,7 @@ namespace msg::json */ int extract_destroy_message(destroy_msg &msg, const jsoncons::json &d) { - if (extract_commons(msg.type, msg.id, msg.pubkey, d) == -1) + if (extract_type_and_id(msg.type, msg.id, d) == -1) return -1; if (!d.contains(msg::FLD_CONTAINER_NAME)) @@ -360,7 +337,7 @@ namespace msg::json */ int extract_start_message(start_msg &msg, const jsoncons::json &d) { - if (extract_commons(msg.type, msg.id, msg.pubkey, d) == -1) + if (extract_type_and_id(msg.type, msg.id, d) == -1) return -1; if (!d.contains(msg::FLD_CONTAINER_NAME)) @@ -393,7 +370,7 @@ namespace msg::json */ int extract_stop_message(stop_msg &msg, const jsoncons::json &d) { - if (extract_commons(msg.type, msg.id, msg.pubkey, d) == -1) + if (extract_type_and_id(msg.type, msg.id, d) == -1) return -1; if (!d.contains(msg::FLD_CONTAINER_NAME)) diff --git a/src/msg/json/msg_json.hpp b/src/msg/json/msg_json.hpp index 5a73d43..61bcd6e 100644 --- a/src/msg/json/msg_json.hpp +++ b/src/msg/json/msg_json.hpp @@ -14,8 +14,6 @@ namespace msg::json int extract_type_and_id(std::string &extracted_type, std::string &extracted_id, const jsoncons::json &d); - int extract_commons(std::string &type, std::string &id, std::string &pubkey, const jsoncons::json &d); - int extract_create_message(create_msg &msg, const jsoncons::json &d); int extract_initiate_message(initiate_msg &msg, const jsoncons::json &d); diff --git a/src/msg/msg_common.hpp b/src/msg/msg_common.hpp index 0af6479..ccba83f 100644 --- a/src/msg/msg_common.hpp +++ b/src/msg/msg_common.hpp @@ -21,7 +21,6 @@ namespace msg { std::string id; std::string type; - std::string pubkey; std::string container_name; std::set peers; std::set unl; @@ -35,7 +34,6 @@ namespace msg { std::string id; std::string type; - std::string pubkey; std::string container_name; }; @@ -43,7 +41,6 @@ namespace msg { std::string id; std::string type; - std::string pubkey; std::string container_name; }; @@ -51,7 +48,6 @@ namespace msg { std::string id; std::string type; - std::string pubkey; std::string container_name; }; diff --git a/src/pchheader.hpp b/src/pchheader.hpp index 0a87404..236075a 100644 --- a/src/pchheader.hpp +++ b/src/pchheader.hpp @@ -18,14 +18,17 @@ #include #include #include +#include #include #include +#include #include #include #include #include #include #include +#include #include #include #include