Fine tune cache download (#215)

* Fine tune cache download

* Allow operators to specify the max number of concurrent markers. The
  software generates possible markers from ledger diffs, as before, but
  only processes a specified number at one time, which caps database
  reads and distributes the load more evenly over the entire download.
* Allow operators to specify the page fetch size during the cache
  download, which is the number of ledger objects to fetch per marker at
  one time.

* Refactor full ledger dump in test.py
This commit is contained in:
CJ Cobb
2022-07-26 15:00:27 -04:00
committed by GitHub
parent 3edead32ba
commit 2ffd98f895
3 changed files with 99 additions and 71 deletions

View File

@@ -914,7 +914,7 @@ ReportingETL::loadCache(uint32_t seq)
a.insert(std::end(a), std::begin(b), std::end(b));
};
for (size_t i = 0; i < numDiffs_; ++i)
for (size_t i = 0; i < numCacheDiffs_; ++i)
{
append(diff, Backend::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchLedgerDiff(seq - i, yield);
@@ -949,55 +949,73 @@ ReportingETL::loadCache(uint32_t seq)
<< "Loading cache. num cursors = " << cursors.size() - 1;
BOOST_LOG_TRIVIAL(debug) << __func__ << " cursors = " << cursorStr.str();
std::atomic_uint* numRemaining = new std::atomic_uint{cursors.size() - 1};
auto startTime = std::chrono::system_clock::now();
for (size_t i = 0; i < cursors.size() - 1; ++i)
{
std::optional<ripple::uint256> start = cursors[i];
std::optional<ripple::uint256> end = cursors[i + 1];
boost::asio::spawn(
ioContext_,
[this, seq, start, end, numRemaining, startTime](
boost::asio::yield_context yield) {
std::optional<ripple::uint256> cursor = start;
while (true)
{
auto res =
Backend::retryOnTimeout([this, seq, &cursor, &yield]() {
return backend_->fetchLedgerPage(
cursor, seq, 256, false, yield);
});
backend_->cache().update(res.objects, seq, true);
if (!res.cursor || (end && *(res.cursor) > *end))
break;
cacheDownloader_ = std::thread{[this, seq, cursors]() {
auto startTime = std::chrono::system_clock::now();
std::atomic_int markers = 0;
std::atomic_int numRemaining = cursors.size() - 1;
for (size_t i = 0; i < cursors.size() - 1; ++i)
{
std::optional<ripple::uint256> start = cursors[i];
std::optional<ripple::uint256> end = cursors[i + 1];
markers.wait(numCacheMarkers_);
++markers;
boost::asio::spawn(
ioContext_,
[this, seq, start, end, &numRemaining, startTime, &markers](
boost::asio::yield_context yield) {
std::optional<ripple::uint256> cursor = start;
std::string cursorStr = cursor.has_value()
? ripple::strHex(cursor.value())
: ripple::strHex(Backend::firstKey);
BOOST_LOG_TRIVIAL(debug)
<< "Loading cache. cache size = "
<< backend_->cache().size()
<< " - cursor = " << ripple::strHex(res.cursor.value());
cursor = std::move(res.cursor);
}
if (--(*numRemaining) == 0)
{
auto endTime = std::chrono::system_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::seconds>(
endTime - startTime);
BOOST_LOG_TRIVIAL(info)
<< "Finished loading cache. cache size = "
<< backend_->cache().size() << ". Took "
<< duration.count() << " seconds";
backend_->cache().setFull();
delete numRemaining;
}
else
{
BOOST_LOG_TRIVIAL(info)
<< "Finished a cursor. num remaining = "
<< *numRemaining;
}
});
}
<< "Starting a cursor: " << cursorStr
<< " markers = " << markers;
while (!stopping_)
{
auto res = Backend::retryOnTimeout([this,
seq,
&cursor,
&yield]() {
return backend_->fetchLedgerPage(
cursor, seq, cachePageFetchSize_, false, yield);
});
backend_->cache().update(res.objects, seq, true);
if (!res.cursor || (end && *(res.cursor) > *end))
break;
BOOST_LOG_TRIVIAL(debug)
<< "Loading cache. cache size = "
<< backend_->cache().size() << " - cursor = "
<< ripple::strHex(res.cursor.value())
<< " start = " << cursorStr
<< " markers = " << markers;
cursor = std::move(res.cursor);
}
--markers;
markers.notify_one();
if (--numRemaining == 0)
{
auto endTime = std::chrono::system_clock::now();
auto duration =
std::chrono::duration_cast<std::chrono::seconds>(
endTime - startTime);
BOOST_LOG_TRIVIAL(info)
<< "Finished loading cache. cache size = "
<< backend_->cache().size() << ". Took "
<< duration.count() << " seconds";
backend_->cache().setFull();
}
else
{
BOOST_LOG_TRIVIAL(info)
<< "Finished a cursor. num remaining = "
<< numRemaining << " start = " << cursorStr
<< " markers = " << markers;
}
});
}
}};
// If loading synchronously, poll cache until full
while (cacheLoadStyle_ == CacheLoadStyle::SYNC &&
!backend_->cache().isFull())
@@ -1107,9 +1125,12 @@ ReportingETL::ReportingETL(
if (entry == "none" || entry == "no")
cacheLoadStyle_ = CacheLoadStyle::NOT_AT_ALL;
}
if (cache.contains("num_diffs") && cache.at("num_diffs").as_int64())
{
numDiffs_ = cache.at("num_diffs").as_int64();
}
if (cache.contains("num_diffs") && cache.at("num_diffs").is_int64())
numCacheDiffs_ = cache.at("num_diffs").as_int64();
if (cache.contains("num_markers") && cache.at("num_markers").is_int64())
numCacheMarkers_ = cache.at("num_markers").as_int64();
if (cache.contains("page_fetch_size") &&
cache.at("page_fetch_size").is_int64())
cachePageFetchSize_ = cache.at("page_fetch_size").as_int64();
}
}

