Read request configuration options. (#236)

* Ignores read requests when contract execution disabled.
* Added concurrent_read_reqeuests config.
* Improved metric test script.
This commit is contained in:
Ravin Perera
2021-02-05 14:54:42 +05:30
committed by GitHub
parent a4399544b9
commit e1b1382599
6 changed files with 107 additions and 58 deletions

View File

@@ -400,6 +400,7 @@ namespace conf
cfg.user.max_bytes_per_min = user["max_bytes_per_min"].as<uint64_t>();
cfg.user.max_bad_msgs_per_min = user["max_bad_msgs_per_min"].as<uint64_t>();
cfg.user.idle_timeout = user["idle_timeout"].as<uint16_t>();
cfg.user.concurrent_read_reqeuests = user["concurrent_read_reqeuests"].as<uint64_t>();
cfg.user.enabled = user["enabled"].as<bool>();
}
catch (const std::exception &e)
@@ -501,6 +502,7 @@ namespace conf
user_config.insert_or_assign("max_connections", cfg.user.max_connections);
user_config.insert_or_assign("max_in_connections_per_host", cfg.user.max_in_connections_per_host);
user_config.insert_or_assign("enabled", cfg.user.enabled);
user_config.insert_or_assign("concurrent_read_reqeuests", cfg.user.concurrent_read_reqeuests);
d.insert_or_assign("user", user_config);
}
@@ -556,6 +558,13 @@ namespace conf
return -1;
}
// User settings
if (cfg.user.concurrent_read_reqeuests > CONCURRENT_READ_REQUEST_MAX_LIMIT)
{
std::cerr << "User concurrent_read_reqeuests cannot exceed " << CONCURRENT_READ_REQUEST_MAX_LIMIT << "\n";
return -1;
}
// Log settings
const std::unordered_set<std::string> valid_loglevels({"dbg", "inf", "wrn", "err"});
if (valid_loglevels.count(cfg.log.loglevel) != 1)

View File

@@ -9,6 +9,8 @@
*/
namespace conf
{
constexpr size_t CONCURRENT_READ_REQUEST_MAX_LIMIT = 32;
// Struct to represent ip and port of the peer.
struct ip_port_prop
{
@@ -109,6 +111,7 @@ namespace conf
uint64_t max_bad_msgs_per_min = 0; // User bad messages per minute
uint16_t max_connections = 0; // Max inbound user connections
uint16_t max_in_connections_per_host = 0; // Max inbound user connections per remote host (IP).
uint64_t concurrent_read_reqeuests = 10; // Supported concurrent read requests count.
bool enabled = true; // User connections enable/disable.
};

View File

@@ -14,7 +14,6 @@ namespace read_req
{
constexpr uint16_t LOOP_WAIT = 100; // Milliseconds.
constexpr uint16_t MAX_QUEUE_SIZE = 64; // Maximum read request queue size, The size passed is rounded up to the next multiple of the block size (32).
constexpr uint16_t MAX_THREAD_CAP = 5; // Maximum number of read request processing threads.
bool is_shutting_down = false;
bool init_success = false;
@@ -22,7 +21,7 @@ namespace read_req
util::buffer_store read_req_store;
std::thread thread_pool_executor; // Thread which spawns new threads for the read requests is the queue.
std::vector<std::thread> read_req_threads;
moodycamel::ConcurrentQueue<user_read_req> read_req_queue(MAX_QUEUE_SIZE, 0, MAX_THREAD_CAP);
moodycamel::ConcurrentQueue<user_read_req> read_req_queue(MAX_QUEUE_SIZE, 0, conf::CONCURRENT_READ_REQUEST_MAX_LIMIT);
std::mutex execution_contexts_mutex;
std::list<sc::execution_context> execution_contexts;
std::mutex completed_threads_mutex;
@@ -91,7 +90,7 @@ namespace read_req
}
}
if (read_req_queue.size_approx() != 0 && read_req_threads.size() <= MAX_THREAD_CAP)
if (read_req_queue.size_approx() != 0 && read_req_threads.size() <= conf::cfg.user.concurrent_read_reqeuests)
{
read_req_threads.push_back(std::thread(read_request_processor));
if (read_req_queue.size_approx() == 1)

View File

@@ -144,6 +144,10 @@ namespace usr
if (msg_type == msg::usrmsg::MSGTYPE_CONTRACT_READ_REQUEST)
{
// Ignore the request if contract execution is disabled or read requests disallowed.
if (!conf::cfg.contract.execute || conf::cfg.user.concurrent_read_reqeuests == 0)
return 0;
std::string content;
if (parser.extract_read_request(content) != -1)
{

View File

@@ -84,36 +84,39 @@ do
# Update config.
contract_json=$(node -p "JSON.stringify({...require('./tmp.json').contract,
id: '3c349abe-4d70-4f50-9fa6-018f1f2530ab', \
bin_path: '$binary', \
bin_args: '$binargs', \
roundtime: $roundtime, \
consensus: 'public', \
npl: 'public', \
appbill: { \
mode: '', \
bin_args: '' \
},}, null, 2)")
id: '3c349abe-4d70-4f50-9fa6-018f1f2530ab', \
bin_path: '$binary', \
bin_args: '$binargs', \
roundtime: $roundtime, \
consensus: 'public', \
npl: 'public', \
appbill: { \
mode: '', \
bin_args: '' \
}
}, null, 2)")
mesh_json=$(node -p "JSON.stringify({...require('./tmp.json').mesh, \
port:${peerport}, \
peer_discovery: { \
enabled: true, \
interval: 10000 \
}
port:${peerport}, \
peer_discovery: { \
enabled: true, \
interval: 10000 \
}
}, null, 2)")
user_json=$(node -p "JSON.stringify({...require('./tmp.json').user, \
port:${pubport}
}, null, 2)")
log_json=$(node -p "JSON.stringify({...require('./tmp.json').log, \
loglevel: '$loglevel', \
loggers:['console', 'file'] \
}, null, 2)")
user_json=$(node -p "JSON.stringify({...require('./tmp.json').user, port:${pubport}}, null, 2)")
node -p "JSON.stringify({...require('./tmp.json'), \
contract: ${contract_json},\
mesh: ${mesh_json},\
user: ${user_json}, \
log: {\
loglevel: '$loglevel', \
max_mbytes_per_file: 10, \
max_file_count": 50, \
loggers:['console', 'file'] \
}\
contract: ${contract_json},\
mesh: ${mesh_json},\
user: ${user_json}, \
log: ${log_json}, \
}, null, 2)" > hp.cfg
rm tmp.json

