fix books table

This commit is contained in:
CJ Cobb
2021-01-27 15:39:31 -05:00
parent f8185d8b8a
commit b3731f54e0
7 changed files with 97 additions and 34 deletions

View File

@@ -93,6 +93,7 @@ doBookOffers(
CassandraFlatMapBackend const& backend, CassandraFlatMapBackend const& backend,
std::shared_ptr<PgPool>& pool) std::shared_ptr<PgPool>& pool)
{ {
std::cout << "enter" << std::endl;
boost::json::object response; boost::json::object response;
auto sequence = ledgerSequenceFromRequest(request, pool); auto sequence = ledgerSequenceFromRequest(request, pool);
@@ -292,7 +293,8 @@ doBookOffers(
} }
std::uint32_t limit = 2048; std::uint32_t limit = 2048;
if (request.at("limit").kind() == boost::json::kind::int64) if (request.contains("limit") and
request.at("limit").kind() == boost::json::kind::int64)
limit = request.at("limit").as_int64(); limit = request.at("limit").as_int64();
ripple::Book book = { ripple::Book book = {

View File

@@ -60,7 +60,12 @@ getBook(std::string const& offer)
{ {
ripple::SerialIter it{offer.data(), offer.size()}; ripple::SerialIter it{offer.data(), offer.size()};
ripple::SLE sle{it, {}}; ripple::SLE sle{it, {}};
return sle.getFieldH256(ripple::sfBookDirectory); ripple::uint256 book = sle.getFieldH256(ripple::sfBookDirectory);
for (size_t i = 0; i < 8; ++i)
{
book.data()[book.size() - 1 - i] = 0x00;
}
return book;
} }
/// Write new ledger and transaction data to Postgres /// Write new ledger and transaction data to Postgres

View File

@@ -452,6 +452,10 @@ public:
ripple::SerialIter it{obj.data().data(), obj.data().size()}; ripple::SerialIter it{obj.data().data(), obj.data().size()};
ripple::SLE sle{it, {}}; ripple::SLE sle{it, {}};
book = sle.getFieldH256(ripple::sfBookDirectory); book = sle.getFieldH256(ripple::sfBookDirectory);
for (size_t i = 0; i < 8; ++i)
{
book->data()[book->size() - 1 - i] = 0x00;
}
} }
backend.store( backend.store(
std::move(*obj.mutable_key()), std::move(*obj.mutable_key()),
@@ -571,8 +575,7 @@ ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects)
<< "ETLSource::fetchLedger - is_unlimited is " << "ETLSource::fetchLedger - is_unlimited is "
"false. Make sure secure_gateway is set " "false. Make sure secure_gateway is set "
"correctly on the ETL source. source = " "correctly on the ETL source. source = "
<< toString() << " response = " << response.DebugString() << toString() << " status = " << status.error_message();
<< " status = " << status.error_message();
assert(false); assert(false);
} }
return {status, std::move(response)}; return {status, std::move(response)};

View File

@@ -321,8 +321,10 @@ flatMapReadObjectCallback(CassFuture* fut, void* cbData)
if (!row) if (!row)
{ {
cass_result_free(res); cass_result_free(res);
BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc BOOST_LOG_TRIVIAL(error)
<< ", " << cass_error_desc(rc); << "Cassandra fetch get row error : " << rc << ", "
<< cass_error_desc(rc)
<< " key = " << ripple::strHex(requestParams.key);
finish(); finish();
return; return;
} }

View File

