From 232b5284a53ddedb26e406834310e06cf5d010ad Mon Sep 17 00:00:00 2001 From: mDuo13 Date: Mon, 20 Dec 2021 20:57:47 -0800 Subject: [PATCH] Build wallet: start migrating to async --- .../py/6_verification_and_polish.py | 247 +++++------------- 1 file changed, 69 insertions(+), 178 deletions(-) diff --git a/content/_code-samples/build-a-wallet/py/6_verification_and_polish.py b/content/_code-samples/build-a-wallet/py/6_verification_and_polish.py index 0bd1be9735..0b73a7d262 100644 --- a/content/_code-samples/build-a-wallet/py/6_verification_and_polish.py +++ b/content/_code-samples/build-a-wallet/py/6_verification_and_polish.py @@ -1,146 +1,17 @@ -# "Build a Wallet" tutorial, step 5: Send XRP button. -# This step finally introduces events from the GUI to the worker thread. - -import re +# "Build a Wallet" tutorial, step 6: Verification and Polish +# TODO: description import xrpl + +import asyncio +import re import wx -import wx.lib.newevent import wx.dataview import wx.adv +from wxasync import AsyncBind, WxAsyncApp, StartCoroutine from threading import Thread from decimal import Decimal from queue import Queue, Empty -class WSResponseError(Exception): - pass - -WSC_TIMEOUT = 0.2 -class SmartWSClient(xrpl.clients.WebsocketClient): - def __init__(self, *args, **kwargs): - self._handlers = {} - self._pending_requests = {} - self._id = 0 - self.jobq = Queue() # for incoming UI events - super().__init__(*args, **kwargs, timeout=WSC_TIMEOUT) - - def on(self, event_type, callback): - """ - Map a callback function to a type of event message from the connected - server. Only supports one callback function per event type. - """ - self._handlers[event_type] = callback - - def request(self, req_dict, callback): - if "id" not in req_dict: - req_dict["id"] = f"__auto_{self._id}" - self._id += 1 - # Work around xrpl-py quirk where it won't let you instantiate a request - # in proper WebSocket format because WS uses "command" instead of - # "method" but xrpl-py checks for "method": - req_dict["method"] = req_dict["command"] - del req_dict["command"] - - req = xrpl.models.requests.request.Request.from_xrpl(req_dict) - req.validate() - self._pending_requests[req.id] = callback - self.send(req) - - def run_forever(self): - while True: - try: - req, callback = self.jobq.get(block=False) - self.request(req, callback) - except Empty: - pass - - for message in self: - if message.get("type") == "response": - if message.get("status") == "success": - del message["status"] - else: - raise WSResponseError("Unsuccessful response:", message) - - msg_id = message.get("id") - if msg_id in self._pending_requests: - self._pending_requests[msg_id](message) - del self._pending_requests[msg_id] - else: - raise WSResponseError("Response to unknown request:", message) - - elif message.get("type") in self._handlers: - self._handlers[message.get("type")](message) - -# Set up event types to pass info from the worker thread to the main UI thread -GotNewLedger, EVT_NEW_LEDGER = wx.lib.newevent.NewEvent() -GotAccountInfo, EVT_ACCT_INFO = wx.lib.newevent.NewEvent() -GotAccountTx, EVT_ACCT_TX = wx.lib.newevent.NewEvent() -GotTxSub, EVT_TX_SUB = wx.lib.newevent.NewEvent() - -class XRPLMonitorThread(Thread): - """ - A worker thread to watch for new ledger events and pass the info back to - the main frame to be shown in the UI. Using a thread lets us maintain the - responsiveness of the UI while doing work in the background. - """ - def __init__(self, ws_url, notify_window, classic_address): - Thread.__init__(self, daemon=True) - self.notify_window = notify_window - self.ws_url = ws_url - self.account = classic_address - self.client = SmartWSClient(self.ws_url) - - def notify_ledger(self, message): - wx.QueueEvent(self.notify_window, GotNewLedger(data=message)) - - def notify_account(self, message): - wx.QueueEvent(self.notify_window, GotAccountInfo(data=message["result"])) - - def notify_account_tx(self, message): - wx.QueueEvent(self.notify_window, GotAccountTx(data=message["result"])) - - def on_transaction(self, message): - """ - Update our account history and re-check our balance whenever a new - transaction touches our account. - """ - self.client.request({ - "command": "account_info", - "account": self.account, - "ledger_index": message["ledger_index"] - }, self.notify_account) - wx.QueueEvent(self.notify_window, GotTxSub(data=message)) - - def run(self): - self.client.open() - # Subscribe to ledger updates - self.client.request({ - "command": "subscribe", - "streams": ["ledger"], - "accounts": [self.account] - }, - lambda message: self.notify_ledger(message["result"]) - ) - self.client.on("ledgerClosed", self.notify_ledger) - self.client.on("transaction", self.on_transaction) - - # Look up our balance right away - self.client.request({ - "command": "account_info", - "account": self.account, - "ledger_index": "validated" - }, - self.notify_account - ) - # Look up our transaction history - self.client.request({ - "command": "account_tx", - "account": self.account - }, - self.notify_account_tx - ) - # Start looping through messages received. This runs indefinitely. - self.client.run_forever() - class AutoGridBagSizer(wx.GridBagSizer): """ Helper class for adding a bunch of items uniformly to a GridBagSizer. @@ -344,13 +215,10 @@ class TWaXLFrame(wx.Frame): exit(1) # Attach handlers and start bg thread for updates from the ledger ------ - self.Bind(wx.EVT_BUTTON, self.send_xrp, source=self.sxb) - self.Bind(EVT_NEW_LEDGER, self.update_ledger) - self.Bind(EVT_ACCT_INFO, self.update_account) - self.Bind(EVT_ACCT_TX, self.update_account_tx) - self.Bind(EVT_TX_SUB, self.add_tx_from_sub) - self.worker = XRPLMonitorThread(url, self, self.classic_address) - self.worker.start() + # self.Bind(wx.EVT_BUTTON, self.send_xrp, source=self.sxb) + AsyncBind(wx.EVT_BUTTON, self.send_xrp, self.sxb) + self.url = url + StartCoroutine(self.monitor_xrpl, self) def toggle_dialog_style(self, event): """ @@ -404,12 +272,47 @@ class TWaXLFrame(wx.Frame): self.st_classic_address.SetLabel(self.classic_address) self.st_x_address.SetLabel(self.xaddress) - def update_ledger(self, event): + async def monitor_xrpl(self): + """ + Coroutine to set up XRPL API subscriptions & handle incoming messages, + without making the GUI non-responsive while it waits for the network. + """ + async with xrpl.asyncio.clients.AsyncWebsocketClient(self.url) as self.client: + response = await self.client.request(xrpl.models.requests.Subscribe( + streams=["ledger"], + accounts=[self.classic_address] + )) + # The immediate response contains details for the last validated ledger + self.update_ledger(response.result) + + # Get starting values for account info, account transaction history + response = await self.client.request(xrpl.models.requests.AccountInfo( + account=self.classic_address, + ledger_index="validated" + )) + self.update_account(response.result["account_data"]) + response = await self.client.request(xrpl.models.requests.AccountTx( + account=self.classic_address + )) + self.update_account_tx(response.result) + + async for message in self.client: + mtype = message.get("type") + if mtype == "ledgerClosed": + self.update_ledger(message) + elif mtype == "transaction": + self.add_tx_from_sub(message) + response = await self.client.request(xrpl.models.requests.AccountInfo( + account=self.classic_address, + ledger_index=message["ledger_index"] + )) + self.update_account(response.result["account_data"]) + + def update_ledger(self, message): """ Process a ledger subscription message to update the UI with information about the latest validated ledger. """ - message = event.data close_time_iso = xrpl.utils.ripple_time_to_datetime(message["ledger_time"]).isoformat() self.ledger_info.SetLabel(f"Latest validated ledger:\n" f"Ledger Index: {message['ledger_index']}\n" @@ -431,12 +334,11 @@ class TWaXLFrame(wx.Frame): reserve_xrp = xrpl.utils.drops_to_xrp(str(reserve_drops)) return reserve_xrp - def update_account(self, event): + def update_account(self, acct): """ Process an account_info response to update the account info area of the UI. """ - acct = event.data["account_data"] xrp_balance = str(xrpl.utils.drops_to_xrp(acct["Balance"])) self.st_xrp_balance.SetLabel(xrp_balance) @@ -504,18 +406,18 @@ class TWaXLFrame(wx.Frame): else: self.tx_list.AppendItem(cols) - def update_account_tx(self, event): + def update_account_tx(self, data): """ Update the transaction history tab with information from an account_tx response. """ - txs = event.data["transactions"] + txs = data["transactions"] # TODO: with pagination, we should leave existing items self.tx_list.DeleteAllItems() for t in txs: self.add_tx_row(t) - def add_tx_from_sub(self, event): + def add_tx_from_sub(self, t): """ Add 1 transaction to the history based on a subscription stream message. Assumes only validated transaction streams (e.g. transactions, accounts) @@ -523,7 +425,6 @@ class TWaXLFrame(wx.Frame): Also send a notification to the user about it. """ - t = event.data # Convert to same format as account_tx results t["tx"] = t["transaction"] if t["tx"]["hash"] in self.pending_tx_rows.keys(): @@ -566,7 +467,7 @@ class TWaXLFrame(wx.Frame): self.tx_list.PrependItem(cols) self.pending_tx_rows[tx_hash] = self.tx_list.RowToItem(0) - def send_xrp(self, event): + async def send_xrp(self, event): """ Pop up a dialog for the user to input how much XRP to send where, and send the transaction (if the user doesn't cancel). @@ -582,21 +483,10 @@ class TWaXLFrame(wx.Frame): return paydata = dlg.GetPaymentData() - - # TODO: can we safely autofill with the client in another thread?? - - tx = { - "TransactionType": "Payment", - "Account": self.classic_address, - "Sequence": self.wallet.sequence, - "Destination": paydata["to"], - "Amount": xrpl.utils.xrp_to_drops(paydata["amt"]), - "Fee": "10", - #TODO: LLS - "Flags": 0 - } dtag = paydata.get("dtag") - if dtag is not None and dtag != "": + if dtag.strip() == "": + dtag = None + if dtag is not None: try: dtag = int(dtag) if dtag < 0 or dtag > 2**32-1: @@ -605,17 +495,17 @@ class TWaXLFrame(wx.Frame): print("Invalid destination tag:", e) print("Canceled sending payment.") return - tx["DestinationTag"] = dtag - txm = xrpl.models.transactions.transaction.Transaction.from_xrpl(tx) - signed_tx = xrpl.transaction.safe_sign_transaction(txm, self.wallet) - tx_blob = xrpl.core.binarycodec.encode(signed_tx.to_xrpl()) - req = { - "command": "submit", - "tx_blob": tx_blob - } - nop = lambda x: x # TODO: actually handle response from sending - self.worker.client.jobq.put( (req, nop) ) - self.add_pending_tx(signed_tx) + + tx = xrpl.models.transactions.Payment( + account=self.classic_address, + sequence=self.wallet.sequence, + destination=paydata["to"], + amount=xrpl.utils.xrp_to_drops(paydata["amt"]), + destination_tag=dtag + ) + tx_signed = await xrpl.asyncio.transaction.safe_sign_and_autofill_transaction(tx, self.wallet, self.client) + self.add_pending_tx(tx_signed) + await xrpl.asyncio.transaction.submit_transaction(tx_signed, self.client) if __name__ == "__main__": @@ -623,7 +513,8 @@ if __name__ == "__main__": #JSON_RPC_URL = "http://localhost:5005/" WS_URL = "wss://s.altnet.rippletest.net:51233" - app = wx.App() + app = WxAsyncApp() frame = TWaXLFrame(WS_URL) frame.Show() - app.MainLoop() + loop = asyncio.events.get_event_loop() + loop.run_until_complete(app.MainLoop())