Compare commits

..

2 Commits

Author SHA1 Message Date
Denis Angell
80b4c941f0 update headers 2024-07-04 10:29:43 +02:00
Denis Angell
7b5ed1c7c4 rough draft 2024-06-06 12:38:20 +02:00
20 changed files with 309 additions and 1227 deletions

View File

@@ -542,7 +542,6 @@ target_sources (rippled PRIVATE
src/ripple/nodestore/backend/NuDBFactory.cpp
src/ripple/nodestore/backend/NullFactory.cpp
src/ripple/nodestore/backend/RocksDBFactory.cpp
src/ripple/nodestore/backend/SnugDBFactory.cpp
src/ripple/nodestore/impl/BatchWriter.cpp
src/ripple/nodestore/impl/Database.cpp
src/ripple/nodestore/impl/DatabaseNodeImp.cpp
@@ -589,6 +588,7 @@ target_sources (rippled PRIVATE
src/ripple/resource/impl/Consumer.cpp
src/ripple/resource/impl/Fees.cpp
src/ripple/resource/impl/ResourceManager.cpp
src/ripple/resource/impl/Tuning.cpp
#[===============================[
main sources:
subdir: rpc

View File

@@ -9,15 +9,6 @@ echo "-- GITHUB_RUN_NUMBER: $4"
umask 0000;
echo "Fixing CentOS 7 EOL"
sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-*
sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-*
yum clean all
yum-config-manager --disable centos-sclo-sclo
####
cd /io;
mkdir src/certs;
curl --silent -k https://raw.githubusercontent.com/RichardAH/rippled-release-builder/main/ca-bundle/certbundle.h -o src/certs/certbundle.h;

View File

@@ -23,7 +23,9 @@
#
# 9. Misc Settings
#
# 10. Example Settings
# 10. Resource Settings
#
# 11. Example Settings
#
#-------------------------------------------------------------------------------
#
@@ -1565,7 +1567,41 @@
#
#-------------------------------------------------------------------------------
#
# 10. Example Settings
# 10. Resource Settings
#
#--------------------
# [resource]
#
# A set of key/value pair parameters used to tune the behavior of the
# server's resource manager, which tracks the load balance of each
# connected consumer.
#
# warning_threshold = <number>
#
# The consumer load balance at which a warning is issued.
# Default: 5000.
#
# drop_threshold = <number>
#
# The consumer load balance at which the consumer is disconnected.
# Default: 15000.
#
# decay_window_seconds = <number>
#
# The number of seconds in the exponential decay window used to
# average consumer load. This should be a power of two.
# Default: 32. (Currently fixed; not read from the config.)
#
# minimum_gossip_balance = <number>
#
# The minimum balance required in order to include a load source
# in gossip to other servers. Default: 1000.
#
# seconds_until_expiration = <number>
#
# The number of seconds until an inactive entry is removed from
# the resource table. Default: 300.
#
# gossip_expiration_seconds = <number>
#
# The number of seconds until gossip imported from other servers
# expires. Default: 30.
#
#
#
# 11. Example Settings
#
#--------------------
#

View File

@@ -352,6 +352,7 @@ public:
, validatorKeys_(*config_, m_journal)
, m_resourceManager(Resource::make_Manager(
config_->section("resource"),
m_collectorManager->collector(),
logs_->journal("Resource")))

View File

@@ -101,6 +101,7 @@ struct ConfigSection
#define SECTION_SWEEP_INTERVAL "sweep_interval"
#define SECTION_NETWORK_ID "network_id"
#define SECTION_IMPORT_VL_KEYS "import_vl_keys"
#define SECTION_RESOURCE "resource"
} // namespace ripple

View File

@@ -127,7 +127,7 @@ sizedItems
{SizedItem::txnDBCache, {{ 4, 12, 24, 64, 128 }}},
{SizedItem::lgrDBCache, {{ 4, 8, 16, 32, 128 }}},
{SizedItem::openFinalLimit, {{ 8, 16, 32, 64, 128 }}},
{SizedItem::burstSize, {{ 4, 8, 16, 32, 64*1024*1024 }}},
{SizedItem::burstSize, {{ 4, 8, 16, 32, 48 }}},
{SizedItem::ramSizeGB, {{ 8, 12, 16, 24, 32 }}},
{SizedItem::accountIdCacheSize, {{ 20047, 50053, 77081, 150061, 300007 }}}
}};

View File

@@ -1,336 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/basics/contract.h>
#include <ripple/nodestore/Factory.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/nodestore/impl/DecodedBlob.h>
#include <ripple/nodestore/impl/EncodedBlob.h>
#include <ripple/nodestore/impl/codec.h>
#include <boost/filesystem.hpp>
#include "snug.hpp"
#include <cassert>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <exception>
#include <memory>
namespace ripple {
namespace NodeStore {
/** NodeStore backend that stores node objects in a SnugDB database.

    Values are written through snug::SnugDB, a memory-mapped, bucketed
    key/value store (see snug.hpp). The backend owns a single SnugDB
    instance; fetches decode values into NodeObjects via DecodedBlob.

    Fix vs. previous revision: the dead `if (0)` debug blocks were
    removed. They invoked undefined behavior (printing an `int` and a
    `uint64_t` with `%zu`) and relied on <sstream>/<iomanip>/<iostream>,
    none of which this file includes.
*/
class SnugDBBackend : public Backend
{
private:
    // 256 MiB scratch buffer, allocated once per thread, used to
    // receive values from SnugDB on fetch (see fetch()).
    static constexpr uint64_t BUFFER_SIZE = 256ULL * 1024ULL * 1024ULL;

public:
    beast::Journal const j_;
    std::string const name_;  // filesystem path from the "path" key
    std::unique_ptr<snug::SnugDB> db_;
    Scheduler& scheduler_;

    SnugDBBackend(
        Section const& keyValues,
        Scheduler& scheduler,
        beast::Journal journal)
        : j_(journal), name_(get(keyValues, "path")), scheduler_(scheduler)
    {
        if (name_.empty())
            throw std::runtime_error(
                "nodestore: Missing path in SnugDB backend");
    }

    ~SnugDBBackend() override
    {
        try
        {
            // close can throw and we don't want the destructor to throw.
            db_ = nullptr;
        }
        catch (std::exception const& e)
        {
            JLOG(j_.warn()) << "SnugDB threw on destruction: " << e.what();
            // Don't allow exceptions to propagate out of destructors.
        }
    }

    std::string
    getName() override
    {
        return name_;
    }

    /** Open (creating if needed) the database directory.

        The uid/appType/salt triple namespaces the on-disk directory so
        databases for different uses/networks do not share data.
    */
    void
    open(bool createIfMissing, uint64_t appType, uint64_t uid, uint64_t salt)
        override
    {
        if (db_)
        {
            assert(false);
            JLOG(j_.error()) << "database is already open";
            return;
        }

        std::string path = name_ + "/" + std::to_string(uid) + "-" +
            std::to_string(appType) + "-" + std::to_string(salt);

        boost::filesystem::create_directories(path);
        db_ = std::make_unique<snug::SnugDB>(path);
    }

    bool
    isOpen() override
    {
        return db_ != nullptr;
    }

    void
    open(bool createIfMissing) override
    {
        open(createIfMissing, 0, 0, 0);
    }

    void
    close() override
    {
        db_ = nullptr;
    }

