postgres writes

This commit is contained in:
CJ Cobb
2021-03-01 11:53:53 -05:00
parent 009688175e
commit 6839538b3c
3 changed files with 42 additions and 28 deletions

View File

@@ -743,9 +743,10 @@ CREATE TABLE IF NOT EXISTS ledgers (
CREATE TABLE IF NOT EXISTS objects ( CREATE TABLE IF NOT EXISTS objects (
key bytea PRIMARY KEY, key bytea NOT NULL,
ledger_seq bigint NOT NULL, ledger_seq bigint NOT NULL,
object bytea NOT NULL object bytea NOT NULL,
PRIMARY KEY(key, ledger_seq)
); );
-- Index for lookups by ledger hash. -- Index for lookups by ledger hash.
@@ -759,11 +760,7 @@ CREATE TABLE IF NOT EXISTS transactions (
ledger_seq bigint, ledger_seq bigint,
transaction bytea, transaction bytea,
metadata bytea, metadata bytea,
transaction_index bigint NOT NULL, FOREIGN KEY (ledger_seq)
trans_id bytea NOT NULL,
nodestore_hash bytea NOT NULL,
constraint transactions_pkey PRIMARY KEY (ledger_seq, transaction_index),
constraint transactions_fkey FOREIGN KEY (ledger_seq)
REFERENCES ledgers (ledger_seq) ON DELETE CASCADE REFERENCES ledgers (ledger_seq) ON DELETE CASCADE
); );

View File

