writes without threadsafequeue

This commit is contained in:
natenichols
2021-11-23 12:44:37 -06:00
committed by CJ Cobb
parent cd9a321da6
commit 942754f4f0
4 changed files with 15 additions and 64 deletions

View File

@@ -484,7 +484,6 @@ class AsyncCallData
grpc::Status status_; grpc::Status status_;
unsigned char nextPrefix_; unsigned char nextPrefix_;
public: public:
AsyncCallData( AsyncCallData(
uint32_t seq, uint32_t seq,
@@ -522,7 +521,7 @@ public:
process( process(
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub, std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub,
grpc::CompletionQueue& cq, grpc::CompletionQueue& cq,
ThreadSafeQueue<std::shared_ptr<ripple::SLE>>& queue, BackendInterface const& backend,
bool abort = false) bool abort = false)
{ {
BOOST_LOG_TRIVIAL(debug) << "Processing calldata"; BOOST_LOG_TRIVIAL(debug) << "Processing calldata";
@@ -566,15 +565,12 @@ public:
call(stub, cq); call(stub, cq);
} }
for (auto& obj : cur_->ledger_objects().objects()) for (auto& obj : *(cur_->mutable_ledger_objects()->mutable_objects()))
{ {
auto key = ripple::uint256::fromVoid(obj.key().data()); backend.writeLedgerObject(
auto& data = obj.data(); std::move(*obj.mutable_key()),
request_.ledger().sequence(),
ripple::SerialIter it{data.data(), data.size()}; std::move(*obj.mutable_data()));
std::shared_ptr<ripple::SLE> sle = std::make_shared<ripple::SLE>(it, key);
queue.push(sle);
} }
return more ? CallStatus::MORE : CallStatus::DONE; return more ? CallStatus::MORE : CallStatus::DONE;
@@ -610,8 +606,7 @@ template <class Derived>
bool bool
ETLSourceImpl<Derived>::loadInitialLedger( ETLSourceImpl<Derived>::loadInitialLedger(
uint32_t sequence, uint32_t sequence,
uint32_t numMarkers, uint32_t numMarkers)
ThreadSafeQueue<std::shared_ptr<ripple::SLE>>& writeQueue)
{ {
if (!stub_) if (!stub_)
return false; return false;
@@ -658,7 +653,7 @@ ETLSourceImpl<Derived>::loadInitialLedger(
{ {
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< "Marker prefix = " << ptr->getMarkerPrefix(); << "Marker prefix = " << ptr->getMarkerPrefix();
auto result = ptr->process(stub_, cq, writeQueue, abort); auto result = ptr->process(stub_, cq, *backend_, abort);
if (result != AsyncCallData::CallStatus::MORE) if (result != AsyncCallData::CallStatus::MORE)
{ {
numFinished++; numFinished++;
@@ -739,14 +734,12 @@ ETLLoadBalancer::ETLLoadBalancer(
} }
void void
ETLLoadBalancer::loadInitialLedger( ETLLoadBalancer::loadInitialLedger(uint32_t sequence)
uint32_t sequence,
ThreadSafeQueue<std::shared_ptr<ripple::SLE>>& writeQueue)
{ {
execute( execute(
[this, &sequence, &writeQueue](auto& source) { [this, &sequence](auto& source) {
bool res = bool res =
source->loadInitialLedger(sequence, downloadRanges_, writeQueue); source->loadInitialLedger(sequence, downloadRanges_);
if (!res) if (!res)
{ {
BOOST_LOG_TRIVIAL(error) << "Failed to download initial ledger." BOOST_LOG_TRIVIAL(error) << "Failed to download initial ledger."

View File

@@ -50,8 +50,7 @@ public:
virtual bool virtual bool
loadInitialLedger( loadInitialLedger(
uint32_t sequence, uint32_t sequence,
std::uint32_t numMarkers, std::uint32_t numMarkers) = 0;
ThreadSafeQueue<std::shared_ptr<ripple::SLE>>& writeQueue) = 0;
virtual std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> virtual std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
getRippledForwardingStub() const = 0; getRippledForwardingStub() const = 0;
@@ -285,8 +284,7 @@ public:
bool bool
loadInitialLedger( loadInitialLedger(
std::uint32_t ledgerSequence, std::uint32_t ledgerSequence,
std::uint32_t numMarkers, std::uint32_t numMarkers) override;
ThreadSafeQueue<std::shared_ptr<ripple::SLE>>& writeQueue) override;
/// Attempt to reconnect to the ETL source /// Attempt to reconnect to the ETL source
void void
@@ -504,11 +502,8 @@ public:
/// Load the initial ledger, writing data to the queue /// Load the initial ledger, writing data to the queue
/// @param sequence sequence of ledger to download /// @param sequence sequence of ledger to download
/// @param writeQueue queue to push downloaded data to
void void
loadInitialLedger( loadInitialLedger(uint32_t sequence);
uint32_t sequence,
ThreadSafeQueue<std::shared_ptr<ripple::SLE>>& writeQueue);
/// Fetch data for a specific ledger. This function will continuously try /// Fetch data for a specific ledger. This function will continuously try
/// to fetch data for the specified ledger until the fetch succeeds, the /// to fetch data for the specified ledger until the fetch succeeds, the

View File

@@ -67,27 +67,6 @@ ReportingETL::insertTransactions(
return accountTxData; return accountTxData;
} }
void
ReportingETL::consumeLedgerData(
std::uint32_t sequence,
ThreadSafeQueue<std::shared_ptr<ripple::SLE>>& writeQueue)
{
std::shared_ptr<ripple::SLE> sle;
size_t num = 0;
while (!stopping_ && (sle = writeQueue.pop()))
{
ripple::Serializer s;
sle->add(s);
std::string bytes{s.peekData().data(), s.peekData().data() + s.getLength()};
std::string key{sle->key().data(), sle->key().data() + sle->key().size()};
backend_->writeLedgerObject(
std::move(key),
sequence,
std::move(bytes));
}
}
std::optional<ripple::LedgerInfo> std::optional<ripple::LedgerInfo>
ReportingETL::loadInitialLedger(uint32_t startingSequence) ReportingETL::loadInitialLedger(uint32_t startingSequence)
{ {
@@ -127,22 +106,12 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
insertTransactions(lgrInfo, *ledgerData); insertTransactions(lgrInfo, *ledgerData);
BOOST_LOG_TRIVIAL(debug) << __func__ << " inserted txns"; BOOST_LOG_TRIVIAL(debug) << __func__ << " inserted txns";
ThreadSafeQueue<std::shared_ptr<ripple::SLE>> writeQueue;
std::thread asyncWriter{[this, &startingSequence, &writeQueue]() {
consumeLedgerData(startingSequence, writeQueue);
}};
// download the full account state map. This function downloads full ledger // download the full account state map. This function downloads full ledger
// data and pushes the downloaded data into the writeQueue. asyncWriter // data and pushes the downloaded data into the writeQueue. asyncWriter
// consumes from the queue and inserts the data into the Ledger object. // consumes from the queue and inserts the data into the Ledger object.
// Once the below call returns, all data has been pushed into the queue // Once the below call returns, all data has been pushed into the queue
loadBalancer_->loadInitialLedger(startingSequence, writeQueue); loadBalancer_->loadInitialLedger(startingSequence);
// null is used to respresent the end of the queue
std::shared_ptr<ripple::SLE> null;
writeQueue.push({});
// wait for the writer to finish
asyncWriter.join();
BOOST_LOG_TRIVIAL(debug) << __func__ << " loaded initial ledger"; BOOST_LOG_TRIVIAL(debug) << __func__ << " loaded initial ledger";
if (!stopping_) if (!stopping_)

View File

@@ -138,12 +138,6 @@ private:
lastPublish_ = std::chrono::system_clock::now(); lastPublish_ = std::chrono::system_clock::now();
} }
void
consumeLedgerData(
std::uint32_t sequence,
ThreadSafeQueue<std::shared_ptr<ripple::SLE>>& writeQueue);
/// Download a ledger with specified sequence in full, via GetLedgerData, /// Download a ledger with specified sequence in full, via GetLedgerData,
/// and write the data to the databases. This takes several minutes or /// and write the data to the databases. This takes several minutes or
/// longer. /// longer.