    /** Fetch a single object by its 32-byte key.

        @return ok, notFound, dataCorrupt, or backendError per the
                NodeStore Backend contract.
    */
    Status
    fetch(void const* key, std::shared_ptr<NodeObject>* pno) override
    {
        if (!db_)
            return backendError;

        pno->reset();

        // One scratch buffer per thread, reused across calls, so large
        // values never require a per-fetch allocation.
        static thread_local std::unique_ptr<uint8_t[]> thread_buffer =
            std::make_unique<uint8_t[]>(BUFFER_SIZE);

        uint8_t* ptr = &(thread_buffer[0]);
        uint64_t len = BUFFER_SIZE;

        // read_entry: 0 = found, 1 = not found, otherwise error.
        int result = db_->read_entry(
            static_cast<uint8_t*>(const_cast<void*>(key)), ptr, &len);

        if (result == 1)
            return notFound;

        if (result == 0)
        {
            DecodedBlob decoded(key, ptr, len);
            if (!decoded.wasOk())
                return dataCorrupt;
            *pno = decoded.createObject();
            return ok;
        }

        return backendError;
    }

    /** Fetch many objects; failed fetches yield empty entries.

        The overall status is always ok; per-key failures are signalled
        by a null shared_ptr in the corresponding result slot.
    */
    std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
    fetchBatch(std::vector<uint256 const*> const& hashes) override
    {
        std::vector<std::shared_ptr<NodeObject>> results;
        results.reserve(hashes.size());
        for (auto const& h : hashes)
        {
            std::shared_ptr<NodeObject> nObj;
            Status status = fetch(h->begin(), &nObj);
            if (status != ok)
                results.push_back({});
            else
                results.push_back(nObj);
        }
        return {results, ok};
    }

    /** Encode and write one object; throws on write failure. */
    void
    do_insert(std::shared_ptr<NodeObject> const& no)
    {
        EncodedBlob e(no);
        int out = db_->write_entry(
            static_cast<uint8_t*>(const_cast<void*>(e.getKey())),
            static_cast<uint8_t*>(const_cast<void*>(e.getData())),
            e.getSize());
        if (out != 0)
            throw std::runtime_error(
                "SnugDB could not write entry. Disk full? error" +
                std::to_string(out));
    }

    void
    store(std::shared_ptr<NodeObject> const& no) override
    {
        // Report the single write (with timing) to the scheduler.
        BatchWriteReport report;
        report.writeCount = 1;
        auto const start = std::chrono::steady_clock::now();
        do_insert(no);
        report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - start);
        scheduler_.onBatchWrite(report);
    }

    void
    storeBatch(Batch const& batch) override
    {
        BatchWriteReport report;
        report.writeCount = batch.size();
        auto const start = std::chrono::steady_clock::now();
        for (auto const& e : batch)
            do_insert(e);
        report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - start);
        scheduler_.onBatchWrite(report);
    }

    void
    sync() override
    {
        // No-op: SnugDB writes go through shared memory mappings; the
        // kernel flushes dirty pages.
    }

    /** Visit every stored object. Throws if a corrupt entry is found. */
    void
    for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
    {
        db_->visit_all(
            [](uint8_t* key, uint8_t* data, uint64_t len, void* fp) -> void {
                DecodedBlob decoded(key, data, len);
                if (!decoded.wasOk())
                {
                    throw std::runtime_error(
                        "Missing or corrupted data in snugdb");
                    return;
                }
                std::function<void(std::shared_ptr<NodeObject>)> f =
                    *(reinterpret_cast<
                        std::function<void(std::shared_ptr<NodeObject>)>*>(fp));
                f(decoded.createObject());
            },
            reinterpret_cast<void*>(&f));
    }

    int
    getWriteLoad() override
    {
        return 0;
    }

    void
    setDeletePath() override
    {
    }

    void
    verify() override
    {
    }

    /** File descriptors needed: transient fds used during mmap setup. */
    int
    fdRequired() const override
    {
        return 3;
    }
};
//------------------------------------------------------------------------------
/** Factory that registers the "SnugDB" NodeStore backend.

    A single static instance (declared below) registers itself with the
    NodeStore Manager during static initialization and unregisters on
    shutdown, making the backend selectable by name from configuration.
*/
class SnugDBFactory : public Factory
{
public:
    SnugDBFactory()
    {
        // Self-register so the Manager can construct this backend by name.
        Manager::instance().insert(*this);
    }
    ~SnugDBFactory() override
    {
        Manager::instance().erase(*this);
    }
    std::string
    getName() const override
    {
        return "SnugDB";
    }
    /** Create a backend instance.
        keyBytes and burstSize are unused: SnugDB keys are fixed at 32
        bytes and writes are not batched by size.
    */
    std::unique_ptr<Backend>
    createInstance(
        size_t keyBytes,
        Section const& keyValues,
        std::size_t burstSize,
        Scheduler& scheduler,
        beast::Journal journal) override
    {
        return std::make_unique<SnugDBBackend>(keyValues, scheduler, journal);
    }
    /** Overload used when a nudb::context is available.
        SnugDB does not use nudb, so the context is ignored.
    */
    std::unique_ptr<Backend>
    createInstance(
        size_t keyBytes,
        Section const& keyValues,
        std::size_t burstSize,
        Scheduler& scheduler,
        nudb::context& context,
        beast::Journal journal) override
    {
        return std::make_unique<SnugDBBackend>(keyValues, scheduler, journal);
    }
};
// Static registration instance (constructor runs at program start).
static SnugDBFactory snugDBFactory;
} // namespace NodeStore
} // namespace ripple

View File

@@ -1,741 +0,0 @@
#include <algorithm>
#include <array>
#include <fcntl.h>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <vector>
#define MMAPFLAGS (PROT_READ | PROT_WRITE | MAP_NORESERVE), MAP_SHARED
namespace snug {
/** qsort comparator ordering 32-byte keys in DESCENDING order.

    Each entry begins with its key, viewed here as four uint64_t words
    compared most-significant-word first. Descending order pushes all-zero
    (empty) entries to the end of a bucket.
*/
int
compare_entries_reverse(const void* a, const void* b)
{
    auto const* lhs = static_cast<const uint64_t*>(a);
    auto const* rhs = static_cast<const uint64_t*>(b);

    // Compare the four 64-bit words (32 bytes total), high word first.
    for (int w = 0; w < 4; ++w)
    {
        if (rhs[w] > lhs[w])
            return 1;
        if (rhs[w] < lhs[w])
            return -1;
    }
    return 0;  // keys are equal
}
/** SnugDB: a memory-mapped, bucketed key/value store for 32-byte keys.

    Layout: data files ("snug.0", "snug.1", ..., each a 256 GiB sparse
    file) are divided into 1048576 buckets of 256 KiB. A bucket holds up
    to 256 entries of 1 KiB each, kept sorted in descending key order so
    zero (empty) entries sink to the end. The bucket index is taken from
    the top 20 bits of the key (see OFFSET). Values larger than 984 bytes
    overflow into a linked list of 32 KiB blocks inside a shared
    "snug.big" file (10 TiB sparse).

    Concurrency: one shared_mutex per bucket guards entry access;
    big_file_mutex guards big-block allocation; mapped_files_count_mutex
    guards growth of the data-file list.

    Fixes vs. previous revision:
    - visit_all passed the advanced copy cursor (one past the end of the
      assembled buffer) to the callback for big entries; it now passes
      the start of the buffer.
    - removed an unused local in unalloc_blocks.
    - diagnostic printf format specifier corrected for uint32_t.
    - signed/unsigned loop-counter mismatches cleaned up.
*/
class SnugDB
{
private:
    static constexpr uint64_t SNUGSIZE =
        256ull * 1024ull * 1024ull * 1024ull;  // 256 GiB
    static constexpr uint64_t BIGSIZE =
        10ull * 1024ull * 1024ull * 1024ull * 1024ull;  // 10 TiB

