diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index 36e6ea4a..3674ace3 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -422,19 +422,34 @@ CassandraBackend::fetchAccountTransactions( do { { - CassandraStatement statement{selectAccountTx_}; + CassandraStatement statement = [this, forward]() { + if (forward) + return CassandraStatement{selectAccountTxForward_}; + else + return CassandraStatement{selectAccountTx_}; + }(); statement.bindBytes(account); if (cursor) { statement.bindUInt(cursor->ledgerSequence >> 20 << 20); statement.bindIntTuple( cursor->ledgerSequence, cursor->transactionIndex); + BOOST_LOG_TRIVIAL(debug) + << " account = " << ripple::strHex(account) + << " idx = " << (cursor->ledgerSequence >> 20 << 20) + << " tuple = " << cursor->ledgerSequence << " : " + << cursor->transactionIndex; } else { - statement.bindUInt(rng->maxSequence >> 20 << 20); + int seq = forward ? rng->minSequence : rng->maxSequence; + statement.bindUInt(seq >> 20 << 20); + int placeHolder = forward ? 0 : INT32_MAX; - statement.bindIntTuple(INT32_MAX, INT32_MAX); + statement.bindIntTuple(placeHolder, placeHolder); + BOOST_LOG_TRIVIAL(debug) + << " account = " << ripple::strHex(account) + << " idx = " << seq << " tuple = " << placeHolder; } uint32_t adjustedLimit = limit - hashes.size(); statement.bindUInt(adjustedLimit); @@ -457,6 +472,8 @@ CassandraBackend::fetchAccountTransactions( BOOST_LOG_TRIVIAL(debug) << __func__ << " setting cursor"; auto [lgrSeq, txnIdx] = result.getInt64Tuple(); cursor = {(uint32_t)lgrSeq, (uint32_t)txnIdx}; + if (forward) + ++cursor->transactionIndex; } } while (result.nextRow()); } @@ -464,9 +481,14 @@ CassandraBackend::fetchAccountTransactions( { BOOST_LOG_TRIVIAL(debug) << __func__ << " less than limit"; uint32_t seq = cursor->ledgerSequence; - seq = ((seq >> 20) - 1) << 20; + seq = seq >> 20; + if (forward) + seq += 1; + else + seq -= 1; + seq = seq << 20; cursor->ledgerSequence = seq; - cursor->transactionIndex = INT32_MAX; + cursor->transactionIndex = forward ? 0 : INT32_MAX; BOOST_LOG_TRIVIAL(debug) << __func__ << " walking back"; CassandraStatement statement{selectObject_}; statement.bindBytes(keylet.key); @@ -1245,6 +1267,13 @@ CassandraBackend::open(bool readOnly) << " AND seq_idx < ? LIMIT ?"; if (!selectAccountTx_.prepareStatement(query, session_.get())) continue; + query.str(""); + query << " SELECT hash,seq_idx FROM " << tablePrefix << "account_tx" + << " WHERE account = ? " + << " AND idx = ? " + << " AND seq_idx >= ? ORDER BY seq_idx ASC LIMIT ?"; + if (!selectAccountTxForward_.prepareStatement(query, session_.get())) + continue; query.str(""); query << " INSERT INTO " << tablePrefix << "ledgers " diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index f141e1eb..a9c124da 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -576,6 +576,7 @@ private: CassandraPreparedStatement selectKeys_; CassandraPreparedStatement insertAccountTx_; CassandraPreparedStatement selectAccountTx_; + CassandraPreparedStatement selectAccountTxForward_; CassandraPreparedStatement insertLedgerHeader_; CassandraPreparedStatement insertLedgerHash_; CassandraPreparedStatement updateLedgerRange_; diff --git a/src/handlers/Handlers.cpp b/src/handlers/Handlers.cpp index 9c9fa7ea..9fc3cf8b 100644 --- a/src/handlers/Handlers.cpp +++ b/src/handlers/Handlers.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -5,70 +6,56 @@ #include #include #include -#include -namespace RPC -{ +namespace RPC { -static std::unordered_map> - handlerTable -{ - { "account_channels", &doAccountChannels }, - { "account_currencies", &doAccountCurrencies }, - { "account_info", &doAccountInfo }, - { "account_lines", &doAccountLines }, - { "account_objects", &doAccountObjects }, - { "account_offers", &doAccountOffers }, - { "account_tx", &doAccountTx }, - { "book_offers", &doBookOffers }, - { "channel_authorize", &doChannelAuthorize }, - { "channel_verify", &doChannelVerify }, - { "ledger", &doLedger }, - { "ledger_data", &doLedgerData }, - { "ledger_entry", &doLedgerEntry }, - { "ledger_range", &doLedgerRange }, - { "ledger_data", &doLedgerData }, - { "subscribe", &doSubscribe }, - { "unsubscribe", &doUnsubscribe }, - { "tx", &doTx }, -}; +static std::unordered_map> + handlerTable{ + {"account_channels", &doAccountChannels}, + {"account_currencies", &doAccountCurrencies}, + {"account_info", &doAccountInfo}, + {"account_lines", &doAccountLines}, + {"account_objects", &doAccountObjects}, + {"account_offers", &doAccountOffers}, + {"account_tx", &doAccountTx}, + {"book_offers", &doBookOffers}, + {"channel_authorize", &doChannelAuthorize}, + {"channel_verify", &doChannelVerify}, + {"ledger", &doLedger}, + {"ledger_data", &doLedgerData}, + {"ledger_entry", &doLedgerEntry}, + {"ledger_range", &doLedgerRange}, + {"ledger_data", &doLedgerData}, + {"subscribe", &doSubscribe}, + {"unsubscribe", &doUnsubscribe}, + {"tx", &doTx}, + }; -static std::unordered_set forwardCommands { +static std::unordered_set forwardCommands{ "submit", "submit_multisigned", "fee", "path_find", "ripple_path_find", - "manifest" -}; + "manifest"}; bool shouldForwardToRippled(Context const& ctx) { auto request = ctx.params; - if (request.contains("forward") && request.at("forward").is_bool()) - return request.at("forward").as_bool(); - - BOOST_LOG_TRIVIAL(debug) << "checked forward"; - if (forwardCommands.find(ctx.method) != forwardCommands.end()) return true; - BOOST_LOG_TRIVIAL(debug) << "checked command"; - if (request.contains("ledger_index")) { auto indexValue = request.at("ledger_index"); if (indexValue.is_string()) { - BOOST_LOG_TRIVIAL(debug) << "checking ledger as string"; std::string index = indexValue.as_string().c_str(); return index == "current" || index == "closed"; } } - - BOOST_LOG_TRIVIAL(debug) << "checked ledger"; return false; } @@ -87,4 +74,4 @@ buildResponse(Context const& ctx) return method(ctx); } -} \ No newline at end of file +} // namespace RPC diff --git a/src/handlers/methods/impl/AccountTx.cpp b/src/handlers/methods/impl/AccountTx.cpp index d1d253ab..60279d77 100644 --- a/src/handlers/methods/impl/AccountTx.cpp +++ b/src/handlers/methods/impl/AccountTx.cpp @@ -61,7 +61,6 @@ doAccountTx(Context const& context) } std::optional cursor; - cursor = {context.range.maxSequence, 0}; if (request.contains("marker")) { @@ -119,10 +118,10 @@ doAccountTx(Context const& context) bool forward = false; if (request.contains("forward")) { - if (!request.at("limit").is_bool()) + if (!request.at("forward").is_bool()) return Status{Error::rpcINVALID_PARAMS, "forwardNotBool"}; - forward = request.at("limit").as_bool(); + forward = request.at("forward").as_bool(); } boost::json::array txns; diff --git a/src/webserver/WsBase.h b/src/webserver/WsBase.h index 43668a61..7764c573 100644 --- a/src/webserver/WsBase.h +++ b/src/webserver/WsBase.h @@ -49,7 +49,7 @@ inline boost::json::object getDefaultWsResponse(boost::json::value const& id) { boost::json::object defaultResp = {}; - if(!id.is_null()) + if (!id.is_null()) defaultResp["id"] = id; defaultResp["result"] = boost::json::object_kind; @@ -162,13 +162,12 @@ public: { std::shared_ptr mgr = subscriptions_.lock(); - if(!mgr) + if (!mgr) return; mgr->clearSession(this); } - void do_read() { @@ -222,25 +221,25 @@ public: subscriptions_.lock(), balancer_, shared_from_this(), - *range - ); + *range); if (!context) return send(boost::json::serialize( RPC::make_error(RPC::Error::rpcBAD_SYNTAX))); - auto id = request.contains("id") - ? request.at("id") - : nullptr; + auto id = + request.contains("id") ? request.at("id") : nullptr; auto response = getDefaultWsResponse(id); - boost::json::object& result = response["result"].as_object(); + boost::json::object& result = + response["result"].as_object(); auto v = RPC::buildResponse(*context); if (auto status = std::get_if(&v)) { - auto error = RPC::make_error(status->error); + auto error = + RPC::make_error(status->error, status->message); if (!id.is_null()) error["id"] = id; @@ -256,7 +255,6 @@ public: if (!dosGuard_.add(ip, response_.size())) result["warning"] = "Too many requests"; - } catch (Backend::DatabaseTimeout const& t) { diff --git a/test.py b/test.py index 5f8e6da8..8dd32d31 100755 --- a/test.py +++ b/test.py @@ -208,15 +208,15 @@ def getMinAndMax(res): return (minSeq,maxSeq) -async def account_tx(ip, port, account, binary, minLedger=None, maxLedger=None): +async def account_tx(ip, port, account, binary, forward=False, minLedger=None, maxLedger=None): address = 'ws://' + str(ip) + ':' + str(port) try: async with websockets.connect(address) as ws: if minLedger is None or maxLedger is None: - await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"limit":200})) + await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"forward":bool(forward),"limit":200})) else: - await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"limit":200,"ledger_index_min":minLedger, "ledger_index_max":maxLedger})) + await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"forward":bool(forward),"limit":200,"ledger_index_min":minLedger, "ledger_index_max":maxLedger})) res = json.loads(await ws.recv()) print(json.dumps(res,indent=4,sort_keys=True)) @@ -303,12 +303,12 @@ async def account_txs(ip, port, accounts, numCalls): except websockets.exceptions.connectionclosederror as e: print(e) -async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=None): +async def account_tx_full(ip, port, account, binary,forward=False,minLedger=None, maxLedger=None): address = 'ws://' + str(ip) + ':' + str(port) try: cursor = None marker = None - req = {"command":"account_tx","account":account, "binary":bool(binary),"limit":200} + req = {"command":"account_tx","account":account, "binary":bool(binary),"forward":bool(forward),"limit":200} results = {"transactions":[]} numCalls = 0 async with websockets.connect(address) as ws: @@ -826,6 +826,7 @@ parser.add_argument('--p2pIp', default='s2.ripple.com') parser.add_argument('--p2pPort', default='51233') parser.add_argument('--verify',default=False) parser.add_argument('--binary',default=True) +parser.add_argument('--forward',default=False) parser.add_argument('--expand',default=False) parser.add_argument('--transactions',default=False) parser.add_argument('--minLedger',default=-1) @@ -1029,7 +1030,7 @@ def run(args): args.account = res["Account"] res = asyncio.get_event_loop().run_until_complete( - account_tx(args.ip, args.port, args.account, args.binary)) + account_tx(args.ip, args.port, args.account, args.binary, args.forward)) rng = getMinAndMax(res) @@ -1048,7 +1049,7 @@ def run(args): args.account = res["Account"] print("starting") res = asyncio.get_event_loop().run_until_complete( - account_tx_full(args.ip, args.port, args.account, args.binary,None,None)) + account_tx_full(args.ip, args.port, args.account, args.binary,args.forward,None,None)) rng = getMinAndMax(res) print(len(res["transactions"])) print(args.account) @@ -1060,7 +1061,7 @@ def run(args): if args.verify: print("requesting p2p node") res2 = asyncio.get_event_loop().run_until_complete( - account_tx_full(args.p2pIp, args.p2pPort, args.account, args.binary, rng[0],rng[1],int(args.numPages))) + account_tx_full(args.p2pIp, args.p2pPort, args.account, args.binary,args.forward, rng[0],rng[1],int(args.numPages))) print(compareAccountTx(res,res2)) elif args.action == "ledger_data":