View File

@@ -52,7 +52,15 @@ private:
// number of diffs to use to generate cursors to traverse the ledger in
// parallel during initial cache download
size_t numDiffs_ = 1;
size_t numCacheDiffs_ = 32;
// number of markers to use at one time to traverse the ledger in parallel
// during initial cache download
size_t numCacheMarkers_ = 48;
// number of ledger objects to fetch concurrently per marker during cache
// download
size_t cachePageFetchSize_ = 512;
// thread responsible for syncing the cache on startup
std::thread cacheDownloader_;
std::thread worker_;
boost::asio::io_context& ioContext_;
@@ -313,6 +321,8 @@ public:
if (worker_.joinable())
worker_.join();
if (cacheDownloader_.joinable())
cacheDownloader_.join();
BOOST_LOG_TRIVIAL(debug) << "Joined ReportingETL worker thread";
}

31
test.py
View File

@@ -475,14 +475,13 @@ async def ledger_data(ip, port, ledger, limit, binary, cursor):
except websockets.exceptions.connectionclosederror as e:
print(e)
def writeLedgerData(data,filename):
print(len(data[0]))
def writeLedgerData(state,filename):
print(len(state))
with open(filename,'w') as f:
data[0].sort()
data[1].sort()
for k,v in zip(data[0],data[1]):
for k,v in state.items():
f.write(k)
f.write('\n')
f.write(':')
f.write(v)
f.write('\n')
@@ -490,15 +489,14 @@ def writeLedgerData(data,filename):
async def ledger_data_full(ip, port, ledger, binary, limit, typ=None, count=-1, marker = None):
address = 'ws://' + str(ip) + ':' + str(port)
try:
blobs = []
keys = []
state = {}
async with websockets.connect(address,max_size=1000000000) as ws:
if int(limit) < 2048:
limit = 2048
while True:
res = {}
if marker is None:
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":binary, "limit":int(limit)}))
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":binary, "limit":int(limit),"out_of_order":True}))
res = json.loads(await ws.recv())
else:
@@ -520,16 +518,15 @@ async def ledger_data_full(ip, port, ledger, binary, limit, typ=None, count=-1,
if binary:
if typ is None or x["data"][2:6] == typ:
#print(json.dumps(x))
keys.append(x["index"])
state[x["index"]] = x["data"]
else:
if typ is None or x["LedgerEntryType"] == typ:
blobs.append(x)
keys.append(x["index"])
if count != -1 and len(keys) > count:
state[x["index"]] = x
if count != -1 and len(state) > count:
print("stopping early")
print(len(keys))
print(len(state))
print("done")
return (keys,blobs)
return state
if "cursor" in res:
marker = res["cursor"]
print(marker)
@@ -538,7 +535,7 @@ async def ledger_data_full(ip, port, ledger, binary, limit, typ=None, count=-1,
print(marker)
else:
print("done")
return (keys, blobs)
return state
except websockets.exceptions.connectionclosederror as e:
@@ -1263,7 +1260,7 @@ def run(args):
res = asyncio.get_event_loop().run_until_complete(
ledger_data_full(args.ip, args.port, args.ledger, bool(args.binary), args.limit,args.type, int(args.count), args.marker))
print(len(res[0]))
print(len(res))
if args.verify:
writeLedgerData(res,args.filename)