@@ -94,6 +94,7 @@ private:
const CassPrepared* getCreated_ = nullptr; const CassPrepared* getCreated_ = nullptr;
const CassPrepared* getBook_ = nullptr; const CassPrepared* getBook_ = nullptr;
const CassPrepared* insertBook_ = nullptr; const CassPrepared* insertBook_ = nullptr;
const CassPrepared* deleteBook_ = nullptr;
// io_context used for exponential backoff for write retries // io_context used for exponential backoff for write retries
mutable boost::asio::io_context ioContext_; mutable boost::asio::io_context ioContext_;
@@ -485,8 +486,8 @@ public:
query = {}; query = {};
query << "CREATE TABLE IF NOT EXISTS " << tableName << "books" query << "CREATE TABLE IF NOT EXISTS " << tableName << "books"
<< " ( book blob, sequence bigint, key blob, deleted_at " << " ( book blob, sequence bigint, key blob, deleted_at "
"bigint static, PRIMARY KEY " "bigint, PRIMARY KEY "
"(book, sequence, key))"; "(book, key))";
statement = makeStatement(query.str().c_str(), 0); statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement); fut = cass_session_execute(session_.get(), statement);
rc = cass_future_error_code(fut); rc = cass_future_error_code(fut);
@@ -618,7 +619,7 @@ public:
query = {}; query = {};
query << "INSERT INTO " << tableName << "books" query << "INSERT INTO " << tableName << "books"
<< " (book, sequence, key, deleted_at) VALUES (?, ?, ?, ?)"; << " (book, key, sequence, deleted_at) VALUES (?, ?, ?, ?)";
prepare_future = prepare_future =
cass_session_prepare(session_.get(), query.str().c_str()); cass_session_prepare(session_.get(), query.str().c_str());
@@ -640,6 +641,31 @@ public:
/* Get the prepared object from the future */ /* Get the prepared object from the future */
insertBook_ = cass_future_get_prepared(prepare_future); insertBook_ = cass_future_get_prepared(prepare_future);
cass_future_free(prepare_future); cass_future_free(prepare_future);
query = {};
query << "INSERT INTO " << tableName << "books"
<< " (book, key, deleted_at) VALUES (?, ?, ?)";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
/* Wait for the statement to prepare and get the result */
rc = cass_future_error_code(prepare_future);
if (rc != CASS_OK)
{
/* Handle error */
cass_future_free(prepare_future);
std::stringstream ss;
ss << "nodestore: Error preparing insert : " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << ss.str();
continue;
}
/* Get the prepared object from the future */
deleteBook_ = cass_future_get_prepared(prepare_future);
cass_future_free(prepare_future);
query = {}; query = {};
query << "SELECT created FROM " << tableName << "keys" query << "SELECT created FROM " << tableName << "keys"
@@ -873,6 +899,16 @@ public:
cass_prepared_free(getBook_); cass_prepared_free(getBook_);
getBook_ = nullptr; getBook_ = nullptr;
} }
if (insertBook_)
{
cass_prepared_free(insertBook_);
insertBook_ = nullptr;
}
if (deleteBook_)
{
cass_prepared_free(deleteBook_);
deleteBook_ = nullptr;
}
work_.reset(); work_.reset();
ioThread_.join(); ioThread_.join();
} }
@@ -1209,7 +1245,7 @@ public:
doBookOffers(ripple::uint256 const& book, uint32_t sequence) const doBookOffers(ripple::uint256 const& book, uint32_t sequence) const
{ {
BOOST_LOG_TRIVIAL(debug) << "Starting doBookOffers"; BOOST_LOG_TRIVIAL(debug) << "Starting doBookOffers";
CassStatement* statement = cass_prepared_bind(upperBound_); CassStatement* statement = cass_prepared_bind(getBook_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
CassError rc = cass_statement_bind_bytes( CassError rc = cass_statement_bind_bytes(
statement, 0, static_cast<cass_byte_t const*>(book.data()), 32); statement, 0, static_cast<cass_byte_t const*>(book.data()), 32);
@@ -1284,9 +1320,10 @@ public:
BOOST_LOG_TRIVIAL(warning) << ss.str(); BOOST_LOG_TRIVIAL(warning) << ss.str();
} }
keys.push_back(ripple::uint256::fromVoid(outData)); keys.push_back(ripple::uint256::fromVoid(outData));
std::cout << ripple::strHex(keys.back()) << std::endl;
} }
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< "doUpperBound - populated keys. num keys = " << keys.size(); << "doBookOffers - populated keys. num keys = " << keys.size();
if (keys.size()) if (keys.size())
{ {
std::vector<LedgerObject> results; std::vector<LedgerObject> results;
@@ -1766,6 +1803,10 @@ public:
void void
writeBook(WriteCallbackData& data, bool isRetry) const writeBook(WriteCallbackData& data, bool isRetry) const
{ {
assert(data.isCreated or data.isDeleted);
if (!data.isCreated and !data.isDeleted)
throw std::runtime_error(
"writing book that's neither created or deleted");
{ {
std::unique_lock<std::mutex> lck(throttleMutex_); std::unique_lock<std::mutex> lck(throttleMutex_);
if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding) if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
@@ -1779,7 +1820,11 @@ public:
}); });
} }
} }
CassStatement* statement = cass_prepared_bind(insertBook_); CassStatement* statement = nullptr;
if (data.isCreated)
statement = cass_prepared_bind(insertBook_);
else if (data.isDeleted)
statement = cass_prepared_bind(deleteBook_);
cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
const unsigned char* bookData = (unsigned char*)data.book->data(); const unsigned char* bookData = (unsigned char*)data.book->data();
CassError rc = cass_statement_bind_bytes( CassError rc = cass_statement_bind_bytes(
@@ -1796,21 +1841,10 @@ public:
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str()); throw std::runtime_error(ss.str());
} }
rc = cass_statement_bind_int64(
statement, 1, (data.isCreated ? data.sequence : 0));
if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
const unsigned char* keyData = (unsigned char*)data.key.data(); const unsigned char* keyData = (unsigned char*)data.key.data();
rc = cass_statement_bind_bytes( rc = cass_statement_bind_bytes(
statement, statement,
2, 1,
static_cast<cass_byte_t const*>(keyData), static_cast<cass_byte_t const*>(keyData),
data.key.size()); data.key.size());
if (rc != CASS_OK) if (rc != CASS_OK)
@@ -1822,8 +1856,19 @@ public:
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str()); throw std::runtime_error(ss.str());
} }
rc = cass_statement_bind_int64( rc = cass_statement_bind_int64(statement, 2, data.sequence);
statement, 3, (data.isDeleted ? data.sequence : INT64_MAX)); if (rc != CASS_OK)
{
cass_statement_free(statement);
std::stringstream ss;
ss << "binding cassandra insert object: " << rc << ", "
<< cass_error_desc(rc);
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str());
}
if (data.isCreated)
{
rc = cass_statement_bind_int64(statement, 3, INT64_MAX);
if (rc != CASS_OK) if (rc != CASS_OK)
{ {
cass_statement_free(statement); cass_statement_free(statement);
@@ -1834,6 +1879,7 @@ public:
BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str();
throw std::runtime_error(ss.str()); throw std::runtime_error(ss.str());
} }
}
CassFuture* fut = cass_session_execute(session_.get(), statement); CassFuture* fut = cass_session_execute(session_.get(), statement);
cass_statement_free(statement); cass_statement_free(statement);
@@ -1850,7 +1896,7 @@ public:
bool isDeleted, bool isDeleted,
std::optional<ripple::uint256>&& book) const std::optional<ripple::uint256>&& book) const
{ {
BOOST_LOG_TRIVIAL(info) << "Writing ledger object to cassandra"; BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra";
WriteCallbackData* data = new WriteCallbackData( WriteCallbackData* data = new WriteCallbackData(
this, this,
std::move(key), std::move(key),

View File

@@ -295,7 +295,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
isDeleted = true; isDeleted = true;
std::optional<ripple::uint256> bookDir; std::optional<ripple::uint256> bookDir;
if (obj.mod_type() != org::xrpl::rpc::v1::RawLedgerObject::DELETED) if (isCreated)
{ {
if (isOffer(obj.data())) if (isOffer(obj.data()))
bookDir = getBook(obj.data()); bookDir = getBook(obj.data());
@@ -304,6 +304,10 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
{ {
bookDir = bookDir =
ripple::uint256::fromVoid(obj.book_of_deleted_offer().data()); ripple::uint256::fromVoid(obj.book_of_deleted_offer().data());
for (size_t i = 0; i < 8; ++i)
{
bookDir->data()[bookDir->size() - 1 - i] = 0x00;
}
} }
assert(not(isCreated and isDeleted)); assert(not(isCreated and isDeleted));
@@ -335,6 +339,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
* Behold, mortals! This function spawns three separate threads, which talk * Behold, mortals! This function spawns three separate threads, which talk
* to each other via 2 different thread safe queues and 1 atomic variable. * to each other via 2 different thread safe queues and 1 atomic variable.
* All threads and queues are function local. This function returns when all * All threads and queues are function local. This function returns when all
*
* of the threads exit. There are two termination conditions: the first is * of the threads exit. There are two termination conditions: the first is
* if the load thread encounters a write conflict. In this case, the load * if the load thread encounters a write conflict. In this case, the load
* thread sets writeConflict, an atomic bool, to true, which signals the * thread sets writeConflict, an atomic bool, to true, which signals the

10
test.py
View File

@@ -93,10 +93,10 @@ async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency,
try: try:
async with websockets.connect(address) as ws: async with websockets.connect(address) as ws:
taker_gets = json.loads("{\"currency\":" + get_currency+"}") taker_gets = json.loads("{\"currency\":\"" + get_currency+"\"}")
if get_issuer is not None: if get_issuer is not None:
taker_gets["issuer"] = get_issuer taker_gets["issuer"] = get_issuer
taker_pays = json.loads("{\"currency\":" + pay_currency + "}") taker_pays = json.loads("{\"currency\":\"" + pay_currency + "\"}")
if pay_issuer is not None: if pay_issuer is not None:
taker_pays["issuer"] = pay_issuer taker_pays["issuer"] = pay_issuer
@@ -116,9 +116,9 @@ parser.add_argument('--hash')
parser.add_argument('--account', default="rLC64xxNif3GiY9FQnbaM4kcE6VvDhwRod") parser.add_argument('--account', default="rLC64xxNif3GiY9FQnbaM4kcE6VvDhwRod")
parser.add_argument('--ledger') parser.add_argument('--ledger')
parser.add_argument('--limit', default='200') parser.add_argument('--limit', default='200')
paresr.add_argument('--taker_pays_issuer') parser.add_argument('--taker_pays_issuer')
parser.add_argument('--taker_pays_currency') parser.add_argument('--taker_pays_currency')
paresr.add_argument('--taker_gets_issuer') parser.add_argument('--taker_gets_issuer')
parser.add_argument('--taker_gets_currency') parser.add_argument('--taker_gets_currency')
@@ -145,7 +145,7 @@ def run(args):
ledger_data_full(args.ip, args.port, args.ledger)) ledger_data_full(args.ip, args.port, args.ledger))
elif args.action == "book_offers": elif args.action == "book_offers":
asyncio.get_event_loop().run_until_complete( asyncio.get_event_loop().run_until_complete(
book_offers(args.ip, args.port, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer); book_offers(args.ip, args.port, args.ledger, args.taker_pays_currency, args.taker_pays_issuer, args.taker_gets_currency, args.taker_gets_issuer))
else: else:
print("incorrect arguments") print("incorrect arguments")