don't publish stale data

This commit is contained in:
CJ Cobb
2022-01-26 20:49:27 +00:00
parent bc131f666a
commit 95a35caf49
3 changed files with 19 additions and 16 deletions

View File

@@ -120,4 +120,6 @@ uint256ToString(ripple::uint256 const& uint)
{ {
return {reinterpret_cast<const char*>(uint.data()), uint.size()}; return {reinterpret_cast<const char*>(uint.data()), uint.size()};
} }
static constexpr uint32_t rippleEpochStart = 946684800;
#endif #endif

View File

@@ -658,11 +658,22 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
// success is false if the ledger was already written // success is false if the ledger was already written
if (success) if (success)
{ {
/* auto now =
boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() { std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
auto closeTime = lgrInfo.closeTime.time_since_epoch().count();
auto age = now - (rippleEpochStart + closeTime);
// if the ledger closed over 10 seconds ago, assume we are still
// catching up and don't publish
if (age < 10)
{
boost::asio::post(
publishStrand_, [this, lgrInfo = lgrInfo]() {
publishLedger(lgrInfo); publishLedger(lgrInfo);
}); });
*/ }
backend_->updateRange(lgrInfo.seq); backend_->updateRange(lgrInfo.seq);
lastPublishedSequence = lgrInfo.seq; lastPublishedSequence = lgrInfo.seq;
} }

12
test.py
View File

@@ -825,17 +825,7 @@ async def subscribe(ip, port):
address = 'ws://' + str(ip) + ':' + str(port) address = 'ws://' + str(ip) + ':' + str(port)
try: try:
async with websockets.connect(address) as ws: async with websockets.connect(address) as ws:
await ws.send(json.dumps({"command":"server_info"})); await ws.send(json.dumps({"command":"subscribe","streams":["ledger"]}))
print(json.loads(await ws.recv()))
await ws.send(json.dumps({"command":"server_info"}));
print(json.loads(await ws.recv()))
await ws.send(json.dumps({"command":"server_info"}));
await ws.send(json.dumps({"command":"server_info"}));
await ws.send(json.dumps({"command":"server_info"}));
print(json.loads(await ws.recv()))
print(json.loads(await ws.recv()))
print(json.loads(await ws.recv()))
await ws.send(json.dumps({"command":"subscribe","streams":["ledger"],"books":[{"snapshot":True,"taker_pays":{"currency":"XRP"},"taker_gets":{"currency":"USD","issuer":"rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"}}]}))
#await ws.send(json.dumps({"command":"subscribe","streams":["manifests"]})) #await ws.send(json.dumps({"command":"subscribe","streams":["manifests"]}))
while True: while True:
res = json.loads(await ws.recv()) res = json.loads(await ws.recv())