diff --git a/src/backend/DBHelpers.h b/src/backend/DBHelpers.h index 06548e7d..b89c67ad 100644 --- a/src/backend/DBHelpers.h +++ b/src/backend/DBHelpers.h @@ -120,4 +120,6 @@ uint256ToString(ripple::uint256 const& uint) { return {reinterpret_cast(uint.data()), uint.size()}; } + +static constexpr uint32_t rippleEpochStart = 946684800; #endif diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 6169b490..a29dc34a 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -658,11 +658,22 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) // success is false if the ledger was already written if (success) { - /* - boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() { - publishLedger(lgrInfo); - }); - */ + auto now = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + auto closeTime = lgrInfo.closeTime.time_since_epoch().count(); + auto age = now - (rippleEpochStart + closeTime); + // if the ledger closed over 10 seconds ago, assume we are still + // catching up and don't publish + if (age < 10) + { + boost::asio::post( + publishStrand_, [this, lgrInfo = lgrInfo]() { + publishLedger(lgrInfo); + }); + } + backend_->updateRange(lgrInfo.seq); lastPublishedSequence = lgrInfo.seq; } diff --git a/test.py b/test.py index 86529f68..2dec6755 100755 --- a/test.py +++ b/test.py @@ -825,17 +825,7 @@ async def subscribe(ip, port): address = 'ws://' + str(ip) + ':' + str(port) try: async with websockets.connect(address) as ws: - await ws.send(json.dumps({"command":"server_info"})); - print(json.loads(await ws.recv())) - await ws.send(json.dumps({"command":"server_info"})); - print(json.loads(await ws.recv())) - await ws.send(json.dumps({"command":"server_info"})); - await ws.send(json.dumps({"command":"server_info"})); - await ws.send(json.dumps({"command":"server_info"})); - print(json.loads(await ws.recv())) - print(json.loads(await ws.recv())) - print(json.loads(await ws.recv())) - await ws.send(json.dumps({"command":"subscribe","streams":["ledger"],"books":[{"snapshot":True,"taker_pays":{"currency":"XRP"},"taker_gets":{"currency":"USD","issuer":"rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"}}]})) + await ws.send(json.dumps({"command":"subscribe","streams":["ledger"]})) #await ws.send(json.dumps({"command":"subscribe","streams":["manifests"]})) while True: res = json.loads(await ws.recv())