rippled
Loading...
Searching...
No Matches
OrderBookDB.cpp
1#include <xrpld/app/ledger/LedgerMaster.h>
2#include <xrpld/app/ledger/OrderBookDB.h>
3#include <xrpld/app/main/Application.h>
4#include <xrpld/app/misc/AMMUtils.h>
5#include <xrpld/app/misc/NetworkOPs.h>
6#include <xrpld/core/Config.h>
7
8#include <xrpl/basics/Log.h>
9#include <xrpl/core/JobQueue.h>
10#include <xrpl/protocol/Indexes.h>
11
12namespace xrpl {
13
14OrderBookDB::OrderBookDB(Application& app) : app_(app), seq_(0), j_(app.journal("OrderBookDB"))
15{
16}
17
// Decides whether a full order book rebuild is warranted for `ledger` and, if
// so, runs update() directly or schedules it on the job queue.
//
// NOTE(review): this listing has lost lines inside this definition (the
// embedded numbering jumps 18 -> 20 -> 22): the function header after `void`
// and the condition guarding the first early-return block are not visible.
// The declaration index in this file lists
// `void setup(std::shared_ptr<ReadView const> const& ledger)` -- confirm
// against the original file before relying on it.
18void
20{
// NOTE(review): the condition for this "no ledger" early exit is one of the
// missing lines; only its body survives.
22 {
23 JLOG(j_.warn()) << "Eliding full order book update: no ledger";
24 return;
25 }
26
// Snapshot of the sequence recorded by the last accepted full update
// (0 means none is recorded).
27 auto seq = seq_.load();
28
29 if (seq != 0)
30 {
// Ledger is newer than the recorded one, but by fewer than 25600
// sequences: skip the full rebuild.
31 if ((ledger->seq() > seq) && ((ledger->seq() - seq) < 25600))
32 return;
33
// Ledger is the same as, or older than, the recorded one by fewer than 16
// sequences: skip the full rebuild.
34 if ((ledger->seq() <= seq) && ((seq - ledger->seq()) < 16))
35 return;
36 }
37
// Record the new sequence. exchange() returns the previous value; if it no
// longer equals the snapshot taken above, seq_ was changed in the meantime
// and this call backs off.
38 if (seq_.exchange(ledger->seq()) != seq)
39 return;
40
41 JLOG(j_.debug()) << "Full order book update: " << seq << " to " << ledger->seq();
42
// update() treats PATH_SEARCH_MAX == 0 as "pathfinding has been disabled",
// so no rebuild is started in that case.
43 if (app_.config().PATH_SEARCH_MAX != 0)
44 {
// Standalone mode rebuilds synchronously; otherwise the rebuild is queued
// as a jtUPDATE_PF job. The lambda captures `ledger` by value.
45 if (app_.config().standalone())
46 update(ledger);
47 else
48 app_.getJobQueue().addJob(jtUPDATE_PF, "OrderBookUpd", [this, ledger]() { update(ledger); });
49 }
50}
51
// Rebuilds the book indexes (allBooks_, xrpBooks_, domainBooks_,
// xrpDomainBooks_) by scanning every entry of `ledger`, then swaps the freshly
// built containers into the members.
//
// NOTE(review): this listing has lost lines inside this definition (embedded
// numbers 53, 139 and 146 are absent): the function header after `void`, one
// line at the top of the swap scope, and one line just before the closing
// brace. The declaration index in this file lists
// `void update(std::shared_ptr<ReadView const> const& ledger)` -- confirm
// against the original file.
52void
54{
55 if (app_.config().PATH_SEARCH_MAX == 0)
56 return; // pathfinding has been disabled
57
58 // A newer full update job is pending
59 if (auto const seq = seq_.load(); seq > ledger->seq())
60 {
61 JLOG(j_.debug()) << "Eliding update for " << ledger->seq() << " because of pending update to later " << seq;
62 return;
63 }
64
// Build into local containers of the same types as the members; the members
// are only replaced once the whole scan has succeeded.
65 decltype(allBooks_) allBooks;
66 decltype(xrpBooks_) xrpBooks;
67 decltype(domainBooks_) domainBooks;
68 decltype(xrpDomainBooks_) xrpDomainBooks;
69
// Pre-size from the current contents of the members.
70 allBooks.reserve(allBooks_.size());
71 xrpBooks.reserve(xrpBooks_.size());
72
73 JLOG(j_.debug()) << "Beginning update (" << ledger->seq() << ")";
74
75 // walk through the entire ledger looking for orderbook/AMM entries
// Number of books recorded; each AMM entry contributes two (one per direction).
76 int cnt = 0;
77
78 try
79 {
80 for (auto& sle : ledger->sles)
81 {
// Abandon the scan on shutdown and reset seq_ to 0, discarding the
// sequence that setup() recorded.
82 if (app_.isStopping())
83 {
84 JLOG(j_.info()) << "Update halted because the process is stopping";
85 seq_.store(0);
86 return;
87 }
88
// A directory node that carries an exchange rate and whose root index is
// its own key is taken as one order book.
89 if (sle->getType() == ltDIR_NODE && sle->isFieldPresent(sfExchangeRate) &&
90 sle->getFieldH256(sfRootIndex) == sle->key())
91 {
92 Book book;
93
94 book.in.currency = sle->getFieldH160(sfTakerPaysCurrency);
95 book.in.account = sle->getFieldH160(sfTakerPaysIssuer);
96 book.out.currency = sle->getFieldH160(sfTakerGetsCurrency);
97 book.out.account = sle->getFieldH160(sfTakerGetsIssuer);
// Optional field: empty when the entry has no sfDomainID.
98 book.domain = (*sle)[~sfDomainID];
99
// Books with a domain are indexed by (in, domain); all others by `in` alone.
100 if (book.domain)
101 domainBooks[{book.in, *book.domain}].insert(book.out);
102 else
103 allBooks[book.in].insert(book.out);
104
// Track which inputs have a book whose output is XRP, again split by
// whether the book has a domain.
105 if (book.domain && isXRP(book.out))
106 xrpDomainBooks.insert({book.in, *book.domain});
107 else if (isXRP(book.out))
108 xrpBooks.insert(book.in);
109
110 ++cnt;
111 }
112 else if (sle->getType() == ltAMM)
113 {
// An AMM entry is recorded as two books, one in each direction between its
// two assets; these go into the non-domain indexes only.
114 auto const issue1 = (*sle)[sfAsset].get<Issue>();
115 auto const issue2 = (*sle)[sfAsset2].get<Issue>();
116 auto addBook = [&](Issue const& in, Issue const& out) {
117 allBooks[in].insert(out);
118
119 if (isXRP(out))
120 xrpBooks.insert(in);
121
122 ++cnt;
123 };
124 addBook(issue1, issue2);
125 addBook(issue2, issue1);
126 }
127 }
128 }
129 catch (SHAMapMissingNode const& mn)
130 {
// A missing node aborts the rebuild: the members are left untouched and
// seq_ is reset to 0.
131 JLOG(j_.info()) << "Missing node in " << ledger->seq() << " during update: " << mn.what();
132 seq_.store(0);
133 return;
134 }
135
136 JLOG(j_.debug()) << "Update completed (" << ledger->seq() << "): " << cnt << " books found";
137
138 {
// NOTE(review): one line is missing at the top of this scope (embedded number
// 139). The member index lists a `std::recursive_mutex mLock`, so it is
// presumably a lock acquisition -- confirm.
// Publish the rebuilt indexes by swapping them into the members.
140 allBooks_.swap(allBooks);
141 xrpBooks_.swap(xrpBooks);
142 domainBooks_.swap(domainBooks);
143 xrpDomainBooks_.swap(xrpDomainBooks);
144 }
145
// NOTE(review): one statement is missing here (embedded number 146) -- confirm
// against the original file.
147}
148
// Adds a single book to the member indexes, using the same rules update()
// applies to each book it finds.
//
// NOTE(review): this listing has lost lines inside this definition (embedded
// numbers 150 and 154 are absent): the function header after `void` and one
// statement after `toXRP` is computed. The declaration index in this file
// lists `void addOrderBook(Book const&)` -- confirm against the original file.
149void
151{
// True when the book's output side is XRP.
152 bool toXRP = isXRP(book.out);
153
155
// Books with a domain are indexed by (in, domain); all others by `in` alone.
156 if (book.domain)
157 domainBooks_[{book.in, *book.domain}].insert(book.out);
158 else
159 allBooks_[book.in].insert(book.out);
160
// Record inputs that have a book to XRP, split by whether a domain is set.
161 if (book.domain && toXRP)
162 xrpDomainBooks_.insert({book.in, *book.domain});
163 else if (toXRP)
164 xrpBooks_.insert(book.in);
165}
166
167// return list of all orderbooks that want this issuerID and currencyID
// When `domain` is empty the lookup uses allBooks_ keyed by `issue`; otherwise
// it uses domainBooks_ keyed by (issue, *domain). The result is empty when the
// key is not present.
//
// NOTE(review): this listing has lost lines inside this definition (embedded
// numbers 168, 169, 171 and 174 are absent): the return type and function
// header, the declaration of `ret`, and one line at the top of the inner
// scope. The declaration index in this file lists
// `std::vector<Book> getBooksByTakerPays(Issue const&, std::optional<Domain> const& domain = std::nullopt)`
// -- confirm against the original file.
170{
172
173 {
175
// Looks `key` up in `container` and, if found, appends one Book to `ret` for
// every stored output issue, built from (issue, output issue, domain).
176 auto getBooks = [&](auto const& container, auto const& key) {
177 if (auto it = container.find(key); it != container.end())
178 {
179 auto const& books = it->second;
180 ret.reserve(books.size());
181
182 for (auto const& gets : books)
183 ret.emplace_back(issue, gets, domain);
184 }
185 };
186
187 if (!domain)
188 getBooks(allBooks_, issue);
189 else
190 getBooks(domainBooks_, std::make_pair(issue, *domain));
191 }
192
193 return ret;
194}
195
// Returns how many output issues are recorded for `issue`: from allBooks_ when
// `domain` is empty, otherwise from domainBooks_ under (issue, *domain).
// Returns 0 when the key is not present.
//
// NOTE(review): this listing has lost lines inside this definition (embedded
// numbers 197 and 199 are absent): the function header after `int` and the
// first statement of the body. The declaration index in this file lists
// `int getBookSize(Issue const&, std::optional<Domain> const& domain = std::nullopt)`
// -- confirm against the original file.
196int
198{
200
201 if (!domain)
202 {
203 if (auto it = allBooks_.find(issue); it != allBooks_.end())
204 return static_cast<int>(it->second.size());
205 }
206 else
207 {
208 if (auto it = domainBooks_.find({issue, *domain}); it != domainBooks_.end())
209 return static_cast<int>(it->second.size());
210 }
211
// No entry for this key.
212 return 0;
213}
214
// Reports whether a book from `issue` to XRP has been recorded: checks
// xrpDomainBooks_ for (issue, *domain) when `domain` is set, otherwise checks
// xrpBooks_ for `issue`.
//
// NOTE(review): this listing has lost lines inside this definition (embedded
// numbers 216 and 218 are absent): the function header after `bool` and the
// first statement of the body. The declaration index in this file lists
// `bool isBookToXRP(Issue const&, std::optional<Domain> domain = std::nullopt)`
// -- confirm against the original file.
215bool
217{
219 if (domain)
220 return xrpDomainBooks_.contains({issue, *domain});
221 return xrpBooks_.contains(issue);
222}
223
// Returns the listeners object registered for `book` in mListeners, storing
// one under `book` first when getBookListeners() finds none.
//
// NOTE(review): this listing has lost lines around and inside this definition
// (embedded numbers 224, 225, 227 and 232 are absent): the return type and
// function header, the first statement of the body, and the statement at the
// top of the `if (!ret)` block. The declaration index in this file lists
// `BookListeners::pointer makeBookListeners(Book const&)` -- confirm against
// the original file.
226{
228 auto ret = getBookListeners(book);
229
230 if (!ret)
231 {
// NOTE(review): the missing line here presumably assigns a newly created
// listeners object to `ret`; as shown, `ret` would still be empty when it is
// stored below -- confirm.
233
234 mListeners[book] = ret;
// Sanity check: looking the book up again must yield the object just stored.
235 XRPL_ASSERT(
236 getBookListeners(book) == ret,
237 "xrpl::OrderBookDB::makeBookListeners : result roundtrip "
238 "lookup");
239 }
240
241 return ret;
242}
243
// Looks `book` up in mListeners and returns the stored listeners object when
// an entry exists; otherwise returns `ret` as it was initialised.
//
// NOTE(review): this listing has lost lines around and inside this definition
// (embedded numbers 244, 245, 247 and 248 are absent): the return type and
// function header, and the opening statements of the body, including the
// declaration of `ret`. The declaration index in this file lists
// `BookListeners::pointer getBookListeners(Book const&)` -- confirm against
// the original file.
246{
249
250 auto it0 = mListeners.find(book);
251 if (it0 != mListeners.end())
252 ret = it0->second;
253
254 return ret;
255}
256
257// Based on the meta, send the meta to the streams that are listening.
258// We need to determine which streams a given meta affects.
//
// For every ltOFFER node in the transaction's metadata, finds the listeners
// registered for the offer's book and publishes `jvObj` to them.
//
// NOTE(review): this listing has lost lines inside this definition (embedded
// numbers 260, 261 and 265 are absent): the start of the function header,
// including the first parameter, and the first statement of the body. The
// declaration index in this file lists
// `void processTxn(std::shared_ptr<ReadView const> const& ledger, AcceptedLedgerTx const& alTx, MultiApiJson const& jvObj)`
// -- confirm against the original file.
259void
262 AcceptedLedgerTx const& alTx,
263 MultiApiJson const& jvObj)
264{
266
267 // For this particular transaction, maintain the set of unique
268 // subscriptions that have already published it. This prevents sending
269 // the transaction multiple times if it touches multiple ltOFFER
270 // entries for the same book, or if it touches multiple books and a
271 // single client has subscribed to those books.
272 hash_set<std::uint64_t> havePublished;
273
274 for (auto const& node : alTx.getMeta().getNodes())
275 {
// The try block is per node, so an exception from one node is logged below
// and the loop moves on to the next node.
276 try
277 {
278 if (node.getFieldU16(sfLedgerEntryType) == ltOFFER)
279 {
// Reads `field` from the node as an STObject. When it carries both TakerPays
// and TakerGets, looks up the listeners for a Book built from the TakerGets
// issue, the TakerPays issue and the optional DomainID (in that order), and
// publishes to them if any exist.
280 auto process = [&, this](SField const& field) {
281 if (auto data = dynamic_cast<STObject const*>(node.peekAtPField(field));
282 data && data->isFieldPresent(sfTakerPays) && data->isFieldPresent(sfTakerGets))
283 {
284 auto listeners = getBookListeners(
285 {data->getFieldAmount(sfTakerGets).issue(),
286 data->getFieldAmount(sfTakerPays).issue(),
287 (*data)[~sfDomainID]});
288 if (listeners)
289 listeners->publish(jvObj, havePublished);
290 }
291 };
292
293 // We need a field that contains the TakerGets and TakerPays
294 // parameters.
// Modified nodes are read from sfPreviousFields, created nodes from
// sfNewFields, deleted nodes from sfFinalFields.
295 if (node.getFName() == sfModifiedNode)
296 process(sfPreviousFields);
297 else if (node.getFName() == sfCreatedNode)
298 process(sfNewFields);
299 else if (node.getFName() == sfDeletedNode)
300 process(sfFinalFields);
301 }
302 }
303 catch (std::exception const& ex)
304 {
// Any std::exception raised while handling this node is logged at info level.
305 JLOG(j_.info()) << "processTxn: field not found (" << ex.what() << ")";
306 }
307 }
308}
309
310} // namespace xrpl
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream warn() const
Definition Journal.h:313
A transaction that is in a closed ledger.
TxMeta const & getMeta() const
virtual Config & config()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual JobQueue & getJobQueue()=0
virtual bool isStopping() const =0
virtual NetworkOPs & getOPs()=0
Specifies an order book.
Definition Book.h:17
Issue in
Definition Book.h:19
std::optional< uint256 > domain
Definition Book.h:21
Issue out
Definition Book.h:20
int PATH_SEARCH_MAX
Definition Config.h:180
bool standalone() const
Definition Config.h:312
A currency issued by an account.
Definition Issue.h:14
Currency currency
Definition Issue.h:16
AccountID account
Definition Issue.h:17
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:146
virtual bool isNeedNetworkLedger()=0
Application & app_
Definition OrderBookDB.h:52
BookToListenersMap mListeners
Definition OrderBookDB.h:69
BookListeners::pointer getBookListeners(Book const &)
beast::Journal const j_
Definition OrderBookDB.h:73
hardened_hash_map< Issue, hardened_hash_set< Issue > > allBooks_
Definition OrderBookDB.h:55
hash_set< std::pair< Issue, Domain > > xrpDomainBooks_
Definition OrderBookDB.h:63
void processTxn(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &alTx, MultiApiJson const &jvObj)
std::vector< Book > getBooksByTakerPays(Issue const &, std::optional< Domain > const &domain=std::nullopt)
hardened_hash_map< std::pair< Issue, Domain >, hardened_hash_set< Issue > > domainBooks_
Definition OrderBookDB.h:57
hash_set< Issue > xrpBooks_
Definition OrderBookDB.h:60
OrderBookDB(Application &app)
int getBookSize(Issue const &, std::optional< Domain > const &domain=std::nullopt)
std::atomic< std::uint32_t > seq_
Definition OrderBookDB.h:71
std::recursive_mutex mLock
Definition OrderBookDB.h:65
void setup(std::shared_ptr< ReadView const > const &ledger)
void update(std::shared_ptr< ReadView const > const &ledger)
void addOrderBook(Book const &)
bool isBookToXRP(Issue const &, std::optional< Domain > domain=std::nullopt)
BookListeners::pointer makeBookListeners(Book const &)
Identifies fields.
Definition SField.h:127
STArray & getNodes()
Definition TxMeta.h:70
T emplace_back(T... args)
T end(T... args)
T exchange(T... args)
T find(T... args)
T is_same_v
T load(T... args)
T make_pair(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
bool isXRP(AccountID const &c)
Definition AccountID.h:71
@ jtUPDATE_PF
Definition Job.h:36
T reserve(T... args)
T store(T... args)
T what(T... args)