    static constexpr size_t BUCKET_COUNT = 1048576;

    // One reader/writer lock per bucket.
    std::unique_ptr<std::shared_mutex[]> mutexes =
        std::make_unique<std::shared_mutex[]>(BUCKET_COUNT);

    // each file snug.0 snug.1 ... is mmaped and the pointer stored here
    uint8_t* mapped_files[1024];
    uint64_t mapped_files_count{0};

    // snug.big contains 32 KiB blocks used as overflow storage for
    // values larger than one 1 KiB bucket entry can hold
    uint8_t* big_file;

    std::mutex big_file_mutex;  // locked when incrementing the "next new
                                // block" pointer

    // only used when adding a new file
    std::mutex mapped_files_count_mutex;

    std::string const path;

    // Create a sparse file of `size` (+1 terminating) bytes.
    // 0 = success
    // 1 = could not open
    // 2 = could not seek
    // 3 = could not write at end of file
    int
    alloc_file(char const* fn, uint64_t size)
    {
        int fd = open(fn, O_WRONLY | O_CREAT | O_TRUNC, 0644);
        if (fd < 0)
            return 1;

        // Seek past the end and write one byte to extend the file
        // sparsely without materializing `size` bytes on disk.
        if (lseek(fd, size, SEEK_SET) == -1)
        {
            close(fd);
            unlink(fn);
            return 2;
        }

        if (write(fd, "", 1) != 1)
        {
            close(fd);
            unlink(fn);
            return 3;
        }

        close(fd);
        return 0;
    }

    // 0 = file exists and is right size (size + 1 accounts for the
    // terminating byte written by alloc_file)
    int
    check_file(char const* fn, uint64_t size)
    {
        struct stat st;
        int file_exists = (stat(fn, &st) == 0);
        if (!file_exists)
            return 1;

        if (st.st_size != size + 1)
            return 2;

        return 0;
    }

// Map a key's leading 20 bits to its bucket's byte offset:
// (bucket index) << 18, where each bucket is 256 KiB (1 << 18).
#define OFFSET(byte0, byte1, byte2) \
    (((((uint64_t)(byte0 & 0xFFU)) << 12) + \
      (((uint64_t)(byte1 & 0xFFU)) << 4) + ((uint64_t)(byte2 & 0xFU))) \
     << 18)

// check if 32 bytes are 0, which they will be for a zero entry
#define IS_ZERO_ENTRY(x) \
    (*((uint64_t*)((x) + 0)) == 0 && *((uint64_t*)((x) + 8)) == 0 && \
     *((uint64_t*)((x) + 16)) == 0 && *((uint64_t*)((x) + 24)) == 0)

// check if the 32-byte keys at x and y are identical
#define IS_ENTRY(x, y) \
    (*((uint64_t*)((x) + 0)) == *((uint64_t*)((y) + 0)) && \
     *((uint64_t*)((x) + 8)) == *((uint64_t*)((y) + 8)) && \
     *((uint64_t*)((x) + 16)) == *((uint64_t*)((y) + 16)) && \
     *((uint64_t*)((x) + 24)) == *((uint64_t*)((y) + 24)))

// copy the 32-byte key from y to x and store the 8-byte flags word
#define WRITE_KEY(x /* dst */, y /* src */, flags) \
    { \
        *((uint64_t*)((x) + 0)) = *((uint64_t*)((y) + 0)); \
        *((uint64_t*)((x) + 8)) = *((uint64_t*)((y) + 8)); \
        *((uint64_t*)((x) + 16)) = *((uint64_t*)((y) + 16)); \
        *((uint64_t*)((x) + 24)) = *((uint64_t*)((y) + 24)); \
        *((uint64_t*)((x) + 32)) = flags; \
    }

    // if an entry exceeds 984 bytes then the overflow is written
    // into the snug.big file in a linked list of 32 KiB blocks
    // the first of those blocks is a control block

    /** Pop a free big block, or extend the file with a new one.
        Returns the block's byte offset into big_file, or 0 on
        exhaustion. Caller must NOT hold big_file_mutex.
    */
    uint64_t
    get_big_block()
    {
        std::unique_lock<std::mutex> lock(big_file_mutex);

        uint64_t free_blocks = *((uint64_t*)(big_file + 8));

        if (free_blocks == 0)
        {
            // no free blocks, allocate a new one
            uint64_t next_block = *((uint64_t*)big_file);

            // special edge case, first block ever allocated:
            // offset 0 is the control block, so skip it
            if (!next_block)
                next_block += 32768;

            *((uint64_t*)(big_file)) = next_block + 32768;

            if (next_block + 32768 > BIGSIZE)
                return 0;

            return next_block;
        }

        // grab the nth one from the free list in the control block
        uint8_t* offset = big_file + 16 + 8 * (free_blocks - 1);

        // decrement free block counter
        *(uint64_t*)(big_file + 8) -= 1;

        return *((uint64_t*)offset);
    }

    /** Return a chain of big blocks to the free list.
        The free list holds at most 4095 entries; any surplus blocks in
        the chain are simply leaked (left unreferenced).
    */
    void
    unalloc_blocks(uint64_t next_block)
    {
        if (next_block != 0)
        {
            // scope the lock only if called with non-zero nextblock
            std::unique_lock<std::mutex> lock(big_file_mutex);
            do
            {
                uint64_t free_blocks = *((uint64_t*)(big_file + 8));
                if (free_blocks >= 4095)
                    break;

                uint8_t* offset = big_file + 16 + 8 * free_blocks;
                *((uint64_t*)offset) = next_block;
                *((uint64_t*)(big_file + 8)) += 1;

                uint64_t previous = next_block;
                next_block = *((uint64_t*)(big_file + next_block));

                // clear the pointer on the old block
                *((uint64_t*)(big_file + previous)) = 0;
            } while (next_block != 0);
        }
    }

    /*
     * First big entry is control block:
     * 0 - 7: The next free new block
     * 8 - 15: The number of free blocks below
     * 16 - 23 [... repeating]: The next free unused block
     */

    /*
     * Big entry format:
     * 0 - 7: next block in chain, if any.
     * 8 - 32767: payload
     */

    // return 0 = failure
    // > 0 = first block in the chain
    uint64_t
    write_big_entry_internal(uint8_t* data, ssize_t len, uint64_t next_block)
    {
        uint64_t first_block = 0;
        uint64_t* last_block_ptr = 0;
        do
        {
            // if next_block is populated we follow an existing pathway
            // (reusing a prior chain), otherwise allocate a new block now
            if (!next_block)
                next_block = get_big_block();

            if (!next_block)
                return 0;

            if (!first_block)
                first_block = next_block;

            if (last_block_ptr)
                *last_block_ptr = next_block;

            uint8_t* big_ptr = big_file + next_block;

            // copy up to one block's payload (32760 = 32768 - 8 header)
            ssize_t to_write = len > 32760 ? 32760 : len;
            memcpy(big_ptr + 8, data, to_write);

            data += to_write;
            len -= to_write;

            next_block = *((uint64_t*)big_ptr);
            last_block_ptr = (uint64_t*)big_ptr;
        } while (len > 0);

        // if there's a dangling chain (old value was longer) unallocate it
        if (next_block != 0)
            unalloc_blocks(next_block);

        return first_block;
    }

