support forward flag for cassandra

Author:  CJ Cobb
Date:    2021-07-16 18:37:17 +00:00
Parent:  7040860d8d
Commit:  63a66b00a6

6 changed files with 81 additions and 66 deletions
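
For orientation: the new flag follows rippled's account_tx convention, returning results oldest-first (ascending seq_idx) when "forward" is true. A minimal sketch of the request shape the updated test.py below sends; the helper name account_tx_forward is made up for this sketch, and the host/port/account are whatever you run against:

    # Illustrative only; field names match the account_tx request built in test.py below.
    import asyncio, json, websockets

    async def account_tx_forward(ip, port, account):
        async with websockets.connect('ws://' + str(ip) + ':' + str(port)) as ws:
            await ws.send(json.dumps({
                "command": "account_tx",
                "account": account,
                "binary": False,
                "forward": True,   # ascending: oldest transactions first
                "limit": 200,
            }))
            return json.loads(await ws.recv())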

View File

@@ -422,19 +422,34 @@ CassandraBackend::fetchAccountTransactions(
     do
     {
         {
-            CassandraStatement statement{selectAccountTx_};
+            CassandraStatement statement = [this, forward]() {
+                if (forward)
+                    return CassandraStatement{selectAccountTxForward_};
+                else
+                    return CassandraStatement{selectAccountTx_};
+            }();
             statement.bindBytes(account);
             if (cursor)
             {
                 statement.bindUInt(cursor->ledgerSequence >> 20 << 20);
                 statement.bindIntTuple(
                     cursor->ledgerSequence, cursor->transactionIndex);
+                BOOST_LOG_TRIVIAL(debug)
+                    << " account = " << ripple::strHex(account)
+                    << " idx = " << (cursor->ledgerSequence >> 20 << 20)
+                    << " tuple = " << cursor->ledgerSequence << " : "
+                    << cursor->transactionIndex;
             }
             else
             {
-                statement.bindUInt(rng->maxSequence >> 20 << 20);
-                statement.bindIntTuple(INT32_MAX, INT32_MAX);
+                int seq = forward ? rng->minSequence : rng->maxSequence;
+                statement.bindUInt(seq >> 20 << 20);
+                int placeHolder = forward ? 0 : INT32_MAX;
+                statement.bindIntTuple(placeHolder, placeHolder);
+                BOOST_LOG_TRIVIAL(debug)
+                    << " account = " << ripple::strHex(account)
+                    << " idx = " << seq << " tuple = " << placeHolder;
             }
             uint32_t adjustedLimit = limit - hashes.size();
             statement.bindUInt(adjustedLimit);
@@ -457,6 +472,8 @@ CassandraBackend::fetchAccountTransactions(
                 BOOST_LOG_TRIVIAL(debug) << __func__ << " setting cursor";
                 auto [lgrSeq, txnIdx] = result.getInt64Tuple();
                 cursor = {(uint32_t)lgrSeq, (uint32_t)txnIdx};
+                if (forward)
+                    ++cursor->transactionIndex;
             }
         } while (result.nextRow());
     }
@@ -464,9 +481,14 @@ CassandraBackend::fetchAccountTransactions(
     {
         BOOST_LOG_TRIVIAL(debug) << __func__ << " less than limit";
         uint32_t seq = cursor->ledgerSequence;
-        seq = ((seq >> 20) - 1) << 20;
+        seq = seq >> 20;
+        if (forward)
+            seq += 1;
+        else
+            seq -= 1;
+        seq = seq << 20;
         cursor->ledgerSequence = seq;
-        cursor->transactionIndex = INT32_MAX;
+        cursor->transactionIndex = forward ? 0 : INT32_MAX;
         BOOST_LOG_TRIVIAL(debug) << __func__ << " walking back";
         CassandraStatement statement{selectObject_};
         statement.bindBytes(keylet.key);
@@ -1245,6 +1267,13 @@ CassandraBackend::open(bool readOnly)
               << " AND seq_idx < ? LIMIT ?";
        if (!selectAccountTx_.prepareStatement(query, session_.get()))
            continue;
+       query.str("");
+       query << " SELECT hash,seq_idx FROM " << tablePrefix << "account_tx"
+             << " WHERE account = ? "
+             << " AND idx = ? "
+             << " AND seq_idx >= ? ORDER BY seq_idx ASC LIMIT ?";
+       if (!selectAccountTxForward_.prepareStatement(query, session_.get()))
+           continue;
        query.str("");
        query << " INSERT INTO " << tablePrefix << "ledgers "

View File

@@ -576,6 +576,7 @@ private:
    CassandraPreparedStatement selectKeys_;
    CassandraPreparedStatement insertAccountTx_;
    CassandraPreparedStatement selectAccountTx_;
+   CassandraPreparedStatement selectAccountTxForward_;
    CassandraPreparedStatement insertLedgerHeader_;
    CassandraPreparedStatement insertLedgerHash_;
    CassandraPreparedStatement updateLedgerRange_;

View File

@@ -1,3 +1,4 @@
+#include <etl/ETLSource.h>
 #include <handlers/Handlers.h>
 #include <handlers/methods/Account.h>
 #include <handlers/methods/Channel.h>

@@ -5,14 +6,11 @@
 #include <handlers/methods/Ledger.h>
 #include <handlers/methods/Subscribe.h>
 #include <handlers/methods/Transaction.h>
-#include <etl/ETLSource.h>

-namespace RPC
-{
+namespace RPC {

 static std::unordered_map<std::string, std::function<Result(Context const&)>>
-    handlerTable
-    {
+    handlerTable{
         {"account_channels", &doAccountChannels},
         {"account_currencies", &doAccountCurrencies},
         {"account_info", &doAccountInfo},

@@ -39,37 +37,26 @@ static std::unordered_set<std::string> forwardCommands {
     "fee",
     "path_find",
     "ripple_path_find",
-    "manifest"
-};
+    "manifest"};

 bool
 shouldForwardToRippled(Context const& ctx)
 {
     auto request = ctx.params;
-    if (request.contains("forward") && request.at("forward").is_bool())
-        return request.at("forward").as_bool();
-    BOOST_LOG_TRIVIAL(debug) << "checked forward";

     if (forwardCommands.find(ctx.method) != forwardCommands.end())
         return true;
-    BOOST_LOG_TRIVIAL(debug) << "checked command";

     if (request.contains("ledger_index"))
     {
         auto indexValue = request.at("ledger_index");
         if (indexValue.is_string())
         {
-            BOOST_LOG_TRIVIAL(debug) << "checking ledger as string";
             std::string index = indexValue.as_string().c_str();
             return index == "current" || index == "closed";
         }
     }
-    BOOST_LOG_TRIVIAL(debug) << "checked ledger";

     return false;
 }

@@ -87,4 +74,4 @@ buildResponse(Context const& ctx)
     return method(ctx);
 }
-}
+} // namespace RPC

View File

@@ -61,7 +61,6 @@ doAccountTx(Context const& context)
    }

    std::optional<Backend::AccountTransactionsCursor> cursor;
-   cursor = {context.range.maxSequence, 0};

    if (request.contains("marker"))
    {

@@ -119,10 +118,10 @@ doAccountTx(Context const& context)
    bool forward = false;
    if (request.contains("forward"))
    {
-       if (!request.at("limit").is_bool())
+       if (!request.at("forward").is_bool())
            return Status{Error::rpcINVALID_PARAMS, "forwardNotBool"};
-       forward = request.at("limit").as_bool();
+       forward = request.at("forward").as_bool();
    }

    boost::json::array txns;
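
The second hunk above also fixes a copy-paste bug: the handler validated and read request.at("limit") where it meant "forward". A hypothetical request illustrating what the corrected check now rejects (the account value is a placeholder):

    # Hypothetical request: with the fix, a non-boolean "forward" is rejected
    # with rpcINVALID_PARAMS ("forwardNotBool") instead of the handler
    # accidentally inspecting the "limit" field.
    bad_request = {
        "command": "account_tx",
        "account": "rExampleAccountPlaceholder",  # placeholder account
        "forward": "yes",                         # not a bool -> forwardNotBool
        "limit": 200,
    }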

View File

@@ -168,7 +168,6 @@ public:
        mgr->clearSession(this);
    }
-
    void
    do_read()
    {

@@ -222,25 +221,25 @@ public:
                    subscriptions_.lock(),
                    balancer_,
                    shared_from_this(),
-                   *range
-               );
+                   *range);

                if (!context)
                    return send(boost::json::serialize(
                        RPC::make_error(RPC::Error::rpcBAD_SYNTAX)));

-               auto id = request.contains("id")
-                   ? request.at("id")
-                   : nullptr;
+               auto id =
+                   request.contains("id") ? request.at("id") : nullptr;

                auto response = getDefaultWsResponse(id);
-               boost::json::object& result = response["result"].as_object();
+               boost::json::object& result =
+                   response["result"].as_object();

                auto v = RPC::buildResponse(*context);

                if (auto status = std::get_if<RPC::Status>(&v))
                {
-                   auto error = RPC::make_error(status->error);
+                   auto error =
+                       RPC::make_error(status->error, status->message);

                    if (!id.is_null())
                        error["id"] = id;

@@ -256,7 +255,6 @@ public:
                if (!dosGuard_.add(ip, response_.size()))
                    result["warning"] = "Too many requests";
            }
-
            catch (Backend::DatabaseTimeout const& t)
            {

test.py
View File

@@ -208,15 +208,15 @@ def getMinAndMax(res):
     return (minSeq,maxSeq)

-async def account_tx(ip, port, account, binary, minLedger=None, maxLedger=None):
+async def account_tx(ip, port, account, binary, forward=False, minLedger=None, maxLedger=None):
     address = 'ws://' + str(ip) + ':' + str(port)
     try:
         async with websockets.connect(address) as ws:
             if minLedger is None or maxLedger is None:
-                await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"limit":200}))
+                await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"forward":bool(forward),"limit":200}))
             else:
-                await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"limit":200,"ledger_index_min":minLedger, "ledger_index_max":maxLedger}))
+                await ws.send(json.dumps({"command":"account_tx","account":account, "binary":bool(binary),"forward":bool(forward),"limit":200,"ledger_index_min":minLedger, "ledger_index_max":maxLedger}))

             res = json.loads(await ws.recv())
             print(json.dumps(res,indent=4,sort_keys=True))

@@ -303,12 +303,12 @@ async def account_txs(ip, port, accounts, numCalls):
     except websockets.exceptions.connectionclosederror as e:
         print(e)

-async def account_tx_full(ip, port, account, binary,minLedger=None, maxLedger=None):
+async def account_tx_full(ip, port, account, binary,forward=False,minLedger=None, maxLedger=None):
     address = 'ws://' + str(ip) + ':' + str(port)
     try:
         cursor = None
         marker = None
-        req = {"command":"account_tx","account":account, "binary":bool(binary),"limit":200}
+        req = {"command":"account_tx","account":account, "binary":bool(binary),"forward":bool(forward),"limit":200}
         results = {"transactions":[]}
         numCalls = 0
         async with websockets.connect(address) as ws:

@@ -826,6 +826,7 @@ parser.add_argument('--p2pIp', default='s2.ripple.com')
 parser.add_argument('--p2pPort', default='51233')
 parser.add_argument('--verify',default=False)
 parser.add_argument('--binary',default=True)
+parser.add_argument('--forward',default=False)
 parser.add_argument('--expand',default=False)
 parser.add_argument('--transactions',default=False)
 parser.add_argument('--minLedger',default=-1)

@@ -1029,7 +1030,7 @@ def run(args):
             args.account = res["Account"]
         res = asyncio.get_event_loop().run_until_complete(
-            account_tx(args.ip, args.port, args.account, args.binary))
+            account_tx(args.ip, args.port, args.account, args.binary, args.forward))
         rng = getMinAndMax(res)

@@ -1048,7 +1049,7 @@ def run(args):
             args.account = res["Account"]
         print("starting")
         res = asyncio.get_event_loop().run_until_complete(
-            account_tx_full(args.ip, args.port, args.account, args.binary,None,None))
+            account_tx_full(args.ip, args.port, args.account, args.binary,args.forward,None,None))
         rng = getMinAndMax(res)
         print(len(res["transactions"]))
         print(args.account)

@@ -1060,7 +1061,7 @@ def run(args):
         if args.verify:
             print("requesting p2p node")
             res2 = asyncio.get_event_loop().run_until_complete(
-                account_tx_full(args.p2pIp, args.p2pPort, args.account, args.binary, rng[0],rng[1],int(args.numPages)))
+                account_tx_full(args.p2pIp, args.p2pPort, args.account, args.binary,args.forward, rng[0],rng[1],int(args.numPages)))
             print(compareAccountTx(res,res2))
         elif args.action == "ledger_data":
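
Assuming a locally running server, the updated helper can also be driven directly, mirroring the run() call above; the host, port, and account below are placeholders, not values from this commit:

    # Hypothetical direct use of the updated helper, paging oldest-first.
    import asyncio

    res = asyncio.get_event_loop().run_until_complete(
        account_tx_full("127.0.0.1", "8080", "rExampleAccountPlaceholder",
                        binary=True, forward=True))
    print(len(res["transactions"]))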