diff --git a/examples/js_client/hp-client-lib.js b/examples/js_client/hp-client-lib.js index 5683b078..6e6a6179 100644 --- a/examples/js_client/hp-client-lib.js +++ b/examples/js_client/hp-client-lib.js @@ -12,6 +12,8 @@ const outputValidationPassThreshold = 0.8; const connectionCheckIntervalMs = 1000; const recentActivityThresholdMs = 3000; + const textEncoder = new TextEncoder(); + const textDecoder = new TextDecoder(); // External dependency references. let WebSocket = null; @@ -430,7 +432,7 @@ // Sign the challenge and send back the response const response = msgHelper.createUserChallengeResponse(m.challenge, serverChallenge, protocol); - ws.send(msgHelper.serializeObject(response)); + wsSend(msgHelper.serializeObject(response)); connectionStatus = 1; return true; @@ -519,9 +521,10 @@ const messageHandler = async (rcvd) => { - const data = (connectionStatus < 2 || protocol == protocols.json) ? - (isBrowser ? await rcvd.data.text() : rcvd.data) : - (isBrowser ? await rcvd.data.arrayBuffer() : rcvd.data); + // Decode the received data buffer. + // In browser, text(json) mode requires the buffer to be "decoded" to text before JSON parsing. + const isTextMode = (connectionStatus < 2 || protocol == protocols.json); + const data = (isBrowser && isTextMode) ? textDecoder.decode(rcvd.data) : rcvd.data; try { m = msgHelper.deserializeMessage(data); @@ -593,6 +596,13 @@ handshakeResolver && handshakeResolver(false); } + const wsSend = (msg) => { + if (isString(msg)) + ws.send(textEncoder.encode(msg)); + else + ws.send(msg); + } + this.isConnected = () => { return connectionStatus == 2; }; @@ -602,6 +612,9 @@ return new Promise(resolve => { ws = isBrowser ? new WebSocket(server) : new WebSocket(server, { rejectUnauthorized: false }); + if (isBrowser) + ws.binaryType = "arraybuffer"; + handshakeResolver = resolve; ws.addEventListener("error", errorHandler); ws.addEventListener("open", openHandler); @@ -633,7 +646,7 @@ // Otherwise simply wait for the previously sent request. if (statResponseResolvers.length == 1) { const msg = msgHelper.createStatusRequest(); - ws.send(msgHelper.serializeObject(msg)); + wsSend(msgHelper.serializeObject(msg)); } return p; } @@ -663,7 +676,7 @@ contractInputResolvers[sigKey] = resolve; }); - ws.send(msgHelper.serializeObject(msg)); + wsSend(msgHelper.serializeObject(msg)); return p; } @@ -673,7 +686,7 @@ return; const msg = msgHelper.createReadRequest(request); - ws.send(msgHelper.serializeObject(msg)); + wsSend(msgHelper.serializeObject(msg)); } } diff --git a/src/comm/hpws.hpp b/src/comm/hpws.hpp index fb9e84dd..b4dd0884 100644 --- a/src/comm/hpws.hpp +++ b/src/comm/hpws.hpp @@ -47,7 +47,7 @@ namespace hpws // used when waiting for messages that should already be on the pipe #define HPWS_SMALL_TIMEOUT 10 // used when waiting for server process to spawn -#define HPWS_LONG_TIMEOUT 2500 +#define HPWS_LONG_TIMEOUT 50 typedef union { @@ -138,11 +138,11 @@ namespace hpws for (int i = 0; i < 4; ++i) { munmap(buffer[i], max_buffer_size); - close(buffer_fd[i]); + ::close(buffer_fd[i]); } - close(control_line_fd[0]); - close(control_line_fd[1]); + ::close(control_line_fd[0]); + ::close(control_line_fd[1]); if (HPWS_DEBUG) fprintf(stderr, "[HPWS.HPP] child destructed pid = %d\n", child_pid); @@ -215,6 +215,21 @@ namespace hpws } } + void close() + { + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] close called\n"); + + // send the control message informing hpws that we wish to close + char buf[1] = {'c'}; + + ::write(control_line_fd[1], buf, 1); + + // wait for the process to end gracefully + int status; + printf("waitpid result: %d\n", waitpid(child_pid, &status, 0)); // add timeout here? + } + std::optional write(std::string_view to_write) { if (HPWS_DEBUG) @@ -371,8 +386,8 @@ namespace hpws // --- PARENT - close(fd[1]); - close(fd[3]); + ::close(fd[1]); + ::close(fd[3]); int child_fd[2] = {fd[0], fd[2]}; @@ -481,8 +496,8 @@ namespace hpws if (fork_child_init) fork_child_init(); - close(fd[0]); - close(fd[2]); + ::close(fd[0]); + ::close(fd[2]); // dup fd[1] into fd 3 /*if (dup2(fd[1], 3) == -1) @@ -490,8 +505,8 @@ namespace hpws if (dup2(fd[3], 4) == -1) perror("dup2 fd[3]"); */ - // close(fd[1]); - // close(fd[3]); + // ::close(fd[1]); + // ::close(fd[3]); // we're assuming all fds above 3 will have close_exec flag execv(bin_path.data(), (char *const *)argv_pass); @@ -515,11 +530,11 @@ namespace hpws for (int i = 0; i < 4; ++i) { if (fd[i] > 0) - close(fd[i]); + ::close(fd[i]); if (mapping[i] != MAP_FAILED && mapping[i] != NULL) munmap(mapping[i], max_buffer_size); if (buffer_fd[i] > -1) - close(buffer_fd[i]); + ::close(buffer_fd[i]); } return error{error_code, std::string{error_msg}}; @@ -547,9 +562,9 @@ namespace hpws if (mapping[i] != MAP_FAILED && mapping[i] != NULL) munmap(mapping[i], max_buffer_size_); if (i < 2 && child_fd[i] > -1) - close(child_fd[i]); + ::close(child_fd[i]); if (buffer_fd[i] > -1) - close(buffer_fd[i]); + ::close(buffer_fd[i]); } if (pid_child > 0) @@ -589,6 +604,13 @@ namespace hpws std::variant accept(const bool no_block = false) { + + static int calls = 0; + ++calls; + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[0] called %d\n", calls); + #define HPWS_ACCEPT_ERROR(code, msg) \ { \ accept_cleanup(mapping, child_fd, buffer_fd, pid); \ @@ -601,6 +623,8 @@ namespace hpws // must not use pid_t here since we transfer across IPC channel as a uint32. uint32_t pid = 0; + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[1] called %d\n", calls); { struct msghdr child_msg = {0}; memset(&child_msg, 0, sizeof(child_msg)); @@ -608,13 +632,16 @@ namespace hpws child_msg.msg_control = cmsgbuf; child_msg.msg_controllen = sizeof(cmsgbuf); + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[2] called %d\n", calls); + // If no-block is specified, we first check any bytes available on control fd // before attempting to do a blocking a read. if (no_block) { struct pollfd master_pfd; master_pfd.fd = this->master_control_fd_; - master_pfd.events = POLLIN; + master_pfd.events = POLLERR | POLLHUP | POLLNVAL | POLLIN; const int master_poll_result = poll(&master_pfd, 1, HPWS_SMALL_TIMEOUT); if (master_poll_result == -1) // 1 ms timeout @@ -624,6 +651,9 @@ namespace hpws HPWS_ACCEPT_ERROR(199, "no new client available"); } + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[3] called %d\n", calls); + int bytes_read = recvmsg(this->master_control_fd_, &child_msg, 0); struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg); @@ -635,16 +665,25 @@ namespace hpws if (HPWS_DEBUG) fprintf(stderr, "[HPWS.HPP] On accept received SCM: child_fd[0] = %d, child_fd[1] = %d\n", child_fd[0], child_fd[1]); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[4] called %d\n", calls); } // read info from child control line with a timeout struct pollfd pfd; int ret; + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[5] called %d\n", calls); + pfd.fd = child_fd[0]; // expect all setup messages on the hpws->hpcore controlfd (0) pfd.events = POLLIN; ret = poll(&pfd, 1, HPWS_SMALL_TIMEOUT); // 1 ms timeout + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[6] called %d\n", calls); + // timeout or error if (ret < 1) HPWS_ACCEPT_ERROR(202, "timeout waiting for hpws accept child message"); @@ -653,11 +692,17 @@ namespace hpws if (recv(child_fd[0], (unsigned char *)(&pid), sizeof(pid), 0) < sizeof(pid)) HPWS_ACCEPT_ERROR(212, "did not receive expected 4 byte pid of child process on accept"); + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[7] called %d\n", calls); + // second thing we'll receive is IP address structure of the client addr_t buf; int bytes_read = recv(child_fd[0], (unsigned char *)(&buf), sizeof(buf), 0); + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[8] called %d\n", calls); + if (bytes_read < sizeof(buf)) HPWS_ACCEPT_ERROR(202, "received message on master control line was not sizeof(sockaddr_in6)"); @@ -669,12 +714,19 @@ namespace hpws child_msg.msg_control = cmsgbuf; child_msg.msg_controllen = sizeof(cmsgbuf); + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[9] called %d\n", calls); + int bytes_read = recvmsg(child_fd[0], &child_msg, 0); struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg); if (cmsg == NULL || cmsg->cmsg_type != SCM_RIGHTS) HPWS_ACCEPT_ERROR(203, "non-scm_rights message sent on accept child control line"); memcpy(&buffer_fd, CMSG_DATA(cmsg), sizeof(buffer_fd)); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[10] called %d\n", calls); + for (int i = 0; i < 4; ++i) { //fprintf(stderr, "scm passed buffer_fd[%d] = %d\n", i, buffer_fd[i]); @@ -685,6 +737,8 @@ namespace hpws if (mapping[i] == MAP_FAILED) HPWS_ACCEPT_ERROR(204, "could not mmap scm_rights passed buffer fd"); } + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[11] called %d\n", calls); } { struct pollfd pfd; @@ -695,10 +749,19 @@ namespace hpws if (HPWS_DEBUG) fprintf(stderr, "[HPWS.HPP] waiting for 'r' on child_fd[%d]=%d accept\n", i, child_fd[i]); pfd.fd = child_fd[i]; - pfd.events = POLLIN; + pfd.events = POLLERR | POLLHUP | POLLNVAL | POLLIN; + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[12] called %d\n", calls); + // now we wait for a 'r' ready message or for the socket/client to die ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout + if (!(pfd.revents & POLLIN)) + HPWS_ACCEPT_ERROR(5, "could not read from client_fd"); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[12a] called %d - ret %d\n", calls, ret); char rbuf[2]; bytes_read = recv(child_fd[i], rbuf, sizeof(rbuf), 0); if (bytes_read < 1) @@ -713,6 +776,9 @@ namespace hpws if (HPWS_DEBUG) fprintf(stderr, "[HPWS.HPP] 'r%c' received on child_fd[%d]=%d\n", rbuf[1], i, child_fd[i]); } + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] Accept[13] called %d\n", calls); } return client{ @@ -739,7 +805,7 @@ namespace hpws waitpid(server_pid_, &status, 0 /* should we use WNOHANG? */); } - close(master_control_fd_); + ::close(master_control_fd_); } } @@ -820,7 +886,7 @@ namespace hpws // --- PARENT - close(fd[1]); + ::close(fd[1]); int flags = fcntl(fd[0], F_GETFD, NULL); if (flags < 0) @@ -886,11 +952,11 @@ namespace hpws if (fork_child_init) fork_child_init(); - close(fd[0]); + ::close(fd[0]); // dup fd[1] into fd 3 dup2(fd[1], 3); - close(fd[1]); + ::close(fd[1]); // we're assuming all fds above 3 will have close_exec flag execv(bin_path.data(), (char *const *)argv_pass); @@ -912,9 +978,9 @@ namespace hpws waitpid(pid, &status, 0 /* should we use WNOHANG? */); } if (fd[0] > 0) - close(fd[0]); + ::close(fd[0]); if (fd[1] > 0) - close(fd[1]); + ::close(fd[1]); return error{error_code, std::string{error_msg}}; } diff --git a/test/bin/hpws b/test/bin/hpws index 4a51c224..6f508cec 100755 Binary files a/test/bin/hpws and b/test/bin/hpws differ diff --git a/test/metrics/hp-client-lib.js b/test/metrics/hp-client-lib.js deleted file mode 100644 index 5683b078..00000000 --- a/test/metrics/hp-client-lib.js +++ /dev/null @@ -1,915 +0,0 @@ -(() => { - - // Whether we are in Browser or NodeJs. - const isBrowser = !(typeof window === 'undefined'); - - // In browser, avoid duplicate initializations. - if (isBrowser && window.HotPocket) - return; - - const supportedHpVersion = "0.0"; - const serverChallengeSize = 16; - const outputValidationPassThreshold = 0.8; - const connectionCheckIntervalMs = 1000; - const recentActivityThresholdMs = 3000; - - // External dependency references. - let WebSocket = null; - let sodium = null; - let bson = null; - let blake3 = null; - let logLevel = 0; // 0=info, 1=error - - /*--- Included in public interface. ---*/ - const protocols = { - json: "json", - bson: "bson" // (Requires nodejs or browserified hp client library on Browser) - } - Object.freeze(protocols); - - /*--- Included in public interface. ---*/ - const events = { - disconnect: "disconnect", - contractOutput: "contractOutput", - contractReadResponse: "contractReadResponse", - connectionChange: "connectionChange", - unlChange: "unlChange" - } - Object.freeze(events); - - /*--- Included in public interface. ---*/ - const generateKeys = async (privateKeyHex = null) => { - - await initSodium(); - - if (!privateKeyHex) { - const keys = sodium.crypto_sign_keypair(); - return { - privateKey: keys.privateKey, - publicKey: keys.publicKey - } - } - else { - const binPrivateKey = hexToUint8Array(privateKeyHex); - return { - privateKey: Uint8Array.from(binPrivateKey), - publicKey: Uint8Array.from(binPrivateKey.slice(32)) - } - } - } - - /*--- Included in public interface. ---*/ - const createClient = async (servers, clientKeys, options) => { - - const defaultOptions = { - contractId: null, - contractVersion: null, - trustedServerKeys: null, - protocol: protocols.json, - requiredConnectionCount: 1, - connectionTimeoutMs: 5000 - }; - const opt = options ? { ...defaultOptions, ...options } : defaultOptions; - - if (!clientKeys) - throw "clientKeys not specified."; - if (opt.contractId == "") - throw "contractId not specified. Specify null to bypass contract id validation."; - if (opt.contractVersion == "") - throw "contractVersion not specified. Specify null to bypass contract version validation."; - if (!opt.protocol || (opt.protocol != protocols.json && opt.protocol != protocols.bson)) - throw "Valid protocol not specified."; - if (!opt.requiredConnectionCount || opt.requiredConnectionCount == 0) - throw "requiredConnectionCount must be greater than 0."; - if (!opt.connectionTimeoutMs || opt.connectionTimeoutMs == 0) - throw "Connection timeout must be greater than 0."; - - await initSodium(); - await initBlake3(); - initWebSocket(); - if (opt.protocol == protocols.bson) - initBson(); - - // Load servers and serverKeys to object keys to avoid duplicates. - - const serversLookup = {}; - servers && servers.forEach(s => { - const url = s.trim(); - if (url.length > 0) - serversLookup[url] = true - }); - if (Object.keys(serversLookup).length == 0) - throw "servers not specified."; - if (opt.requiredConnectionCount > Object.keys(serversLookup).length) - throw "requiredConnectionCount is higher than no. of servers."; - - let trustedKeysLookup = {}; - opt.trustedServerKeys && opt.trustedServerKeys.sort().forEach(k => { - const key = k.trim(); - if (key.length > 0) - trustedKeysLookup[key] = true - }); - if (Object.keys(trustedKeysLookup).length == 0) - trustedKeysLookup = null; - - return new HotPocketClient(opt.contractId, opt.contractVersion, clientKeys, serversLookup, trustedKeysLookup, opt.protocol, opt.requiredConnectionCount, opt.connectionTimeoutMs); - } - - function HotPocketClient(contractId, contractVersion, clientKeys, serversLookup, trustedKeysLookup, protocol, requiredConnectionCount, connectionTimeoutMs) { - - let emitter = new EventEmitter(); - - // The accessor function passed into connections to query latest trusted key list. - // We update the returning key list whenever we get a unl update. - const getTrustedKeys = () => trustedKeysLookup; - - // Whenever unl change is reported, update the trusted key list. - emitter.on(events.unlChange, (unl) => { - trustedKeysLookup = {}; - unl.sort().forEach(pubkey => trustedKeysLookup[pubkey] = true); - }) - - const nodes = Object.keys(serversLookup).map(s => { - return { - server: s, // Server address. - connection: null, // Hot Pocket connection (if any). - lastActivity: 0 // Last connection activity timestamp. - } - }); - - let status = 0; //0:none, 1:connected, 2:closed - - // This will get fired whenever the required connection count gets fullfilled. - let initialConnectSuccess = null; - - // Tracks when was the earliest time that we were missing some required connections. - // 0 indicates we are no missing any connections. - let connectionsMissingFrom = new Date().getTime(); - - // Checks for missing connections and attempts to establish them. - const reviewConnections = () => { - - if (status == 2) - return; - - // Check for connection changes periodically. - setTimeout(() => { - reviewConnections(); - }, connectionCheckIntervalMs); - - // Check whether we have fullfilled all required connections. - if (nodes.filter(n => n.connection && n.connection.isConnected()).length == requiredConnectionCount) { - connectionsMissingFrom = 0; - initialConnectSuccess && initialConnectSuccess(true); - initialConnectSuccess = null; - status = 1; - return; - } - - if (connectionsMissingFrom == 0) { - // Reaching here means we moved from connections-fullfilled state to missing-connections state. - connectionsMissingFrom = new Date().getTime(); - } - else if ((new Date().getTime() - connectionsMissingFrom) > connectionTimeoutMs) { - - // This means we were not able to maintain required connection count for the entire timeout period. - - liblog(1, "Missing-connections timeout reached."); - - // Close and cleanup all connections if we hit the timeout. - this.close().then(() => { - if (initialConnectSuccess) { - initialConnectSuccess(false); - initialConnectSuccess = null; - } - else { - emitter && emitter.emit(events.disconnect); - } - }); - return; - } - - // Reaching here means we should attempt to establish more connections if we have available slots. - let currentConnectionCount = nodes.filter(n => n.connection).length; - if (currentConnectionCount == requiredConnectionCount) - return; - - // Find out available slots. - // Skip nodes that are already connected or is currently establishing connection. - // Skip nodes that have recently shown some connection activity. - // Give priority to nodes that have not shown any activity recently. - const freeNodes = nodes.filter(n => !n.connection && (new Date().getTime() - n.lastActivity) > recentActivityThresholdMs); - freeNodes.sort((a, b) => a.lastActivity - b.lastActivity); // Oldest activity comes first. - - while (currentConnectionCount < requiredConnectionCount && freeNodes.length > 0) { - - // Get the next available node. - const n = freeNodes.shift(); - n.connection = new HotPocketConnection(contractId, contractVersion, clientKeys, n.server, getTrustedKeys, protocol, connectionTimeoutMs, emitter); - n.lastActivity = new Date().getTime(); - - n.connection.connect().then(success => { - if (!success) - n.connection = null; - else - emitter && emitter.emit(events.connectionChange, n.server, "add"); - }); - - n.connection.onClose = () => { - n.connection = null; - emitter && emitter.emit(events.connectionChange, n.server, "remove"); - }; - - currentConnectionCount++; - } - } - - this.connect = () => { - - if (status > 0) - return; - - reviewConnections(); - return new Promise(resolve => { - initialConnectSuccess = resolve; - }) - } - - this.close = async () => { - - if (status == 2) - return; - - status = 2; - emitter.clear(events.connectionChange); - emitter.clear(events.contractOutput); - emitter.clear(events.contractReadResponse); - - // Close all nodes connections. - await Promise.all(nodes.filter(n => n.connection).map(n => n.connection.close())); - nodes.forEach(n => n.connection = null); - } - - this.on = (event, listener) => { - emitter.on(event, listener); - } - - this.clear = (event) => { - emitter.clear(event); - } - - this.sendContractInput = async (input, nonce = null, maxLclOffset = null) => { - if (status == 2) - return; - - return await Promise.all( - nodes.filter(n => n.connection && n.connection.isConnected()) - .map(n => n.connection.sendContractInput(input, nonce, maxLclOffset))); - } - - this.sendContractReadRequest = (request) => { - if (status == 2) - return; - - nodes.filter(n => n.connection && n.connection.isConnected()).forEach(n => { - n.connection.sendContractReadRequest(request); - }); - } - } - - function HotPocketConnection(contractId, contractVersion, clientKeys, server, getTrustedKeys, protocol, connectionTimeoutMs, emitter) { - - // Create message helper with JSON protocol initially. - // After challenge handshake, we will change this to use the protocol specified by user. - const msgHelper = new MessageHelper(clientKeys, protocols.json); - - let connectionStatus = 0; // 0:none, 1:server challenge sent, 2:handshake complete. - let serverChallenge = null; // The hex challenge we have issued to the server. - let reportedContractId = null; - let reportedContractVersion = null; - let pubkey = false; // Pubkey hex (with prefix) of this connection. - - let ws = null; - let handshakeTimer = null; // Timer to track connection handshake timeout. - let handshakeResolver = null; - let closeResolver = null; - let statResponseResolvers = []; - let contractInputResolvers = {}; - - // Calcualtes the blake3 hash of all array items. - const getHash = (arr) => { - const hash = blake3.createHash(); - arr.forEach(item => hash.update(item)); - return new Uint8Array(hash.digest()); - } - - // Get root hash of the given merkle hash tree. (called recursively) - // checkHashString specifies the hash that must be checked for existance. - const getMerkleHash = (tree, checkHashString) => { - - const listToHash = []; // Collects elements to hash. - let checkHashFound = false; - - for (let elem of tree) { - - if (Array.isArray(elem)) { - // If the 'elem' is an array we should find the root hash of the array. - // Call this func recursively. If checkHash already found, pass null. - const result = getMerkleHash(elem, checkHashFound ? null : checkHashString); - if (result[0] == true) - checkHashFound = true; - - listToHash.push(result[1]); - } - else { - // 'elem' is a single hash value. We get the hash bytes depending on the data type. - // (json encoding will use hex string and bson will use buffer) - const hashBytes = isString(elem) ? hexToUint8Array(elem) : elem.buffer; - listToHash.push(hashBytes); - - // If checkHash is specified, compare the hashes. - if (checkHashString && msgHelper.stringifyValue(hashBytes) == checkHashString) - checkHashFound = true; - } - } - - // Return a tuple of whether check hash was found and the root hash of the provided merkle tree. - return [checkHashFound, getHash(listToHash)]; - } - - // Verifies whether the provided root hash has enough signatures from unl. - const validateHashSignatures = (rootHash, signatures, unlKeysLookup) => { - - const totalUnl = Object.keys(unlKeysLookup).length; - if (totalUnl == 0) { - liblog(1, "Cannot validate outputs with empty unl."); - return false; - } - - const passedKeys = {}; - - // 'signatures' is an array of pairs of [pubkey, signature] - for (pair of signatures) { - const pubkeyHex = msgHelper.stringifyValue(pair[0]); // Gets the pubkey hex to use for unl lookup key. - - // Get the signature and issuer pubkey bytes based on the data type. - // (json encoding will use hex string and bson will use buffer) - const pubkey = isString(pair[0]) ? hexToUint8Array(pair[0].substring(2)) : pair[0].buffer.slice(1); // Skip prefix byte. - const sig = isString(pair[1]) ? hexToUint8Array(pair[1]) : pair[1].buffer; - - // Check whether the pubkey is in unl and whether signature is valid. - if (!passedKeys[pubkeyHex] && unlKeysLookup[pubkeyHex] && sodium.crypto_sign_verify_detached(sig, rootHash, pubkey)) - passedKeys[pubkeyHex] = true; - } - - // Check the percentage of unl keys that passed the signature check. - const passed = Object.keys(passedKeys).length; - return ((passed / totalUnl) >= outputValidationPassThreshold); - } - - const validateOutput = (msg, trustedKeys) => { - - // Calculate combined output hash with user's pubkey. - const outputHash = getHash([[0xED], clientKeys.publicKey, ...msgHelper.spreadArrayField(msg.outputs)]); - - const result = getMerkleHash(msg.hashes, msgHelper.stringifyValue(outputHash)); - if (result[0] == true) { - const rootHash = result[1]; - - // Verify the issued signatures against the root hash. - return validateHashSignatures(rootHash, msg.unl_sig, trustedKeys); - } - - return false; - } - - const validateAndEmitUnlChange = (changedUnl) => { - // If this is currently a trusted connection, notify unl update. - const trustedKeys = getTrustedKeys(); - if (trustedKeys && trustedKeys[pubkey]) { - // Prepare sorted new unl lookup object for equality comparison. - const newUnl = {}; - changedUnl.sort().forEach(k => newUnl[k] = true); - - // Only emit unl change event if the unl has really changed. - if (JSON.stringify(trustedKeys) != JSON.stringify(newUnl)) - emitter && emitter.emit(events.unlChange, changedUnl); - } - } - - const handshakeMessageHandler = (m) => { - - if (connectionStatus == 0 && m.type == "user_challenge" && m.hp_version && m.contract_id) { - - if (m.hp_version != supportedHpVersion) { - liblog(1, `Incompatible Hot Pocket server version. Expected:${supportedHpVersion} Got:${m.hp_version}`); - return false; - } - else if (!m.contract_id) { - liblog(1, "Server did not specify contract id."); - return false; - } - else if (contractId && m.contract_id != contractId) { - liblog(1, `Contract id mismatch. Expected:${contractId} Got:${m.contract_id}`); - return false; - } - else if (!m.contract_version) { - liblog(1, "Server did not specify contract version."); - return false; - } - else if (contractVersion && m.contract_version != contractVersion) { - liblog(1, `Contract version mismatch. Expected:${contractVersion} Got:${m.contract_version}`); - return false; - } - - reportedContractId = m.contract_id; - reportedContractVersion = m.contract_version; - - // Generate the challenge we are sending to server. - serverChallenge = uint8ArrayToHex(sodium.randombytes_buf(serverChallengeSize)); - - // Sign the challenge and send back the response - const response = msgHelper.createUserChallengeResponse(m.challenge, serverChallenge, protocol); - ws.send(msgHelper.serializeObject(response)); - - connectionStatus = 1; - return true; - } - else if (connectionStatus == 1 && serverChallenge && m.type == "server_challenge_response" && m.sig && m.pubkey) { - - // Verify server challenge response. - const stringToVerify = serverChallenge + reportedContractId + reportedContractVersion; - const serverPubkeyHex = m.pubkey.substring(2); // Skip 'ed' prefix; - if (!sodium.crypto_sign_verify_detached(hexToUint8Array(m.sig), stringToVerify, hexToUint8Array(serverPubkeyHex))) { - liblog(1, `${server} challenge response verification failed.`); - return false; - } - - clearTimeout(handshakeTimer); // Cancel the handshake timeout monitor. - handshakeTimer = null; - serverChallenge = null; // Clear the sent challenge as we no longer need it. - msgHelper.useProtocol(protocol); // Here onwards, use the message protocol specified by user. - pubkey = m.pubkey; // Set this connection's public key. - connectionStatus = 2; // Handshake complete. - - // If we are still connected, report handshaking as successful. - // (If websocket disconnects, handshakeResolver will be already null) - handshakeResolver && handshakeResolver(true); - liblog(0, `Connected to ${server}`); - - validateAndEmitUnlChange(m.unl); - - return true; - } - - liblog(1, `${server} invalid message during handshake. Connection status:${connectionStatus}`); - liblog(0, m); - return false; - } - - const contractMessageHandler = (m) => { - - if (m.type == "contract_read_response") { - emitter && emitter.emit(events.contractReadResponse, msgHelper.deserializeOutput(m.content)); - } - else if (m.type == "contract_input_status") { - const sigKey = msgHelper.stringifyValue(m.input_sig); - const resolver = contractInputResolvers[sigKey]; - if (resolver) { - if (m.status == "accepted") - resolver("ok"); - else - resolver(m.reason); - delete contractInputResolvers[sigKey]; - } - } - else if (m.type == "contract_output") { - if (emitter) { - // Validate outputs if trusted keys is not null. (null means bypass validation) - const trustedKeys = getTrustedKeys(); - if (!trustedKeys || validateOutput(m, trustedKeys)) - m.outputs.forEach(output => emitter.emit(events.contractOutput, msgHelper.deserializeOutput(output))); - else - liblog(1, "Output validation failed."); - } - } - else if (m.type == "stat_response") { - statResponseResolvers.forEach(resolver => { - resolver({ - lcl: m.lcl, - lclSeqNo: m.lcl_seqno - }); - }) - statResponseResolvers = []; - } - else if (m.type == "unl_change") { - if (m.unl) { - // Convert unl pubkeys to hex string. - let unl = m.unl.map(k => msgHelper.stringifyValue(k)); - validateAndEmitUnlChange(unl); - } - } - else { - liblog(1, "Received unrecognized contract message: type:" + m.type); - return false; - } - - return true; - } - - const messageHandler = async (rcvd) => { - - const data = (connectionStatus < 2 || protocol == protocols.json) ? - (isBrowser ? await rcvd.data.text() : rcvd.data) : - (isBrowser ? await rcvd.data.arrayBuffer() : rcvd.data); - - try { - m = msgHelper.deserializeMessage(data); - } - catch (e) { - liblog(1, e); - liblog(0, "Exception deserializing: "); - liblog(0, data || rcvd); - - // If we get invalid message during handshake, close the socket. - if (connectionStatus < 2) - this.close(); - - return; - } - - let isValid = false; - if (connectionStatus < 2) - isValid = handshakeMessageHandler(m); - else if (connectionStatus == 2) - isValid = contractMessageHandler(m); - - if (!isValid) { - // If we get invalid message during handshake, close the socket. - if (connectionStatus < 2) - this.close(); - } - } - - const openHandler = () => { - ws.addEventListener("message", messageHandler); - ws.addEventListener("close", closeHandler); - - handshakeTimer = setTimeout(() => { - // If handshake does not complete within timeout, close the connection. - this.close(); - handshakeTimer = null; - }, connectionTimeoutMs); - } - - const closeHandler = () => { - - if (closeResolver) - liblog(0, "Closing connection to " + server); - else - liblog(0, "Disconnected from " + server); - - emitter = null; - - if (handshakeTimer) - clearTimeout(handshakeTimer); - - // If there are any ongoing resolvers resolve them with error output. - - handshakeResolver && handshakeResolver(false); - handshakeResolver = null; - - statResponseResolvers.forEach(resolver => resolver(null)); - statResponseResolvers = []; - - Object.values(contractInputResolvers).forEach(resolver => resolver(null)); - contractInputResolvers = {}; - - this.onClose && this.onClose(); - closeResolver && closeResolver(); - } - - const errorHandler = (e) => { - handshakeResolver && handshakeResolver(false); - } - - this.isConnected = () => { - return connectionStatus == 2; - }; - - this.connect = () => { - liblog(0, "Connecting to " + server); - return new Promise(resolve => { - - ws = isBrowser ? new WebSocket(server) : new WebSocket(server, { rejectUnauthorized: false }); - handshakeResolver = resolve; - ws.addEventListener("error", errorHandler); - ws.addEventListener("open", openHandler); - }); - } - - this.close = () => { - if (ws.readyState == WebSocket.OPEN) { - return new Promise(resolve => { - closeResolver = resolve; - ws.close(); - }); - } - else { - return Promise.resolve(); - } - } - - this.getStatus = () => { - - if (connectionStatus != 2) - return Promise.resolve(null); - - const p = new Promise(resolve => { - statResponseResolvers.push(resolve); - }); - - // If this is the only awaiting stat request, then send an actual stat request. - // Otherwise simply wait for the previously sent request. - if (statResponseResolvers.length == 1) { - const msg = msgHelper.createStatusRequest(); - ws.send(msgHelper.serializeObject(msg)); - } - return p; - } - - this.sendContractInput = async (input, nonce = null, maxLclOffset = null) => { - - if (connectionStatus != 2) - return null; - - if (!maxLclOffset) - maxLclOffset = 10; - - if (!nonce) - nonce = (new Date()).getTime().toString(); - else - nonce = nonce.toString(); - - // Acquire the current lcl and add the specified offset. - const stat = await this.getStatus(); - if (!stat) - return new Promise(resolve => resolve("ledger_status_error")); - const maxLclSeqNo = stat.lclSeqNo + maxLclOffset; - - const msg = msgHelper.createContractInput(input, nonce, maxLclSeqNo); - const sigKey = msgHelper.stringifyValue(msg.sig); - const p = new Promise(resolve => { - contractInputResolvers[sigKey] = resolve; - }); - - ws.send(msgHelper.serializeObject(msg)); - return p; - } - - this.sendContractReadRequest = (request) => { - - if (connectionStatus != 2) - return; - - const msg = msgHelper.createReadRequest(request); - ws.send(msgHelper.serializeObject(msg)); - } - } - - function MessageHelper(keys, protocol) { - - this.useProtocol = (p) => { - protocol = p; - } - - this.binaryEncode = (data) => { - return protocol == protocols.json ? - uint8ArrayToHex(data) : - (Buffer.isBuffer(data) ? data : Buffer.from(data)); - } - - this.serializeObject = (obj) => { - return protocol == protocols.json ? JSON.stringify(obj) : bson.serialize(obj); - } - - this.deserializeMessage = (m) => { - return protocol == protocols.json ? JSON.parse(m) : bson.deserialize(m); - } - - this.serializeInput = (input) => { - return protocol == protocols.json ? - (isString(input) ? input : input.toString()) : - (Buffer.isBuffer(input) ? input : Buffer.from(input)); - } - - this.deserializeOutput = (content) => { - return protocol == protocols.json ? content : content.buffer; - } - - // Used for generating strings to hold values as js object keys. - this.stringifyValue = (val) => { - if (isString(val)) - return val; - else if (val instanceof Uint8Array) - return uint8ArrayToHex(val); - else if (val.buffer) // BSON binary. - return uint8ArrayToHex(new Uint8Array(val.buffer)); - else - throw "Cannot stringify signature."; - } - - // Spreads hex/binary item array. - this.spreadArrayField = (outputs) => { - return protocol == protocols.json ? outputs : outputs.map(o => o.buffer); - } - - this.createUserChallengeResponse = (userChallenge, serverChallenge, msgProtocol) => { - // For challenge response encoding Hot Pocket always uses json. - // Challenge response will specify the protocol to use for contract messages. - const sigBytes = sodium.crypto_sign_detached(userChallenge, keys.privateKey); - - return { - type: "user_challenge_response", - sig: this.binaryEncode(sigBytes), - pubkey: "ed" + this.binaryEncode(keys.publicKey), - server_challenge: serverChallenge, - protocol: msgProtocol - } - } - - this.createContractInput = (input, nonce, maxLclSeqNo) => { - - if (input.length == 0) - return null; - - const inpContainer = { - input: this.serializeInput(input), - nonce: nonce, - max_lcl_seqno: maxLclSeqNo - } - - const serlializedInpContainer = this.serializeObject(inpContainer); - const sigBytes = sodium.crypto_sign_detached(serlializedInpContainer, keys.privateKey); - - const signedInpContainer = { - type: "contract_input", - input_container: serlializedInpContainer, - sig: this.binaryEncode(sigBytes) - } - - return signedInpContainer; - } - - this.createReadRequest = (request) => { - if (request.length == 0) - return null; - - return { - type: "contract_read_request", - content: this.serializeInput(request) - } - } - - this.createStatusRequest = () => { - return { type: "stat" }; - } - } - - function hexToUint8Array(hexString) { - return new Uint8Array(hexString.match(/.{1,2}/g).map(byte => parseInt(byte, 16))); - } - - function uint8ArrayToHex(bytes) { - return bytes.reduce((str, byte) => str + byte.toString(16).padStart(2, "0"), ""); - } - - function isString(obj) { - return (typeof obj === "string" || obj instanceof String); - } - - function EventEmitter() { - const registrations = {}; - - this.on = (eventName, listener) => { - if (!registrations[eventName]) - registrations[eventName] = []; - registrations[eventName].push(listener); - } - - this.emit = (eventName, ...value) => { - if (registrations[eventName]) - registrations[eventName].forEach(listener => listener(...value)); - } - - this.clear = (eventName) => { - if (eventName) - delete registrations[eventName] - else - Object.keys(registrations).forEach(k => delete registrations[k]); - } - } - - // Set sodium reference. - async function initSodium() { - - if (isBrowser) { // Browser - if (!sodium) { - sodium = window.sodium || await new Promise(resolve => { - window.sodium = { - onload: async (sodiumRef) => resolve(sodiumRef) - } - }) - } - } - else { // nodejs - if (!sodium) - sodium = require('libsodium-wrappers'); - await sodium.ready; - } - - if (!sodium) - throw "Sodium reference not found. Please include sodium js lib in browser scripts."; - } - - // Set bson reference. - function initBson() { - if (bson) // If already set, do nothing. - return; - else if (isBrowser && window.BSON) // browser - bson = window.BSON; - else if (!isBrowser) // nodejs - bson = require('bson'); - - if (!bson) - throw "BSON reference not found."; - } - - // Set WebSocket reference. - function initWebSocket() { - if (WebSocket) // If already set, do nothing. - return; - else if (isBrowser && window.WebSocket) // browser - WebSocket = window.WebSocket; - else if (!isBrowser) // nodejs - WebSocket = require('ws'); - - if (!WebSocket) - throw "WebSocket reference not found."; - } - - let blake3Resolver = null; - // Set blake3 reference. - async function initBlake3() { - if (blake3) // If already set, do nothing. - return; - else if (isBrowser && window.blake3) // browser (if blake3 already loaded) - blake3 = window.blake3; - else if (isBrowser && !window.blake3) // If blake3 not yet loaded in browser, wait for it. - blake3 = await new Promise(resolve => blake3Resolver = resolve); - else if (!isBrowser) // nodejs - blake3 = require('blake3'); - - if (!blake3) - throw "Blake3 reference not found."; - } - - function setBlake3(blake3ref) { - if (blake3Resolver) { - blake3Resolver(blake3ref) - blake3Resolver = null; - } - else { - blake3 = blake3ref; - } - } - - function setLogLevel(level) { - logLevel = level; - } - - function liblog(level, msg) { - if (level >= logLevel) - console.log(msg); - } - - if (isBrowser) { - window.HotPocket = { - generateKeys, - createClient, - events, - protocols, - setBlake3, - setLogLevel - }; - } - else { - module.exports = { - generateKeys, - createClient, - events, - protocols, - setLogLevel - }; - } -})(); \ No newline at end of file diff --git a/test/metrics/metrics.js b/test/metrics/metrics.js index 863b4456..a2273d47 100644 --- a/test/metrics/metrics.js +++ b/test/metrics/metrics.js @@ -1,7 +1,7 @@ // HotPocket test client to collect metrics. // This assumes the HotPocket server we are connecting to is hosting the echo contract. -const HotPocket = require('./hp-client-lib'); +const HotPocket = require('../../examples/js_client/hp-client-lib'); let server = 'wss://localhost:8080'; if (process.argv.length == 3) server = 'wss://localhost:' + process.argv[2]; @@ -13,10 +13,11 @@ async function main() { HotPocket.setLogLevel(1); const tests = { + "Large payload": largePayload, "Single user read requests": singleUserReadRequests, "Single user Input/Output": singleUserInputOutput, "Multi user read requests": multiUserReadRequests, - "Multi user Input/Output": multiUserInputOutput + "Multi user Input/Output": multiUserInputOutput, }; for (const test in tests) { @@ -31,6 +32,8 @@ async function main() { console.log(duration + "ms"); } + + console.log("Done."); } async function createClient() { @@ -115,4 +118,20 @@ function multiUserInputOutput() { return Promise.all(tasks); } +function largePayload() { + return new Promise(async (resolve) => { + + const payload = "A".repeat(2 * 1024 * 1024); + + const hpc = await createClient(); + hpc.on(HotPocket.events.contractOutput, (response) => { + if (response.length < payload.length) + console.log("Payload length mismatch."); + hpc.close().then(() => resolve()); + }); + + await hpc.sendContractInput(payload); + }) +} + main(); \ No newline at end of file