    /*
     * Entry format:
     * 0 - 31: the 32 byte key
     * 32 - 39: flags (high 4 bytes are flags, low 4 are size)
     * 40 - 1023: data (up to 984 bytes)
     */

    // Write (or, with len == 0, delete) the entry for `key` in `data`'s
    // bucket. The bucket is re-sorted after every mutation.
    // 0 = success
    // 1 = bucket full
    // 2 = big blocks full
    int
    write_entry_internal(
        uint8_t* data,
        uint8_t* key,
        uint8_t* val,
        uint32_t len)
    {
        // find the entry's bucket
        uint64_t offset = OFFSET(key[0], key[1], (key[2] >> 4));

        // lock the bucket for writing
        std::unique_lock<std::shared_mutex> lock(mutexes[offset >> 18]);

        uint8_t* start = data + offset;
        for (int i = 0; i < 256 * 1024; i += 1024)
        {
            bool const found = IS_ENTRY(start + i, key);
            if (!found && !IS_ZERO_ENTRY(start + i))
                continue;

            // special edge case: the key doesn't exist and they're trying
            // to delete it
            if (!found && len == 0)
                return 0;

            // read flags
            uint64_t flags = *((uint64_t*)(start + i + 32));

            // big entries are tricky: the high 32 bits of flags hold the
            // offset of the first overflow block, if any
            bool const old_big = (flags >> 32) != 0;
            bool const new_big = len > 984;

            if (new_big)
            {
                // write the overflow (bytes past 984), reusing the old
                // chain if one exists
                uint64_t first_block = write_big_entry_internal(
                    val + 984, len - 984, (old_big ? (flags >> 32) : 0));

                if (first_block == 0)  // error state
                {
                    if (old_big)
                        unalloc_blocks(flags >> 32);
                    return 2;
                }

                flags = (first_block << 32) + len;
            }
            else if (old_big)  // big blocks exist but new value is small
            {
                // unallocate the old chain
                unalloc_blocks(flags >> 32);
            }

            if (!new_big)
                flags = len;

            if (len == 0)
            {
                // deletion requests are written as zero keys
                memset(start + i, 0, 1024);
            }
            else
            {
                /// write entry
                WRITE_KEY(start + i, key, flags);
                memcpy(start + i + 40, val, (len > 984 ? 984 : len));
            }

            // sort the bucket backwards so 0's appear at the end
            qsort(start, 256, 1024, compare_entries_reverse);
            return 0;
        }

        /// file (bucket) full
        return 1;
    }

    // out_len carries the length of the output buffer when calling and is
    // replaced with the length of the data found when returning
    // 0 = found, 1 = not found, 2 = buffer too small, 3 = broken chain
    int
    read_entry_internal(
        uint8_t* data,
        uint8_t* key,
        uint8_t* val_out,
        uint64_t* out_len)
    {
        uint64_t buf_len = *out_len;

        // find the entry's bucket
        uint64_t offset = OFFSET(key[0], key[1], (key[2] >> 4));
        uint8_t* start = data + offset;

        // lock the bucket for reading
        std::shared_lock<std::shared_mutex> lock(mutexes[offset >> 18]);

        for (int i = 0; i < 256 * 1024; i += 1024)
        {
            // zero entries are sorted to the end, so the first zero entry
            // means the key is absent from this bucket
            if (IS_ZERO_ENTRY(start + i))
                return 1;

            if (!IS_ENTRY(start + i, key))
                continue;

            // read out the value
            uint64_t flags = *((uint64_t*)(start + i + 32));
            uint32_t size = flags & 0xFFFFFFFFUL;
            uint64_t next_block = flags >> 32;

            if (size > buf_len)
                return 2;

            *out_len = size;

            size_t to_read = size > 984 ? 984 : size;
            memcpy(val_out, start + i + 40, to_read);

            val_out += to_read;
            size -= to_read;

            // big block read logic
            while (size > 0)
            {
                // follow big block pointers
                if (!next_block)
                {
                    printf("End while size=%u\n", size);
                    return 3;
                }

                uint8_t* big_ptr = big_file + next_block;
                to_read = size > 32760 ? 32760 : size;
                memcpy(val_out, big_ptr + 8, to_read);

                val_out += to_read;
                size -= to_read;
                next_block = *((uint64_t*)big_ptr);
            }

            return 0;
        }

        return 1;
    }

    /** Validate `path`, then create/open and mmap all data files plus
        the snug.big overflow file. Throws std::runtime_error on any
        failure.
    */
    void
    setup()
    {
        struct stat path_stat;
        if (stat(path.c_str(), &path_stat) != 0)
            throw std::runtime_error(
                "Error checking path: " + path + " - " +
                std::string(strerror(errno)));

        if (!S_ISDIR(path_stat.st_mode))
            throw std::runtime_error("Path is not a directory: " + path);

        if (access(path.c_str(), R_OK | W_OK | X_OK) != 0)
            throw std::runtime_error(
                "Insufficient permissions for path: " + path);

        // Search for existing snug files sequentially
        std::vector<std::string> snug_files;
        for (int file_index = 0; file_index < 1024; ++file_index)
        {
            std::string filename = "snug." + std::to_string(file_index);
            std::string full_path = path + "/" + filename;

            if (access(full_path.c_str(), F_OK) != 0)
                break;

            snug_files.push_back(filename);
        }

        // If no files found, create snug.0
        if (snug_files.empty())
        {
            std::string new_file = path + "/snug.0";
            int result = alloc_file(new_file.c_str(), SNUGSIZE);
            if (result != 0)
                throw std::runtime_error(
                    "Failed to create initial file: " + new_file);
            snug_files.push_back("snug.0");
        }

        // Memory map all files
        for (const auto& file : snug_files)
        {
            std::string full_path = path + "/" + file;

            if (check_file(full_path.c_str(), SNUGSIZE) != 0)
                throw std::runtime_error("File was the wrong size: " + file);

            int fd = open(full_path.c_str(), O_RDWR);
            if (fd == -1)
                throw std::runtime_error("Unable to open file: " + full_path);

            struct stat file_stat;
            if (fstat(fd, &file_stat) == -1)
            {
                close(fd);
                throw std::runtime_error(
                    "Unable to get file stats: " + full_path);
            }

            void* mapped = mmap(nullptr, file_stat.st_size, MMAPFLAGS, fd, 0);
            close(fd);  // Can close fd after mmap

            if (mapped == MAP_FAILED)
                throw std::runtime_error("Unable to mmap file: " + full_path);

            mapped_files[mapped_files_count++] = static_cast<uint8_t*>(mapped);
        }

        // create and map snug.big overflow file
        {
            std::string new_file = path + "/snug.big";
            if (check_file(new_file.c_str(), BIGSIZE) != 0)
            {
                int result = alloc_file(new_file.c_str(), BIGSIZE);
                if (result != 0)
                    throw std::runtime_error(
                        "Failed to create initial file: " + new_file);
            }

            int fd = open(new_file.c_str(), O_RDWR);
            if (fd == -1)
                throw std::runtime_error("Unable to open file: " + new_file);

            struct stat file_stat;
            if (fstat(fd, &file_stat) == -1)
            {
                close(fd);
                throw std::runtime_error(
                    "Unable to get file stats: " + new_file);
            }

            void* mapped = mmap(nullptr, file_stat.st_size, MMAPFLAGS, fd, 0);
            close(fd);  // Can close fd after mmap

            if (mapped == MAP_FAILED)
                throw std::runtime_error("Unable to mmap file: " + new_file);

            big_file = static_cast<uint8_t*>(mapped);
        }
    }

public:
    SnugDB(std::string path_) : path(path_)
    {
        setup();
    }

