Build wallet: start migrating to async

This commit is contained in:
mDuo13
2021-12-20 20:57:47 -08:00
parent 06218c1b97
commit 232b5284a5

View File

@@ -1,146 +1,17 @@
# "Build a Wallet" tutorial, step 5: Send XRP button.
# This step finally introduces events from the GUI to the worker thread.
import re
# "Build a Wallet" tutorial, step 6: Verification and Polish
# TODO: description
import xrpl
import asyncio
import re
import wx
import wx.lib.newevent
import wx.dataview
import wx.adv
from wxasync import AsyncBind, WxAsyncApp, StartCoroutine
from threading import Thread
from decimal import Decimal
from queue import Queue, Empty
class WSResponseError(Exception):
pass
WSC_TIMEOUT = 0.2
class SmartWSClient(xrpl.clients.WebsocketClient):
def __init__(self, *args, **kwargs):
self._handlers = {}
self._pending_requests = {}
self._id = 0
self.jobq = Queue() # for incoming UI events
super().__init__(*args, **kwargs, timeout=WSC_TIMEOUT)
def on(self, event_type, callback):
"""
Map a callback function to a type of event message from the connected
server. Only supports one callback function per event type.
"""
self._handlers[event_type] = callback
def request(self, req_dict, callback):
if "id" not in req_dict:
req_dict["id"] = f"__auto_{self._id}"
self._id += 1
# Work around xrpl-py quirk where it won't let you instantiate a request
# in proper WebSocket format because WS uses "command" instead of
# "method" but xrpl-py checks for "method":
req_dict["method"] = req_dict["command"]
del req_dict["command"]
req = xrpl.models.requests.request.Request.from_xrpl(req_dict)
req.validate()
self._pending_requests[req.id] = callback
self.send(req)
def run_forever(self):
while True:
try:
req, callback = self.jobq.get(block=False)
self.request(req, callback)
except Empty:
pass
for message in self:
if message.get("type") == "response":
if message.get("status") == "success":
del message["status"]
else:
raise WSResponseError("Unsuccessful response:", message)
msg_id = message.get("id")
if msg_id in self._pending_requests:
self._pending_requests[msg_id](message)
del self._pending_requests[msg_id]
else:
raise WSResponseError("Response to unknown request:", message)
elif message.get("type") in self._handlers:
self._handlers[message.get("type")](message)
# Set up event types to pass info from the worker thread to the main UI thread
GotNewLedger, EVT_NEW_LEDGER = wx.lib.newevent.NewEvent()
GotAccountInfo, EVT_ACCT_INFO = wx.lib.newevent.NewEvent()
GotAccountTx, EVT_ACCT_TX = wx.lib.newevent.NewEvent()
GotTxSub, EVT_TX_SUB = wx.lib.newevent.NewEvent()
class XRPLMonitorThread(Thread):
"""
A worker thread to watch for new ledger events and pass the info back to
the main frame to be shown in the UI. Using a thread lets us maintain the
responsiveness of the UI while doing work in the background.
"""
def __init__(self, ws_url, notify_window, classic_address):
Thread.__init__(self, daemon=True)
self.notify_window = notify_window
self.ws_url = ws_url
self.account = classic_address
self.client = SmartWSClient(self.ws_url)
def notify_ledger(self, message):
wx.QueueEvent(self.notify_window, GotNewLedger(data=message))
def notify_account(self, message):
wx.QueueEvent(self.notify_window, GotAccountInfo(data=message["result"]))
def notify_account_tx(self, message):
wx.QueueEvent(self.notify_window, GotAccountTx(data=message["result"]))
def on_transaction(self, message):
"""
Update our account history and re-check our balance whenever a new
transaction touches our account.
"""
self.client.request({
"command": "account_info",
"account": self.account,
"ledger_index": message["ledger_index"]
}, self.notify_account)
wx.QueueEvent(self.notify_window, GotTxSub(data=message))
def run(self):
self.client.open()
# Subscribe to ledger updates
self.client.request({
"command": "subscribe",
"streams": ["ledger"],
"accounts": [self.account]
},
lambda message: self.notify_ledger(message["result"])
)
self.client.on("ledgerClosed", self.notify_ledger)
self.client.on("transaction", self.on_transaction)
# Look up our balance right away
self.client.request({
"command": "account_info",
"account": self.account,
"ledger_index": "validated"
},
self.notify_account
)
# Look up our transaction history
self.client.request({
"command": "account_tx",
"account": self.account
},
self.notify_account_tx
)
# Start looping through messages received. This runs indefinitely.
self.client.run_forever()
class AutoGridBagSizer(wx.GridBagSizer):
"""
Helper class for adding a bunch of items uniformly to a GridBagSizer.
@@ -344,13 +215,10 @@ class TWaXLFrame(wx.Frame):
exit(1)
# Attach handlers and start bg thread for updates from the ledger ------
self.Bind(wx.EVT_BUTTON, self.send_xrp, source=self.sxb)
self.Bind(EVT_NEW_LEDGER, self.update_ledger)
self.Bind(EVT_ACCT_INFO, self.update_account)
self.Bind(EVT_ACCT_TX, self.update_account_tx)
self.Bind(EVT_TX_SUB, self.add_tx_from_sub)
self.worker = XRPLMonitorThread(url, self, self.classic_address)
self.worker.start()
# self.Bind(wx.EVT_BUTTON, self.send_xrp, source=self.sxb)
AsyncBind(wx.EVT_BUTTON, self.send_xrp, self.sxb)
self.url = url
StartCoroutine(self.monitor_xrpl, self)
def toggle_dialog_style(self, event):
"""
@@ -404,12 +272,47 @@ class TWaXLFrame(wx.Frame):
self.st_classic_address.SetLabel(self.classic_address)
self.st_x_address.SetLabel(self.xaddress)
def update_ledger(self, event):
async def monitor_xrpl(self):
"""
Coroutine to set up XRPL API subscriptions & handle incoming messages,
without making the GUI non-responsive while it waits for the network.
"""
async with xrpl.asyncio.clients.AsyncWebsocketClient(self.url) as self.client:
response = await self.client.request(xrpl.models.requests.Subscribe(
streams=["ledger"],
accounts=[self.classic_address]
))
# The immediate response contains details for the last validated ledger
self.update_ledger(response.result)
# Get starting values for account info, account transaction history
response = await self.client.request(xrpl.models.requests.AccountInfo(
account=self.classic_address,
ledger_index="validated"
))
self.update_account(response.result["account_data"])
response = await self.client.request(xrpl.models.requests.AccountTx(
account=self.classic_address
))
self.update_account_tx(response.result)
async for message in self.client:
mtype = message.get("type")
if mtype == "ledgerClosed":
self.update_ledger(message)
elif mtype == "transaction":
self.add_tx_from_sub(message)
response = await self.client.request(xrpl.models.requests.AccountInfo(
account=self.classic_address,
ledger_index=message["ledger_index"]
))
self.update_account(response.result["account_data"])
def update_ledger(self, message):
"""
Process a ledger subscription message to update the UI with
information about the latest validated ledger.
"""
message = event.data
close_time_iso = xrpl.utils.ripple_time_to_datetime(message["ledger_time"]).isoformat()
self.ledger_info.SetLabel(f"Latest validated ledger:\n"
f"Ledger Index: {message['ledger_index']}\n"
@@ -431,12 +334,11 @@ class TWaXLFrame(wx.Frame):
reserve_xrp = xrpl.utils.drops_to_xrp(str(reserve_drops))
return reserve_xrp
def update_account(self, event):
def update_account(self, acct):
"""
Process an account_info response to update the account info area of the
UI.
"""
acct = event.data["account_data"]
xrp_balance = str(xrpl.utils.drops_to_xrp(acct["Balance"]))
self.st_xrp_balance.SetLabel(xrp_balance)
@@ -504,18 +406,18 @@ class TWaXLFrame(wx.Frame):
else:
self.tx_list.AppendItem(cols)
def update_account_tx(self, event):
def update_account_tx(self, data):
"""
Update the transaction history tab with information from an account_tx
response.
"""
txs = event.data["transactions"]
txs = data["transactions"]
# TODO: with pagination, we should leave existing items
self.tx_list.DeleteAllItems()
for t in txs:
self.add_tx_row(t)
def add_tx_from_sub(self, event):
def add_tx_from_sub(self, t):
"""
Add 1 transaction to the history based on a subscription stream message.
Assumes only validated transaction streams (e.g. transactions, accounts)
@@ -523,7 +425,6 @@ class TWaXLFrame(wx.Frame):
Also send a notification to the user about it.
"""
t = event.data
# Convert to same format as account_tx results
t["tx"] = t["transaction"]
if t["tx"]["hash"] in self.pending_tx_rows.keys():
@@ -566,7 +467,7 @@ class TWaXLFrame(wx.Frame):
self.tx_list.PrependItem(cols)
self.pending_tx_rows[tx_hash] = self.tx_list.RowToItem(0)
def send_xrp(self, event):
async def send_xrp(self, event):
"""
Pop up a dialog for the user to input how much XRP to send where, and
send the transaction (if the user doesn't cancel).
@@ -582,21 +483,10 @@ class TWaXLFrame(wx.Frame):
return
paydata = dlg.GetPaymentData()
# TODO: can we safely autofill with the client in another thread??
tx = {
"TransactionType": "Payment",
"Account": self.classic_address,
"Sequence": self.wallet.sequence,
"Destination": paydata["to"],
"Amount": xrpl.utils.xrp_to_drops(paydata["amt"]),
"Fee": "10",
#TODO: LLS
"Flags": 0
}
dtag = paydata.get("dtag")
if dtag is not None and dtag != "":
if dtag.strip() == "":
dtag = None
if dtag is not None:
try:
dtag = int(dtag)
if dtag < 0 or dtag > 2**32-1:
@@ -605,17 +495,17 @@ class TWaXLFrame(wx.Frame):
print("Invalid destination tag:", e)
print("Canceled sending payment.")
return
tx["DestinationTag"] = dtag
txm = xrpl.models.transactions.transaction.Transaction.from_xrpl(tx)
signed_tx = xrpl.transaction.safe_sign_transaction(txm, self.wallet)
tx_blob = xrpl.core.binarycodec.encode(signed_tx.to_xrpl())
req = {
"command": "submit",
"tx_blob": tx_blob
}
nop = lambda x: x # TODO: actually handle response from sending
self.worker.client.jobq.put( (req, nop) )
self.add_pending_tx(signed_tx)
tx = xrpl.models.transactions.Payment(
account=self.classic_address,
sequence=self.wallet.sequence,
destination=paydata["to"],
amount=xrpl.utils.xrp_to_drops(paydata["amt"]),
destination_tag=dtag
)
tx_signed = await xrpl.asyncio.transaction.safe_sign_and_autofill_transaction(tx, self.wallet, self.client)
self.add_pending_tx(tx_signed)
await xrpl.asyncio.transaction.submit_transaction(tx_signed, self.client)
if __name__ == "__main__":
@@ -623,7 +513,8 @@ if __name__ == "__main__":
#JSON_RPC_URL = "http://localhost:5005/"
WS_URL = "wss://s.altnet.rippletest.net:51233"
app = wx.App()
app = WxAsyncApp()
frame = TWaXLFrame(WS_URL)
frame.Show()
app.MainLoop()
loop = asyncio.events.get_event_loop()
loop.run_until_complete(app.MainLoop())