Files
rippled/bin/ripple/ledger/RippledReader.py
Tom Ritchford 9160b46c1e Bug fixes and new features for LedgerTool:
* Fix RIPD-509, RIPD-514, RIPD-519, RIPD-525, RIPD-527, RIPD-529,
  RIPD-530 and RIPD-531.
* Protect people from ledger-spew and remove cruft.
* Better error messages and handling.
* Cache command lists or clears ledger cache.
* Better ledger summaries.
* Offline mode.
2014-08-27 18:05:44 -04:00

69 lines
2.1 KiB
Python

from __future__ import absolute_import, division, print_function, unicode_literals
import json
import os
import subprocess
from ripple.ledger.Args import ARGS
from ripple.util import File
from ripple.util import Log
from ripple.util import Range
# Explanations keyed by the exit status of a failed rippled subprocess.
_ERROR_CODE_REASON = {
    62: 'No rippled server is running.',
}

# Human-readable text keyed by the error token found in a rippled reply.
_ERROR_TEXT = {
    'lgrNotFound': 'The ledger you requested was not found.',
    'noCurrent': 'The server has no current ledger.',
    'noNetwork': 'The server did not respond to your request.',
}

# Used when a failed subprocess exits with a status not listed above.
_DEFAULT_ERROR_ = "Couldn't connect to server."
class RippledReader(object):
    """Read ledger data by running the rippled executable as a subprocess.

    Every request runs the binary named by ``ARGS.rippled`` (with ``--conf``
    appended when ``ARGS.config`` is set) and parses its stdout as JSON.
    """

    def __init__(self):
        """Locate rippled, build the base command and query ``server_info``.

        Sets ``self.cmd`` (base argv list), ``self.info`` (the ``info`` entry
        of the ``server_info`` result) and ``self.complete`` (sorted values
        parsed from ``complete_ledgers``, or ``[]`` when it is ``'empty'``).

        Raises:
            Exception: if no file exists at the normalized rippled path, or
                if the ``server_info`` subprocess fails.
        """
        fname = File.normalize(ARGS.rippled)
        if not os.path.exists(fname):
            raise Exception('No rippled found at %s.' % fname)
        self.cmd = [fname]
        if ARGS.config:
            # The original called an undefined `_normalize` here, which raised
            # NameError whenever a config file was given.
            self.cmd.extend(['--conf', File.normalize(ARGS.config)])
        self.info = self._command('server_info')['info']
        c = self.info.get('complete_ledgers')
        if c == 'empty':
            self.complete = []
        else:
            self.complete = sorted(Range.from_string(c))

    def name_to_ledger_index(self, ledger_name, is_full=False):
        """Return the ``ledger_index`` field of the ledger named `ledger_name`."""
        return self.get_ledger(ledger_name, is_full)['ledger_index']

    def get_ledger(self, name, is_full=False):
        """Fetch one ledger through the rippled ``ledger`` command.

        Args:
            name: ledger identifier; it is converted with ``str`` before
                being passed on the command line.
            is_full: when true, the extra ``full`` argument is appended.

        Returns:
            The non-empty ``ledger`` entry of the command result.  When that
            entry is missing or empty the error is reported via ``Log.fatal``.
        """
        cmd = ['ledger', str(name)]
        if is_full:
            cmd.append('full')
        response = self._command(*cmd)
        result = response.get('ledger')
        if result:
            return result

        # A reply with neither a ledger nor an error used to raise KeyError;
        # report it through the same fatal-log path instead.
        error = response.get('error', 'unknown error')
        etext = _ERROR_TEXT.get(error)
        if etext:
            error = '%s (%s)' % (etext, error)
        Log.fatal(error)

    def _command(self, *cmds):
        """Run rippled with the extra arguments `cmds` and return its result.

        Returns:
            The ``result`` entry of the JSON object printed on stdout.

        Raises:
            Exception: if the subprocess exits with a non-zero status; the
                message comes from ``_ERROR_CODE_REASON`` when the status is
                listed there, otherwise ``_DEFAULT_ERROR_``.
            ValueError: if stdout is not valid JSON (raised by ``json.loads``)
                or the decoded value has no ``result`` entry.
        """
        cmd = self.cmd + list(cmds)
        try:
            data = subprocess.check_output(cmd, stderr=subprocess.PIPE)
        except subprocess.CalledProcessError as e:
            raise Exception(_ERROR_CODE_REASON.get(
                e.returncode, _DEFAULT_ERROR_))

        part = json.loads(data)
        # Explicit checks replace a bare `except:` so that unrelated
        # exceptions (e.g. KeyboardInterrupt) are no longer swallowed and a
        # non-object reply yields a ValueError rather than AttributeError.
        if not isinstance(part, dict):
            raise ValueError('unknown error')
        if 'result' not in part:
            raise ValueError(part.get('error', 'unknown error'))
        return part['result']