    ~SnugDB()
    {
        // Unmap all files in destructor
        // RH TODO: consider lock here
        for (uint64_t i = 0; i < mapped_files_count; ++i)
            munmap(mapped_files[i], SNUGSIZE);

        // unmap the big file
        munmap(big_file, BIGSIZE);
    }

    /** Write (len > 0) or delete (len == 0) the value for a 32-byte key.
        Grows the store with a new data file when all buckets with this
        key's index are full.
        @return 0 on success; small positive codes on failure (see body).
    */
    int
    write_entry(uint8_t* key, uint8_t* val, ssize_t len)
    {
        for (size_t i = 0; i < mapped_files_count; ++i)
        {
            int result = write_entry_internal(mapped_files[i], key, val, len);
            if (result == 0)
                return 0;
            if (result != 1)  // only bucket full falls through
                return result;
        }

        // All existing files are full, allocate a new one
        {
            // acquire the mutex
            const std::lock_guard<std::mutex> lock(mapped_files_count_mutex);

            std::string new_file =
                path + "/snug." + std::to_string(mapped_files_count);
            int alloc_result = alloc_file(new_file.c_str(), SNUGSIZE);

            if (alloc_result != 0)
                return alloc_result +
                    10;  // Return error code from alloc_file if it fails (+10)

            int fd = open(new_file.c_str(), O_RDWR);
            if (fd == -1)
                return 1;  // Return 1 for open failure

            struct stat file_stat;
            if (fstat(fd, &file_stat) == -1)
            {
                close(fd);
                return 2;  // Return 2 for fstat failure
            }

            void* mapped = mmap(
                nullptr,
                file_stat.st_size,
                PROT_READ | PROT_WRITE,
                MAP_SHARED,
                fd,
                0);
            close(fd);  // Can close fd after mmap

            if (mapped == MAP_FAILED)
                return 3;  // Return 3 for mmap failure

            // add the new file to the map, and increment the counter
            mapped_files[mapped_files_count] = static_cast<uint8_t*>(mapped);

            // this is the last possible thing done
            mapped_files_count++;
        }

        // finally write the entry
        // RH TODO: consider adding a recursion guard here
        return write_entry(key, val, len);
    }

    /** Read the value for a 32-byte key into val_out.
        On entry *out_len_orig is the buffer capacity; on success it is
        replaced with the value's length.
        @return 0 found, 1 not found, 2 buffer too small.
    */
    int
    read_entry(uint8_t* key, uint8_t* val_out, uint64_t* out_len_orig)
    {
        for (size_t i = 0; i < mapped_files_count; ++i)
        {
            uint64_t out_len = *out_len_orig;
            int result =
                read_entry_internal(mapped_files[i], key, val_out, &out_len);
            if (result == 0)
            {
                *out_len_orig = out_len;
                return 0;  // Entry found and read successfully
            }
            if (result == 2)
                return 2;  // Output buffer too small
        }
        // Entry not found in any file
        return 1;
    }

    /** Visit every live entry, in key order within each bucket.
        The callback receives (key, value, value-length, opaque).
    */
    void
    visit_all(
        void (*f)(uint8_t*, uint8_t*, uint64_t, void* /*opaque caller val*/),
        void* opaque)
    {
        // walk each bucket; within a bucket, collect the entries from
        // every data file, sort them by key, and visit each
        for (uint64_t bucket = 0; bucket < BUCKET_COUNT; ++bucket)
        {
            // acquire the bucket lock
            std::shared_lock<std::shared_mutex> lock(mutexes[bucket]);

            // quick emptiness check on snug.0: a zeroed first entry has
            // zero flags
            uint8_t* ptr = mapped_files[0] + (bucket << 18);
            if (*((uint64_t*)(ptr + 32)) == 0)
                continue;

            // live bucket, collect entries
            std::vector<uint8_t*> entries;
            {
                // need to acquire the mutex to prevent a race condition
                // where a new file is being added while we're searching
                const std::lock_guard<std::mutex> lock(
                    mapped_files_count_mutex);

                // preallocate worst case scenario, RIP memory
                entries.reserve(mapped_files_count * 256);

                for (uint64_t i = 0; i < mapped_files_count; ++i)
                {
                    uint8_t* ptr = mapped_files[i] + (bucket << 18);
                    for (int entry_count = 0;
                         !IS_ZERO_ENTRY(ptr) && entry_count < 256;
                         ++entry_count, ptr += 1024)
                        entries.push_back(ptr);
                }
            }

            if (entries.empty())
                continue;

            // sort the entries ascending by key
            std::sort(
                entries.begin(),
                entries.end(),
                [](const uint8_t* a, const uint8_t* b) {
                    return memcmp(a, b, 32) < 0;
                });

            for (auto e : entries)
            {
                // visitation
                uint8_t* entry = &e[0];
                uint64_t flags = *((uint64_t*)(entry + 32));
                uint64_t next_block = flags >> 32;
                uint64_t size = flags & 0xFFFFFFFFULL;

                if (size <= 984)
                {
                    // small value stored inline in the bucket entry
                    f(entry, entry + 40, size, opaque);
                    continue;
                }

                // copy big entry to a contiguous buffer
                std::unique_ptr<uint8_t[]> copybuf =
                    std::make_unique<uint8_t[]>(size);

                uint8_t* data = &(copybuf[0]);
                memcpy(data, entry + 40, 984);
                data += 984;
                size -= 984;

                // big block read logic
                while (size > 0)
                {
                    // follow big block pointers
                    if (!next_block)
                    {
                        printf("End while size=%lu\n", size);
                        return;
                    }

                    uint8_t* big_ptr = big_file + next_block;
                    uint64_t to_read = size > 32760 ? 32760 : size;
                    memcpy(data, big_ptr + 8, to_read);

                    data += to_read;
                    size -= to_read;
                    next_block = *((uint64_t*)big_ptr);
                }

                // BUGFIX: pass the start of the assembled buffer, not the
                // advanced cursor `data`, which points one past the end
                // of the buffer after the copy loop above.
                f(entry, &(copybuf[0]), (flags & 0xFFFFFFFFULL), opaque);
            }
        }
    }
};
} // namespace snug

View File

@@ -80,6 +80,7 @@ public:
std::unique_ptr<Manager>
make_Manager(
Section const& section,
beast::insight::Collector::ptr const& collector,
beast::Journal journal);

View File

@@ -89,7 +89,7 @@ struct Entry : public beast::List<Entry>::Node
int refcount;
// Exponentially decaying balance of resource consumption
DecayingSample<decayWindowSeconds, clock_type> local_balance;
DecayingSample<Tuning::getDecayWindowSeconds(), clock_type> local_balance;
// Normalized balance contribution from imports
int remote_balance;

