20 #include <ripple/app/ledger/AcceptedLedger.h>
21 #include <ripple/app/ledger/LedgerMaster.h>
22 #include <ripple/app/ledger/LedgerToJson.h>
23 #include <ripple/app/ledger/TransactionMaster.h>
24 #include <ripple/app/main/Application.h>
25 #include <ripple/app/misc/Manifest.h>
26 #include <ripple/app/misc/impl/AccountTxPaging.h>
27 #include <ripple/app/rdb/backend/PostgresDatabase.h>
28 #include <ripple/app/rdb/backend/detail/Node.h>
29 #include <ripple/basics/BasicConfig.h>
30 #include <ripple/basics/StringUtilities.h>
31 #include <ripple/core/DatabaseCon.h>
32 #include <ripple/core/Pg.h>
33 #include <ripple/core/SociDB.h>
34 #include <ripple/json/json_reader.h>
35 #include <ripple/json/to_string.h>
36 #include <ripple/nodestore/DatabaseShard.h>
37 #include <boost/algorithm/string.hpp>
38 #include <boost/range/adaptor/transformed.hpp>
39 #include <soci/sqlite3/soci-sqlite3.h>
57 ,
j_(
app_.journal(
"PgPool"))
59 #ifdef RIPPLED_REPORTING
60 make_PgPool(config.section(
"ledger_tx_tables"),
j_)
65 #ifdef RIPPLED_REPORTING
76 #ifdef RIPPLED_REPORTING
170 #ifdef RIPPLED_REPORTING
171 auto log = app.
journal(
"Ledger");
174 sql <<
"SELECT ledger_hash, prev_hash, account_set_hash, trans_set_hash, "
175 "total_coins, closing_time, prev_closing_time, close_time_res, "
176 "close_flags, ledger_seq FROM ledgers ";
178 if (
auto ledgerSeq = std::get_if<uint32_t>(&whichLedger))
182 else if (
auto ledgerHash = std::get_if<uint256>(&whichLedger))
184 sql << (
"WHERE ledger_hash = \'\\x" +
strHex(*ledgerHash) +
"\'");
196 sql << (
"ORDER BY ledger_seq desc LIMIT 1");
200 JLOG(log.trace()) << __func__ <<
" : sql = " << sql.
str();
202 auto res = PgQuery(pgPool)(sql.
str().data());
205 JLOG(log.error()) << __func__ <<
" : Postgres response is null - sql = "
210 else if (res.status() != PGRES_TUPLES_OK)
212 JLOG(log.error()) << __func__
213 <<
" : Postgres response should have been "
214 "PGRES_TUPLES_OK but instead was "
215 << res.status() <<
" - msg = " << res.msg()
216 <<
" - sql = " << sql.
str();
221 JLOG(log.trace()) << __func__ <<
" Postgres result msg : " << res.msg();
223 if (res.isNull() || res.ntuples() == 0)
225 JLOG(log.debug()) << __func__
226 <<
" : Ledger not found. sql = " << sql.
str();
229 else if (res.ntuples() > 0)
231 if (res.nfields() != 10)
233 JLOG(log.error()) << __func__
234 <<
" : Wrong number of fields in Postgres "
235 "response. Expected 10, but got "
236 << res.nfields() <<
" . sql = " << sql.
str();
242 for (
size_t i = 0; i < res.ntuples(); ++i)
244 char const* hash = res.c_str(i, 0);
245 char const* prevHash = res.c_str(i, 1);
246 char const* accountHash = res.c_str(i, 2);
247 char const* txHash = res.c_str(i, 3);
255 JLOG(log.trace()) << __func__ <<
" - Postgres response = " << hash
256 <<
" , " << prevHash <<
" , " << accountHash <<
" , "
257 << txHash <<
" , " << totalCoins <<
", " << closeTime
258 <<
", " << parentCloseTime <<
", " << closeTimeRes
259 <<
", " << closeFlags <<
", " << ledgerSeq
260 <<
" - sql = " << sql.
str();
261 JLOG(log.debug()) << __func__
262 <<
" - Successfully fetched ledger with sequence = "
263 << ledgerSeq <<
" from Postgres";
275 info.
drops = totalCoins;
276 info.
closeTime = time_point{duration{closeTime}};
280 info.
seq = ledgerSeq;
307 [&infos, &app, &pgPool](
auto&& arg) {
311 assert(infos.
size() <= 1);
317 #ifdef RIPPLED_REPORTING
321 JLOG(j.
debug()) << __func__;
322 auto cmd = boost::format(
323 R
"(INSERT INTO ledgers
324 VALUES (%u,'\x%s', '\x%s',%u,%u,%u,%u,%u,'\x%s','\x%s'))");
326 auto ledgerInsert = boost::str(
327 cmd % info.seq %
strHex(info.hash) %
strHex(info.parentHash) %
328 info.drops.drops() % info.closeTime.time_since_epoch().count() %
329 info.parentCloseTime.time_since_epoch().count() %
330 info.closeTimeResolution.count() % info.closeFlags %
332 JLOG(j.
trace()) << __func__ <<
" : "
334 <<
"query string = " << ledgerInsert;
336 auto res = pgQuery(ledgerInsert.data());
341 enum class DataFormat { binary, expanded };
350 if (format == DataFormat::binary)
358 for (
size_t i = 0; i < txns.size(); ++i)
360 auto& [txn, meta] = txns[i];
361 if (format == DataFormat::binary)
363 auto& transactions = std::get<TxnsDataBinary>(ret);
364 Serializer txnSer = txn->getSerializer();
365 Serializer metaSer = meta->getSerializer();
367 Blob txnBlob = txnSer.getData();
368 Blob metaBlob = metaSer.getData();
374 auto& transactions = std::get<TxnsData>(ret);
376 auto txnRet = std::make_shared<Transaction>(txn, reason, app);
377 txnRet->setLedger(ledgerSequences[i]);
379 auto txMeta = std::make_shared<TxMeta>(
380 txnRet->getID(), ledgerSequences[i], *meta);
388 processAccountTxStoredProcedureResult(
389 RelationalDatabase::AccountTxArgs
const& args,
395 ret.limit = args.limit;
399 if (result.
isMember(
"transactions"))
403 for (
auto& t : result[
"transactions"])
405 if (t.isMember(
"ledger_seq") && t.isMember(
"nodestore_hash"))
407 uint32_t ledgerSequence = t[
"ledger_seq"].asUInt();
409 t[
"nodestore_hash"].asString();
410 nodestoreHashHex.
erase(0, 2);
412 if (!nodestoreHash.parseHex(nodestoreHashHex))
415 if (nodestoreHash.isNonZero())
417 ledgerSequences.
push_back(ledgerSequence);
418 nodestoreHashes.
push_back(nodestoreHash);
423 return {ret, {
rpcINTERNAL,
"nodestoreHash is zero"}};
429 return {ret, {
rpcINTERNAL,
"missing postgres fields"}};
433 assert(nodestoreHashes.
size() == ledgerSequences.
size());
438 args.binary ? DataFormat::binary : DataFormat::expanded);
440 JLOG(j.
trace()) << __func__ <<
" : processed db results";
442 if (result.isMember(
"marker"))
444 auto& marker = result[
"marker"];
445 assert(marker.isMember(
"ledger"));
446 assert(marker.isMember(
"seq"));
448 marker[
"ledger"].asUInt(), marker[
"seq"].asUInt()};
450 assert(result.isMember(
"ledger_index_min"));
451 assert(result.isMember(
"ledger_index_max"));
453 result[
"ledger_index_min"].asUInt(),
454 result[
"ledger_index_max"].asUInt()};
457 else if (result.isMember(
"error"))
460 << __func__ <<
" : error = " << result[
"error"].asString();
467 return {ret, {
rpcINTERNAL,
"unexpected Postgres response"}};
472 JLOG(j.
debug()) << __func__ <<
" : "
473 <<
"Caught exception : " << e.
what();
482 #ifdef RIPPLED_REPORTING
490 #ifdef RIPPLED_REPORTING
491 auto seq = PgQuery(
pgPool_)(
"SELECT min_ledger()");
494 JLOG(
j_.
error()) <<
"Error querying minimum ledger sequence.";
496 else if (!seq.isNull())
505 #ifdef RIPPLED_REPORTING
506 auto seq = PgQuery(
pgPool_)(
"SELECT max_ledger()");
507 if (seq && !seq.isNull())
508 return seq.asBigInt();
516 #ifdef RIPPLED_REPORTING
517 auto range = PgQuery(
pgPool_)(
"SELECT complete_ledgers()");
519 return range.c_str();
527 using namespace std::chrono_literals;
528 #ifdef RIPPLED_REPORTING
529 auto age = PgQuery(
pgPool_)(
"SELECT age()");
530 if (!age || age.isNull())
531 JLOG(
j_.
debug()) <<
"No ledgers in database";
543 #ifdef RIPPLED_REPORTING
544 JLOG(
j_.
debug()) << __func__ <<
" : "
545 <<
"Beginning write to Postgres";
552 auto res = pg(
"BEGIN");
553 if (!res || res.status() != PGRES_COMMAND_OK)
556 msg <<
"bulkWriteToTable : Postgres insert error: " << res.msg();
557 Throw<std::runtime_error>(msg.
str());
563 if (!writeToLedgersDB(info, pg,
j_))
565 JLOG(
j_.
warn()) << __func__ <<
" : "
566 <<
"Failed to write to ledgers database.";
572 for (
auto const& data : accountTxData)
576 auto idx = data.transactionIndex;
577 auto ledgerSeq = data.ledgerSequence;
581 << txHash <<
'\t' <<
"\\\\x" << nodestoreHash
584 for (
auto const& a : data.accounts)
587 accountTransactionsCopyBuffer
593 pg.bulkInsert(
"transactions", transactionsCopyBuffer.
str());
595 "account_transactions", accountTransactionsCopyBuffer.
str());
598 if (!res || res.status() != PGRES_COMMAND_OK)
601 msg <<
"bulkWriteToTable : Postgres insert error: " << res.msg();
603 Throw<std::runtime_error>(msg.
str());
606 JLOG(
j_.
info()) << __func__ <<
" : "
607 <<
"Successfully wrote to Postgres";
613 <<
"Caught exception writing to Postgres : "
645 assert(infos.size() <= 1);
647 return infos[0].hash;
656 assert(infos.size() <= 1);
671 for (
auto& info : infos)
673 ret[info.seq] = {info.hash, info.parentHash};
683 #ifdef RIPPLED_REPORTING
687 "SELECT nodestore_hash"
688 " FROM transactions "
689 " WHERE ledger_seq = " +
695 JLOG(log.error()) << __func__
696 <<
" : Postgres response is null - query = " << query;
700 else if (res.status() != PGRES_TUPLES_OK)
702 JLOG(log.error()) << __func__
703 <<
" : Postgres response should have been "
704 "PGRES_TUPLES_OK but instead was "
705 << res.status() <<
" - msg = " << res.msg()
706 <<
" - query = " << query;
711 JLOG(log.trace()) << __func__ <<
" Postgres result msg : " << res.msg();
713 if (res.isNull() || res.ntuples() == 0)
715 JLOG(log.debug()) << __func__
716 <<
" : Ledger not found. query = " << query;
719 else if (res.ntuples() > 0)
721 if (res.nfields() != 1)
723 JLOG(log.error()) << __func__
724 <<
" : Wrong number of fields in Postgres "
725 "response. Expected 1, but got "
726 << res.nfields() <<
" . query = " << query;
732 JLOG(log.trace()) << __func__ <<
" : result = " << res.c_str()
733 <<
" : query = " << query;
734 for (
size_t i = 0; i < res.ntuples(); ++i)
736 char const* nodestoreHash = res.
c_str(i, 0);
738 if (!hash.
parseHex(nodestoreHash + 2))
745 return nodestoreHashes;
753 #ifdef RIPPLED_REPORTING
757 Throw<std::runtime_error>(
758 "called getTxHistory but not in reporting mode");
762 boost::format(
"SELECT nodestore_hash, ledger_seq "
764 " ORDER BY ledger_seq DESC LIMIT 20 "
773 <<
" : Postgres response is null - sql = " << sql;
777 else if (res.status() != PGRES_TUPLES_OK)
780 <<
" : Postgres response should have been "
781 "PGRES_TUPLES_OK but instead was "
782 << res.status() <<
" - msg = " << res.msg()
783 <<
" - sql = " << sql;
788 JLOG(
j_.
trace()) << __func__ <<
" Postgres result msg : " << res.msg();
790 if (res.isNull() || res.ntuples() == 0)
792 JLOG(
j_.
debug()) << __func__ <<
" : Empty postgres response";
796 else if (res.ntuples() > 0)
798 if (res.nfields() != 2)
801 <<
" : Wrong number of fields in Postgres "
802 "response. Expected 1, but got "
803 << res.nfields() <<
" . sql = " << sql;
809 JLOG(
j_.
trace()) << __func__ <<
" : Postgres result = " << res.c_str();
813 for (
size_t i = 0; i < res.ntuples(); ++i)
816 if (!hash.
parseHex(res.c_str(i, 0) + 2))
819 ledgerSequences.
push_back(res.asBigInt(i, 1));
823 for (
size_t i = 0; i < txns.size(); ++i)
825 auto const& [sttx, meta] = txns[i];
829 auto txn = std::make_shared<Transaction>(sttx, reason,
app_);
830 txn->setLedger(ledgerSequences[i]);
842 #ifdef RIPPLED_REPORTING
845 char const*& command = dbParams.first;
848 "SELECT account_tx($1::bytea, $2::bool, "
849 "$3::bigint, $4::bigint, $5::bigint, $6::bytea, "
850 "$7::bigint, $8::bool, $9::bigint, $10::bigint)";
853 values[1] = args.
forward ?
"true" :
"false";
856 if (args.
limit == 0 || args.
limit > page_length)
863 if (
auto range = std::get_if<LedgerRange>(&args.
ledger.value()))
868 else if (
auto hash = std::get_if<LedgerHash>(&args.
ledger.value()))
870 values[5] = (
"\\x" +
strHex(*hash));
873 auto sequence = std::get_if<LedgerSequence>(&args.
ledger.value()))
877 else if (std::get_if<LedgerShortcut>(&args.
ledger.value()))
884 JLOG(
j_.
error()) <<
"doAccountTxStoredProcedure - "
885 <<
"Error parsing ledger args";
895 for (
size_t i = 0; i < values.
size(); ++i)
898 << (values[i] ? values[i].value() :
"null");
901 auto res = PgQuery(
pgPool_)(dbParams);
905 <<
" : Postgres response is null - account = "
910 else if (res.status() != PGRES_TUPLES_OK)
913 <<
" : Postgres response should have been "
914 "PGRES_TUPLES_OK but instead was "
915 << res.status() <<
" - msg = " << res.msg()
921 JLOG(
j_.
trace()) << __func__ <<
" Postgres result msg : " << res.msg();
922 if (res.isNull() || res.ntuples() == 0)
925 <<
" : No data returned from Postgres : account = "
932 char const* resultStr = res.c_str();
933 JLOG(
j_.
trace()) << __func__ <<
" : "
934 <<
"postgres result = " << resultStr
939 bool success = reader.
parse(resultStr, resultStr + strlen(resultStr), v);
942 return processAccountTxStoredProcedureResult(args, v,
app_,
j_);
947 return {{}, {
rpcINTERNAL,
"Failed to deserialize Postgres result"}};
953 #ifdef RIPPLED_REPORTING
954 auto baseCmd = boost::format(R
"(SELECT tx('%s');)");
965 <<
" : Postgres response is null - tx ID = " <<
strHex(
id);
969 else if (res.status() != PGRES_TUPLES_OK)
973 <<
" : Postgres response should have been "
974 "PGRES_TUPLES_OK but instead was "
975 << res.status() <<
" - msg = " << res.msg()
976 <<
" - tx ID = " <<
strHex(
id);
982 << __func__ <<
" Postgres result msg : " << res.msg();
983 if (res.isNull() || res.ntuples() == 0)
987 <<
" : No data returned from Postgres : tx ID = " <<
strHex(
id);
993 char const* resultStr = res.c_str();
995 <<
"postgres result = " << resultStr;
999 bool success = reader.
parse(resultStr, resultStr + strlen(resultStr), v);
1008 uint32_t ledgerSeq = v[
"ledger_seq"].
asUInt();
1015 v[
"min_seq"].asUInt(), v[
"max_seq"].asUInt())};
1022 Throw<std::runtime_error>(
1023 "Transaction::Locate - Invalid Postgres response");
1050 return std::make_unique<PostgresDatabaseImp>(app, config, jobQueue);
1056 #ifdef RIPPLED_REPORTING
1057 using namespace std::chrono_literals;
1058 auto age = PgQuery(
pgPool_)(
"SELECT age()");
1059 if (!age || age.isNull())
1061 reason =
"No ledgers in database";
1066 reason =
"No recently-published ledger";