@@ -138,6 +138,9 @@ private:
boost::json::object config_; boost::json::object config_;
mutable uint32_t ledgerSequence_ = 0;
mutable bool isFirstLedger_ = false;
public: public:
CassandraFlatMapBackend(boost::json::object const& config) : config_(config) CassandraFlatMapBackend(boost::json::object const& config) : config_(config)
{ {
@@ -410,33 +413,22 @@ public:
{ {
} }
}; };
bool bool
writeLedger( finishWrites() const override
ripple::LedgerInfo const& ledgerInfo,
std::string&& header,
bool isFirst = false) const override
{ {
WriteLedgerHeaderCallbackData* headerCb =
new WriteLedgerHeaderCallbackData(
this, ledgerInfo.seq, std::move(header));
WriteLedgerHashCallbackData* hashCb = new WriteLedgerHashCallbackData(
this, ledgerInfo.hash, ledgerInfo.seq);
++numRequestsOutstanding_;
++numRequestsOutstanding_;
writeLedgerHeader(*headerCb, false);
writeLedgerHash(*hashCb, false);
// wait for all other writes to finish // wait for all other writes to finish
sync(); sync();
// write range // write range
if (isFirst) if (isFirstLedger_)
{ {
CassStatement* statement = cass_prepared_bind(updateLedgerRange_); CassStatement* statement = cass_prepared_bind(updateLedgerRange_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
CassError rc = CassError rc =
cass_statement_bind_int64(statement, 0, ledgerInfo.seq); cass_statement_bind_int64(statement, 0, ledgerSequence_);
rc = cass_statement_bind_bool(statement, 1, cass_false); rc = cass_statement_bind_bool(statement, 1, cass_false);
rc = cass_statement_bind_int64(statement, 2, ledgerInfo.seq); rc = cass_statement_bind_int64(statement, 2, ledgerSequence_);
CassFuture* fut; CassFuture* fut;
do do
{ {
@@ -456,11 +448,11 @@ public:
CassStatement* statement = cass_prepared_bind(updateLedgerRange_); CassStatement* statement = cass_prepared_bind(updateLedgerRange_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
// TODO check rc // TODO check rc
CassError rc = cass_statement_bind_int64(statement, 0, ledgerInfo.seq); CassError rc = cass_statement_bind_int64(statement, 0, ledgerSequence_);
assert(rc == CASS_OK); assert(rc == CASS_OK);
rc = cass_statement_bind_bool(statement, 1, cass_true); rc = cass_statement_bind_bool(statement, 1, cass_true);
assert(rc == CASS_OK); assert(rc == CASS_OK);
rc = cass_statement_bind_int64(statement, 2, ledgerInfo.seq); rc = cass_statement_bind_int64(statement, 2, ledgerSequence_);
assert(rc == CASS_OK); assert(rc == CASS_OK);
CassFuture* fut; CassFuture* fut;
do do
@@ -500,6 +492,22 @@ public:
return success == cass_true; return success == cass_true;
} }
void void
writeLedger(
ripple::LedgerInfo const& ledgerInfo,
std::string&& header,
bool isFirst = false) const override
{
WriteLedgerHeaderCallbackData* headerCb =
new WriteLedgerHeaderCallbackData(
this, ledgerInfo.seq, std::move(header));
WriteLedgerHashCallbackData* hashCb = new WriteLedgerHashCallbackData(
this, ledgerInfo.hash, ledgerInfo.seq);
++numRequestsOutstanding_;
++numRequestsOutstanding_;
writeLedgerHeader(*headerCb, false);
writeLedgerHash(*hashCb, false);
}
void
writeLedgerHash(WriteLedgerHashCallbackData& cb, bool isRetry) const writeLedgerHash(WriteLedgerHashCallbackData& cb, bool isRetry) const
{ {
{ {
@@ -1958,7 +1966,12 @@ public:
} }
void void
sync() const override startWrites() const override
{
}
void
sync() const
{ {
std::unique_lock<std::mutex> lck(syncMutex_); std::unique_lock<std::mutex> lck(syncMutex_);

View File

@@ -114,6 +114,7 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
<< __func__ << " : " << __func__ << " : "
<< "Deserialized ledger header. " << detail::toString(lgrInfo); << "Deserialized ledger header. " << detail::toString(lgrInfo);
flatMapBackend_->startWrites();
std::vector<AccountTransactionsData> accountTxData = std::vector<AccountTransactionsData> accountTxData =
insertTransactions(lgrInfo, *ledgerData); insertTransactions(lgrInfo, *ledgerData);
@@ -128,9 +129,10 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
if (!stopping_) if (!stopping_)
{ {
flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); flatMapBackend_->writeAccountTransactions(std::move(accountTxData));
bool success = flatMapBackend_->writeLedger( flatMapBackend_->writeLedger(
lgrInfo, std::move(*ledgerData->mutable_ledger_header())); lgrInfo, std::move(*ledgerData->mutable_ledger_header()));
} }
flatMapBackend_->finishWrites();
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug) << "Time to download and store ledger = " BOOST_LOG_TRIVIAL(debug) << "Time to download and store ledger = "
<< ((end - start).count()) / 1000000000.0; << ((end - start).count()) / 1000000000.0;
@@ -254,6 +256,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : " << __func__ << " : "
<< "Deserialized ledger header. " << detail::toString(lgrInfo); << "Deserialized ledger header. " << detail::toString(lgrInfo);
flatMapBackend_->startWrites();
std::vector<AccountTransactionsData> accountTxData{ std::vector<AccountTransactionsData> accountTxData{
insertTransactions(lgrInfo, rawData)}; insertTransactions(lgrInfo, rawData)};
@@ -299,8 +302,9 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
std::move(bookDir)); std::move(bookDir));
} }
flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); flatMapBackend_->writeAccountTransactions(std::move(accountTxData));
bool success = flatMapBackend_->writeLedger( flatMapBackend_->writeLedger(
lgrInfo, std::move(*rawData.mutable_ledger_header())); lgrInfo, std::move(*rawData.mutable_ledger_header()));
bool success = flatMapBackend_->finishWrites();
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : " << __func__ << " : "
<< "Inserted/modified/deleted all objects. Number of objects = " << "Inserted/modified/deleted all objects. Number of objects = "