View File

@@ -13,23 +13,32 @@ async function main() {
HotPocket.setLogLevel(1);
const tests = {
"Large payload": largePayload,
"Single user read requests": singleUserReadRequests,
"Single user Input/Output": singleUserInputOutput,
"Multi user read requests": multiUserReadRequests,
"Multi user Input/Output": multiUserInputOutput,
"Large payload": () => largePayload(2),
"Single user read requests": () => singleUserReadRequests(10, 10),
"Single user Input/Output": () => singleUserInputOutput(10, 10),
"Multi user read requests": () => multiUserReadRequests(10, 10, 10),
"Multi user Input/Output": () => multiUserInputOutput(10, 10, 10),
};
// Execute all tests.
for (const test in tests) {
console.log(test + "...");
const start = new Date();
await tests[test]();
// The test will return single or multiple tuples of time periods.
// Multiple tuples mean that the test had multiple sub-tests inside it.
// Each tuple indicates [start time, end time] of a particular atomic test.
const result = await tests[test]();
const end = new Date();
const duration = end.getTime() - start.getTime();
// If the result is a single period tuple, put them in a parent array.
const runPeriods = Array.isArray(result[0]) ? result : [result];
// Duration is calculated as the duration between earliest start time and latest end time.
const startTimes = runPeriods.map(p => p[0]);
const endTimes = runPeriods.map(p => p[1]);
const minStartTime = Math.min.apply(null, startTimes);
const maxEndTime = Math.max.apply(null, endTimes);
const duration = maxEndTime - minStartTime;
console.log(duration + "ms");
}
@@ -53,42 +62,50 @@ async function createClient() {
return hpc;
}
function singleUserReadRequests() {
function singleUserReadRequests(payloadKB, requestCount) {
return new Promise(async resolve => {
const payload = "A".repeat(10 * 1024);
const requestCount = 10;
const payload = "A".repeat(payloadKB * 1024);
let respCount = 0;
const hpc = await createClient();
const timer = new Timer();
hpc.on(HotPocket.events.contractReadResponse, (response) => {
respCount++;
if (respCount == requestCount)
hpc.close().then(() => resolve());
if (respCount == requestCount) {
const runPeriod = timer.stop();
hpc.close().then(() => resolve(runPeriod));
}
});
timer.start();
for (let i = 0; i < requestCount; i++) {
hpc.sendContractReadRequest(payload);
}
})
}
function singleUserInputOutput() {
function singleUserInputOutput(payloadKB, requestCount) {
return new Promise(async (resolve) => {
const payload = "A".repeat(10 * 1024);
const requestCount = 10;
const payload = "A".repeat(payloadKB * 1024);
let respCount = 0;
const hpc = await createClient();
const timer = new Timer();
hpc.on(HotPocket.events.contractOutput, (response) => {
respCount++;
if (respCount == requestCount)
hpc.close().then(() => resolve());
if (respCount == requestCount) {
const runPeriod = timer.stop();
hpc.close().then(() => resolve(runPeriod));
}
});
timer.start();
for (let i = 0; i < requestCount; i++) {
const nonce = i.toString().padStart(5);
await hpc.sendContractInput(payload, nonce, 20);
@@ -96,42 +113,56 @@ function singleUserInputOutput() {
})
}
function multiUserReadRequests() {
function multiUserReadRequests(payloadKB, requestCountPerUser, userCount) {
const userCount = 10;
const tasks = [];
for (let i = 0; i < userCount; i++) {
tasks.push(singleUserReadRequests());
tasks.push(singleUserReadRequests(payloadKB, requestCountPerUser));
}
return Promise.all(tasks);
}
function multiUserInputOutput() {
function multiUserInputOutput(payloadKB, requestCountPerUser, userCount) {
const userCount = 10;
const tasks = [];
for (let i = 0; i < userCount; i++) {
tasks.push(singleUserInputOutput());
tasks.push(singleUserInputOutput(payloadKB, requestCountPerUser));
}
return Promise.all(tasks);
}
function largePayload() {
function largePayload(payloadMB) {
return new Promise(async (resolve) => {
const payload = "A".repeat(2 * 1024 * 1024);
const payload = "A".repeat(payloadMB * 1024 * 1024);
const hpc = await createClient();
const timer = new Timer();
hpc.on(HotPocket.events.contractOutput, (response) => {
if (response.length < payload.length)
console.log("Payload length mismatch.");
hpc.close().then(() => resolve());
const runPeriod = timer.stop();
hpc.close().then(() => resolve(runPeriod));
});
timer.start();
await hpc.sendContractInput(payload);
})
}
function Timer() {
let startedOn = null;
this.start = () => {
startedOn = new Date().getTime();
}
this.stop = () => {
const endedOn = new Date().getTime();
return [startedOn, endedOn];
}
}
main();