proper handling of deleted objects

This commit is contained in:
CJ Cobb
2022-03-16 10:04:56 -04:00
committed by CJ Cobb
parent f41bfa3715
commit f8437b9ba4
6 changed files with 38 additions and 24 deletions

View File

@@ -250,7 +250,8 @@ BackendInterface::fetchLedgerPage(
LedgerPage page; LedgerPage page;
std::vector<ripple::uint256> keys; std::vector<ripple::uint256> keys;
while (keys.size() < limit) bool reachedEnd = false;
while (keys.size() < limit && !reachedEnd)
{ {
ripple::uint256 const& curCursor = keys.size() ? keys.back() ripple::uint256 const& curCursor = keys.size() ? keys.back()
: cursor ? *cursor : cursor ? *cursor
@@ -258,15 +259,17 @@ BackendInterface::fetchLedgerPage(
uint32_t seq = outOfOrder ? range->maxSequence : ledgerSequence; uint32_t seq = outOfOrder ? range->maxSequence : ledgerSequence;
auto succ = fetchSuccessorKey(curCursor, seq, yield); auto succ = fetchSuccessorKey(curCursor, seq, yield);
if (!succ) if (!succ)
break; reachedEnd = true;
BOOST_LOG_TRIVIAL(trace) << ripple::strHex(*succ); else
keys.push_back(std::move(*succ)); keys.push_back(std::move(*succ));
} }
auto objects = fetchLedgerObjects(keys, ledgerSequence, yield); auto objects = fetchLedgerObjects(keys, ledgerSequence, yield);
for (size_t i = 0; i < objects.size(); ++i) for (size_t i = 0; i < objects.size(); ++i)
{ {
if (!objects[i].size()) if (objects[i].size())
page.objects.push_back({std::move(keys[i]), std::move(objects[i])});
else if (!outOfOrder)
{ {
BOOST_LOG_TRIVIAL(error) BOOST_LOG_TRIVIAL(error)
<< __func__ << " incorrect successor table. key = " << __func__ << " incorrect successor table. key = "
@@ -277,12 +280,11 @@ BackendInterface::fetchLedgerPage(
msg << " - " << ripple::strHex(keys[j]); msg << " - " << ripple::strHex(keys[j]);
} }
BOOST_LOG_TRIVIAL(error) << __func__ << msg.str(); BOOST_LOG_TRIVIAL(error) << __func__ << msg.str();
assert(false);
} }
assert(objects[i].size());
page.objects.push_back({std::move(keys[i]), std::move(objects[i])});
} }
if (page.objects.size() >= limit) if (!reachedEnd)
page.cursor = page.objects.back().key; page.cursor = keys.back();
return page; return page;
} }

View File

@@ -758,8 +758,9 @@ CassandraBackend::doOnlineDelete(
std::optional<ripple::uint256> cursor; std::optional<ripple::uint256> cursor;
while (true) while (true)
{ {
auto [objects, curCursor] = retryOnTimeout( auto [objects, curCursor] = retryOnTimeout([&]() {
[&]() { return fetchLedgerPage(cursor, minLedger, 256, yield); }); return fetchLedgerPage(cursor, minLedger, 256, false, yield);
});
for (auto& obj : objects) for (auto& obj : objects)
{ {

View File

@@ -812,8 +812,9 @@ PostgresBackend::doOnlineDelete(
std::optional<ripple::uint256> cursor; std::optional<ripple::uint256> cursor;
while (true) while (true)
{ {
auto [objects, curCursor] = retryOnTimeout( auto [objects, curCursor] = retryOnTimeout([&]() {
[&]() { return fetchLedgerPage(cursor, minLedger, 256, yield); }); return fetchLedgerPage(cursor, minLedger, 256, false, yield);
});
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
std::stringstream objectsBuffer; std::stringstream objectsBuffer;

View File

@@ -69,12 +69,14 @@ doLedgerData(Context const& context)
else else
return Status{Error::rpcINVALID_PARAMS, "markerNotString"}; return Status{Error::rpcINVALID_PARAMS, "markerNotString"};
} }
else
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " : parsing marker";
BOOST_LOG_TRIVIAL(debug) << __func__ << " : parsing marker"; cursor = ripple::uint256{};
if (!cursor->parseHex(request.at("marker").as_string().c_str()))
cursor = ripple::uint256{}; return Status{Error::rpcINVALID_PARAMS, "markerMalformed"};
if (!cursor->parseHex(request.at("marker").as_string().c_str())) }
return Status{Error::rpcINVALID_PARAMS, "markerMalformed"};
} }
auto v = ledgerInfoFromRequest(context); auto v = ledgerInfoFromRequest(context);
@@ -163,8 +165,9 @@ doLedgerData(Context const& context)
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " number of results = " << results.size() << __func__ << " number of results = " << results.size()
<< " fetched in " << time << "microseconds"; << " fetched in " << time << " microseconds";
boost::json::array objects; boost::json::array objects;
objects.reserve(results.size());
for (auto const& [key, object] : results) for (auto const& [key, object] : results)
{ {
ripple::STLedgerEntry sle{ ripple::STLedgerEntry sle{
@@ -174,12 +177,19 @@ doLedgerData(Context const& context)
boost::json::object entry; boost::json::object entry;
entry["data"] = ripple::serializeHex(sle); entry["data"] = ripple::serializeHex(sle);
entry["index"] = ripple::to_string(sle.key()); entry["index"] = ripple::to_string(sle.key());
objects.push_back(entry); objects.push_back(std::move(entry));
} }
else else
objects.push_back(toJson(sle)); objects.push_back(toJson(sle));
} }
response["state"] = objects; response["state"] = std::move(objects);
auto end2 = std::chrono::system_clock::now();
time = std::chrono::duration_cast<std::chrono::microseconds>(end2 - end)
.count();
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " number of results = " << results.size()
<< " serialized in " << time << " microseconds";
return response; return response;
} }

View File

@@ -503,7 +503,7 @@ async def ledger_data_full(ip, port, ledger, binary, limit, typ=None, count=-1):
else: else:
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"cursor":marker, "marker":marker,"binary":bool(binary), "limit":int(limit)})) await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"cursor":marker, "marker":marker,"binary":bool(binary), "limit":int(limit),"out_of_order":True}))
res = json.loads(await ws.recv()) res = json.loads(await ws.recv())

View File

@@ -804,7 +804,7 @@ TEST(BackendTest, Basic)
{ {
uint32_t limit = 10; uint32_t limit = 10;
page = backend->fetchLedgerPage( page = backend->fetchLedgerPage(
page.cursor, seq, limit, yield); page.cursor, seq, limit, false, yield);
std::cout << "fetched a page " << page.objects.size() std::cout << "fetched a page " << page.objects.size()
<< std::endl; << std::endl;
if (page.cursor) if (page.cursor)
@@ -2186,7 +2186,7 @@ TEST(Backend, cacheIntegration)
{ {
uint32_t limit = 10; uint32_t limit = 10;
page = backend->fetchLedgerPage( page = backend->fetchLedgerPage(
page.cursor, seq, limit, yield); page.cursor, seq, limit, false, yield);
std::cout << "fetched a page " << page.objects.size() std::cout << "fetched a page " << page.objects.size()
<< std::endl; << std::endl;
if (page.cursor) if (page.cursor)