View File

@@ -31,6 +31,7 @@
#include <ripple/resource/Fees.h>
#include <ripple/resource/Gossip.h>
#include <ripple/resource/impl/Import.h>
#include <ripple/resource/impl/Tuning.h>
#include <cassert>
#include <mutex>
@@ -88,11 +89,27 @@ private:
//--------------------------------------------------------------------------
public:
Logic(
Section const& section,
beast::insight::Collector::ptr const& collector,
clock_type& clock,
beast::Journal journal)
: m_stats(collector), m_clock(clock), m_journal(journal)
{
std::uint32_t warningThreshold;
if (get_if_exists(section, "warning_threshold", warningThreshold))
Tuning::warningThreshold = warningThreshold;
std::uint32_t dropThreshold;
if (get_if_exists(section, "drop_threshold", dropThreshold))
Tuning::dropThreshold = dropThreshold;
// std::uint32_t decayWindowSeconds;
// if (get_if_exists(section, "decay_window_seconds", decayWindowSeconds))
// Tuning::decayWindowSeconds = decayWindowSeconds;
std::uint32_t minimumGossipBalance;
if (get_if_exists(section, "minimum_gossip_balance", minimumGossipBalance))
Tuning::minimumGossipBalance = minimumGossipBalance;
}
~Logic()
@@ -200,7 +217,7 @@ public:
Json::Value
getJson()
{
return getJson(warningThreshold);
return getJson(Tuning::warningThreshold);
}
/** Returns a Json::objectValue. */
@@ -266,7 +283,7 @@ public:
{
Gossip::Item item;
item.balance = inboundEntry.local_balance.value(now);
if (item.balance >= minimumGossipBalance)
if (item.balance >= Tuning::minimumGossipBalance)
{
item.address = inboundEntry.key->address;
gossip.items.push_back(item);
@@ -294,7 +311,7 @@ public:
{
// This is a new import
Import& next(resultIt->second);
next.whenExpires = elapsed + gossipExpirationSeconds;
next.whenExpires = elapsed + Tuning::gossipExpirationSeconds;
next.items.reserve(gossip.items.size());
for (auto const& gossipItem : gossip.items)
@@ -312,7 +329,7 @@ public:
// balances and then deduct the old remote balances.
Import next;
next.whenExpires = elapsed + gossipExpirationSeconds;
next.whenExpires = elapsed + Tuning::gossipExpirationSeconds;
next.items.reserve(gossip.items.size());
for (auto const& gossipItem : gossip.items)
{
@@ -387,10 +404,10 @@ public:
static Disposition
disposition(int balance)
{
if (balance >= dropThreshold)
if (balance >= Tuning::dropThreshold)
return Disposition::drop;
if (balance >= warningThreshold)
if (balance >= Tuning::warningThreshold)
return Disposition::warn;
return Disposition::ok;
@@ -437,7 +454,7 @@ public:
break;
}
inactive_.push_back(entry);
entry.whenExpires = m_clock.now() + secondsUntilExpiration;
entry.whenExpires = m_clock.now() + Tuning::secondsUntilExpiration;
}
}
@@ -460,7 +477,7 @@ public:
std::lock_guard _(lock_);
bool notify(false);
auto const elapsed = m_clock.now();
if (entry.balance(m_clock.now()) >= warningThreshold &&
if (entry.balance(m_clock.now()) >= Tuning::warningThreshold &&
elapsed != entry.lastWarningTime)
{
charge(entry, feeWarning);
@@ -485,11 +502,11 @@ public:
bool drop(false);
clock_type::time_point const now(m_clock.now());
int const balance(entry.balance(now));
if (balance >= dropThreshold)
if (balance >= Tuning::dropThreshold)
{
JLOG(m_journal.warn())
<< "Consumer entry " << entry << " dropped with balance "
<< balance << " at or above drop threshold " << dropThreshold;
<< balance << " at or above drop threshold " << Tuning::dropThreshold;
// Adding feeDrop at this point keeps the dropped connection
// from re-connecting for at least a little while after it is

View File

@@ -45,9 +45,10 @@ private:
public:
ManagerImp(
Section const& section,
beast::insight::Collector::ptr const& collector,
beast::Journal journal)
: journal_(journal), logic_(collector, stopwatch(), journal)
: journal_(journal), logic_(section, collector, stopwatch(), journal)
{
thread_ = std::thread{&ManagerImp::run, this};
}
@@ -173,10 +174,11 @@ Manager::~Manager() = default;
std::unique_ptr<Manager>
make_Manager(
Section const& section,
beast::insight::Collector::ptr const& collector,
beast::Journal journal)
{
return std::make_unique<ManagerImp>(collector, journal);
return std::make_unique<ManagerImp>(section, collector, journal);
}
} // namespace Resource

View File

@@ -0,0 +1,34 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "Tuning.h"

namespace ripple {
namespace Resource {

// Default values for the runtime-configurable resource tunables. The
// integer values may be overridden from the [resource] section of the
// config file (see Logic's constructor).
std::uint32_t Tuning::warningThreshold = 5000;
std::uint32_t Tuning::dropThreshold = 15000;
// std::uint32_t Tuning::decayWindowSeconds = 32;
std::uint32_t Tuning::minimumGossipBalance = 1000;

// The in-class declarations are `const` (not `constexpr`), so these
// out-of-class definitions must not add `constexpr` — doing so is
// ill-formed and rejected by conforming compilers.
std::chrono::seconds const Tuning::secondsUntilExpiration{300};
std::chrono::seconds const Tuning::gossipExpirationSeconds{30};

}  // namespace Resource
}  // namespace ripple

View File

@@ -17,6 +17,7 @@
*/
//==============================================================================
// Tuning.h
#ifndef RIPPLE_RESOURCE_TUNING_H_INCLUDED
#define RIPPLE_RESOURCE_TUNING_H_INCLUDED
@@ -25,31 +26,23 @@
namespace ripple {
namespace Resource {
/** Tunable constants. */
enum {
// Balance at which a warning is issued
warningThreshold = 5000
class Tuning
{
public:
static std::uint32_t warningThreshold;
static std::uint32_t dropThreshold;
// static std::uint32_t decayWindowSeconds;
static std::uint32_t minimumGossipBalance;
// Balance at which the consumer is disconnected
,
dropThreshold = 15000
static std::chrono::seconds const secondsUntilExpiration;
static std::chrono::seconds const gossipExpirationSeconds;
// The number of seconds in the exponential decay window
// (This should be a power of two)
,
decayWindowSeconds = 32
// The minimum balance required in order to include a load source in gossip
,
minimumGossipBalance = 1000
static constexpr std::uint32_t getDecayWindowSeconds()
{
return 32;
}
};
// The number of seconds until an inactive table item is removed
std::chrono::seconds constexpr secondsUntilExpiration{300};
// Number of seconds until imported gossip expires
std::chrono::seconds constexpr gossipExpirationSeconds{30};
} // namespace Resource
} // namespace ripple

View File

@@ -61,7 +61,22 @@ getDeliveredAmount(
if (serializedTx->isFieldPresent(sfAmount))
{
return serializedTx->getFieldAmount(sfAmount);
using namespace std::chrono_literals;
// Ledger 4594095 is the first ledger in which the DeliveredAmount field
// was present when a partial payment was made and its absence indicates
// that the amount delivered is listed in the Amount field.
//
// If the ledger closed long after the DeliveredAmount code was deployed
// then its absence indicates that the amount delivered is listed in the
// Amount field. DeliveredAmount went live January 24, 2014.
// 446000000 is in Feb 2014, well after DeliveredAmount went live
if (getLedgerIndex() >= 4594095 ||
getCloseTime() > NetClock::time_point{446000000s} ||
(serializedTx && serializedTx->isFieldPresent(sfNetworkID)))
{
return serializedTx->getFieldAmount(sfAmount);
}
}
return {};

View File

@@ -5466,7 +5466,6 @@ private:
params[jss::transaction] = txIds[i];
auto const jrr = env.rpc("json", "tx", to_string(params));
auto const meta = jrr[jss::result][jss::meta];
BEAST_EXPECT(meta[jss::delivered_amount] == "1000000");
for (auto const& node : meta[sfAffectedNodes.jsonName])
{
auto const nodeType = node[sfLedgerEntryType.jsonName];

View File

@@ -23,6 +23,7 @@
#include <ripple/resource/Consumer.h>
#include <ripple/resource/impl/Entry.h>
#include <ripple/resource/impl/Logic.h>
#include <ripple/resource/impl/Tuning.h>
#include <test/unit_test/SuiteJournal.h>
#include <boost/utility/base_from_member.hpp>
@@ -42,8 +43,8 @@ public:
using clock_type = boost::base_from_member<TestStopwatch>;
public:
explicit TestLogic(beast::Journal journal)
: Logic(beast::insight::NullCollector::New(), member, journal)
explicit TestLogic(Section const& section, beast::Journal journal)
: Logic(section, beast::insight::NullCollector::New(), member, journal)
{
}
@@ -89,9 +90,13 @@ public:
else
testcase("Unlimited warn/drop");
TestLogic logic(j);
auto const config = test::jtx::envconfig([](std::unique_ptr<Config> cfg) {
return cfg;
});
Charge const fee(dropThreshold + 1);
TestLogic logic(config->section("resource"), j);
Charge const fee(Tuning::dropThreshold + 1);
beast::IP::Endpoint const addr(
beast::IP::Endpoint::from_string("192.0.2.2"));
@@ -173,7 +178,7 @@ public:
using namespace std::chrono_literals;
// Give Consumer time to become readmitted. Should never
// exceed expiration time.
auto n = secondsUntilExpiration + 1s;
auto n = Tuning::secondsUntilExpiration + 1s;
while (--n > 0s)
{
++logic.clock();
@@ -199,7 +204,11 @@ public:
{
testcase("Imports");
TestLogic logic(j);
auto const config = test::jtx::envconfig([](std::unique_ptr<Config> cfg) {
return cfg;
});
TestLogic logic(config->section("resource"), j);
Gossip g[5];
@@ -217,7 +226,11 @@ public:
{
testcase("Import");
TestLogic logic(j);
auto const config = test::jtx::envconfig([](std::unique_ptr<Config> cfg) {
return cfg;
});
TestLogic logic(config->section("resource"), j);
Gossip g;
Gossip::Item item;
@@ -236,7 +249,11 @@ public:
{
testcase("Charge");
TestLogic logic(j);
auto const config = test::jtx::envconfig([](std::unique_ptr<Config> cfg) {
return cfg;
});
TestLogic logic(config->section("resource"), j);
{
beast::IP::Endpoint address(
@@ -275,6 +292,41 @@ public:
pass();
}
void
testConfig(beast::Journal j)
{
    // Verify that the [resource] config section overrides the mutable
    // Tuning defaults, and that the compile-time-fixed values are
    // unaffected.
    testcase("Config");

    // Defaults, before any [resource] overrides have been applied.
    BEAST_EXPECT(Tuning::warningThreshold == 5000);
    BEAST_EXPECT(Tuning::dropThreshold == 15000);
    BEAST_EXPECT(Tuning::getDecayWindowSeconds() == 32);
    BEAST_EXPECT(Tuning::minimumGossipBalance == 1000);
    BEAST_EXPECT(Tuning::secondsUntilExpiration == std::chrono::seconds{300});
    BEAST_EXPECT(Tuning::gossipExpirationSeconds == std::chrono::seconds{30});

    auto const config = test::jtx::envconfig([](std::unique_ptr<Config> cfg) {
        cfg->section("resource").set("warning_threshold", "15000");
        cfg->section("resource").set("drop_threshold", "25000");
        cfg->section("resource").set("minimum_gossip_balance", "2000");
        cfg->section("resource").set("seconds_until_expiration", "600");
        cfg->section("resource").set("gossip_expiration_seconds", "60");
        return cfg;
    });

    // Constructing the Logic applies the [resource] section to the
    // mutable Tuning statics (global state: later tests observe these
    // overridden values).
    TestLogic logic(config->section("resource"), j);

    BEAST_EXPECT(Tuning::warningThreshold == 15000);
    BEAST_EXPECT(Tuning::dropThreshold == 25000);
    BEAST_EXPECT(Tuning::getDecayWindowSeconds() == 32);
    BEAST_EXPECT(Tuning::minimumGossipBalance == 2000);

    // secondsUntilExpiration and gossipExpirationSeconds are declared
    // `const` in Tuning.h, so the configuration values above cannot
    // change them; they must retain their compile-time defaults.
    BEAST_EXPECT(Tuning::secondsUntilExpiration == std::chrono::seconds{300});
    BEAST_EXPECT(Tuning::gossipExpirationSeconds == std::chrono::seconds{30});
}
void
run() override
{
@@ -286,6 +338,7 @@ public:
testCharges(journal);
testImports(journal);
testImport(journal);
testConfig(journal);
}
};

View File

@@ -191,73 +191,80 @@ class DeliveredAmount_test : public beast::unit_test::suite
auto const gw = Account("gateway");
auto const USD = gw["USD"];
Env env{*this, features};
env.fund(XRP(10000), alice, bob, carol, gw);
env.trust(USD(1000), alice, bob, carol);
env.close();
CheckDeliveredAmount checkDeliveredAmount{true};
for (bool const afterSwitchTime : {true, false})
{
// add payments, but do no close until subscribed
Env env{*this, features};
env.fund(XRP(10000), alice, bob, carol, gw);
env.trust(USD(1000), alice, bob, carol);
if (afterSwitchTime)
env.close(NetClock::time_point{446000000s});
else
env.close();
// normal payments
env(pay(gw, alice, USD(50)));
checkDeliveredAmount.adjCountersSuccess();
env(pay(gw, alice, XRP(50)));
checkDeliveredAmount.adjCountersSuccess();
// partial payment
env(pay(gw, bob, USD(9999999)), txflags(tfPartialPayment));
checkDeliveredAmount.adjCountersPartialPayment();
env.require(balance(bob, USD(1000)));
// failed payment
env(pay(bob, carol, USD(9999999)), ter(tecPATH_PARTIAL));
checkDeliveredAmount.adjCountersFail();
env.require(balance(carol, USD(0)));
}
auto wsc = makeWSClient(env.app().config());
{
Json::Value stream;
// RPC subscribe to ledger stream
stream[jss::streams] = Json::arrayValue;
stream[jss::streams].append("ledger");
stream[jss::accounts] = Json::arrayValue;
stream[jss::accounts].append(toBase58(alice.id()));
stream[jss::accounts].append(toBase58(bob.id()));
stream[jss::accounts].append(toBase58(carol.id()));
auto jv = wsc->invoke("subscribe", stream);
if (wsc->version() == 2)
CheckDeliveredAmount checkDeliveredAmount{afterSwitchTime};
{
BEAST_EXPECT(
jv.isMember(jss::jsonrpc) && jv[jss::jsonrpc] == "2.0");
BEAST_EXPECT(
jv.isMember(jss::ripplerpc) && jv[jss::ripplerpc] == "2.0");
BEAST_EXPECT(jv.isMember(jss::id) && jv[jss::id] == 5);
// add payments, but do no close until subscribed
// normal payments
env(pay(gw, alice, USD(50)));
checkDeliveredAmount.adjCountersSuccess();
env(pay(gw, alice, XRP(50)));
checkDeliveredAmount.adjCountersSuccess();
// partial payment
env(pay(gw, bob, USD(9999999)), txflags(tfPartialPayment));
checkDeliveredAmount.adjCountersPartialPayment();
env.require(balance(bob, USD(1000)));
// failed payment
env(pay(bob, carol, USD(9999999)), ter(tecPATH_PARTIAL));
checkDeliveredAmount.adjCountersFail();
env.require(balance(carol, USD(0)));
}
BEAST_EXPECT(jv[jss::result][jss::ledger_index] == 3);
}
{
env.close();
// Check stream update
while (true)
auto wsc = makeWSClient(env.app().config());
{
auto const r = wsc->findMsg(1s, [&](auto const& jv) {
return jv[jss::ledger_index] == 4;
});
if (!r)
break;
if (!r->isMember(jss::transaction))
continue;
BEAST_EXPECT(checkDeliveredAmount.checkTxn(
(*r)[jss::transaction], (*r)[jss::meta]));
Json::Value stream;
// RPC subscribe to ledger stream
stream[jss::streams] = Json::arrayValue;
stream[jss::streams].append("ledger");
stream[jss::accounts] = Json::arrayValue;
stream[jss::accounts].append(toBase58(alice.id()));
stream[jss::accounts].append(toBase58(bob.id()));
stream[jss::accounts].append(toBase58(carol.id()));
auto jv = wsc->invoke("subscribe", stream);
if (wsc->version() == 2)
{
BEAST_EXPECT(
jv.isMember(jss::jsonrpc) && jv[jss::jsonrpc] == "2.0");
BEAST_EXPECT(
jv.isMember(jss::ripplerpc) &&
jv[jss::ripplerpc] == "2.0");
BEAST_EXPECT(jv.isMember(jss::id) && jv[jss::id] == 5);
}
BEAST_EXPECT(jv[jss::result][jss::ledger_index] == 3);
}
{
env.close();
// Check stream update
while (true)
{
auto const r = wsc->findMsg(1s, [&](auto const& jv) {
return jv[jss::ledger_index] == 4;
});
if (!r)
break;
if (!r->isMember(jss::transaction))
continue;
BEAST_EXPECT(checkDeliveredAmount.checkTxn(
(*r)[jss::transaction], (*r)[jss::meta]));
}
}
BEAST_EXPECT(checkDeliveredAmount.checkExpectedCounters());
}
BEAST_EXPECT(checkDeliveredAmount.checkExpectedCounters());
}
void
testTxDeliveredAmountRPC(FeatureBitset features)
@@ -273,41 +280,49 @@ class DeliveredAmount_test : public beast::unit_test::suite
auto const gw = Account("gateway");
auto const USD = gw["USD"];
Env env{*this, features};
env.fund(XRP(10000), alice, bob, carol, gw);
env.trust(USD(1000), alice, bob, carol);
env.close();
for (bool const afterSwitchTime : {true, false})
{
Env env{*this, features};
env.fund(XRP(10000), alice, bob, carol, gw);
env.trust(USD(1000), alice, bob, carol);
if (afterSwitchTime)
env.close(NetClock::time_point{446000000s});
else
env.close();
CheckDeliveredAmount checkDeliveredAmount{true};
// normal payments
env(pay(gw, alice, USD(50)));
checkDeliveredAmount.adjCountersSuccess();
env(pay(gw, alice, XRP(50)));
checkDeliveredAmount.adjCountersSuccess();
CheckDeliveredAmount checkDeliveredAmount{afterSwitchTime};
// normal payments
env(pay(gw, alice, USD(50)));
checkDeliveredAmount.adjCountersSuccess();
env(pay(gw, alice, XRP(50)));
checkDeliveredAmount.adjCountersSuccess();
// partial payment
env(pay(gw, bob, USD(9999999)), txflags(tfPartialPayment));
checkDeliveredAmount.adjCountersPartialPayment();
env.require(balance(bob, USD(1000)));
// partial payment
env(pay(gw, bob, USD(9999999)), txflags(tfPartialPayment));
checkDeliveredAmount.adjCountersPartialPayment();
env.require(balance(bob, USD(1000)));
// failed payment
env(pay(gw, carol, USD(9999999)), ter(tecPATH_PARTIAL));
checkDeliveredAmount.adjCountersFail();
env.require(balance(carol, USD(0)));
// failed payment
env(pay(gw, carol, USD(9999999)), ter(tecPATH_PARTIAL));
checkDeliveredAmount.adjCountersFail();
env.require(balance(carol, USD(0)));
env.close();
std::string index;
Json::Value jvParams;
jvParams[jss::ledger_index] = 4u;
jvParams[jss::transactions] = true;
jvParams[jss::expand] = true;
auto const jtxn = env.rpc(
"json",
"ledger",
to_string(jvParams))[jss::result][jss::ledger][jss::transactions];
for (auto const& t : jtxn)
BEAST_EXPECT(checkDeliveredAmount.checkTxn(t, t[jss::metaData]));
BEAST_EXPECT(checkDeliveredAmount.checkExpectedCounters());
env.close();
std::string index;
Json::Value jvParams;
jvParams[jss::ledger_index] = 4u;
jvParams[jss::transactions] = true;
jvParams[jss::expand] = true;
auto const jtxn = env.rpc(
"json",
"ledger",
to_string(
jvParams))[jss::result][jss::ledger][jss::transactions];
for (auto const& t : jtxn)
BEAST_EXPECT(
checkDeliveredAmount.checkTxn(t, t[jss::metaData]));
BEAST_EXPECT(checkDeliveredAmount.checkExpectedCounters());
}
}
public:

View File

@@ -276,11 +276,11 @@ class NoRippleCheckLimits_test : public beast::unit_test::suite
Endpoint::from_string(test::getEnvLocalhostAddr()));
// if we go above the warning threshold, reset
if (c.balance() > warningThreshold)
if (c.balance() > Tuning::warningThreshold)
{
using ct = beast::abstract_clock<steady_clock>;
c.entry().local_balance =
DecayingSample<decayWindowSeconds, ct>{steady_clock::now()};
DecayingSample<Tuning::getDecayWindowSeconds(), ct>{steady_clock::now()};
}
};

View File

@@ -57,6 +57,7 @@ public:
{
Env env(*this);
auto const result = env.rpc("server_definitions");
std::cout << "RESULT: " << result << "\n";
BEAST_EXPECT(!result[jss::result].isMember(jss::error));
BEAST_EXPECT(result[jss::result].isMember(jss::FIELDS));
BEAST_EXPECT(result[jss::result].isMember(jss::LEDGER_ENTRY_TYPES));