Move Python code to its own directory.

This commit is contained in:
Tom Ritchford
2015-04-16 17:51:06 -04:00
committed by seelabs
parent 0dd6b95ac2
commit adf4860988
52 changed files with 1 additions and 1 deletions

View File

@@ -0,0 +1,40 @@
from __future__ import absolute_import, division, print_function, unicode_literals
from collections import defaultdict
class Cache(object):
def __init__(self):
self._value_to_index = {}
self._index_to_value = []
def value_to_index(self, value, **kwds):
index = self._value_to_index.get(value, None)
if index is None:
index = len(self._index_to_value)
self._index_to_value.append((value, kwds))
self._value_to_index[value] = index
return index
def index_to_value(self, index):
return self._index_to_value[index]
def NamedCache():
return defaultdict(Cache)
def cache_by_key(d, keyfunc=None, exclude=None):
cache = defaultdict(Cache)
exclude = exclude or None
keyfunc = keyfunc or (lambda x: x)
def visit(item):
if isinstance(item, list):
for i, x in enumerate(item):
item[i] = visit(x)
elif isinstance(item, dict):
for k, v in item.items():
item[k] = visit(v)
return item
return cache

View File

@@ -0,0 +1,77 @@
from __future__ import absolute_import, division, print_function, unicode_literals
# Code taken from github/rec/grit.
import os
import sys
from collections import namedtuple
from ripple.ledger.Args import ARGS
from ripple.util import Log
Command = namedtuple('Command', 'function help safe')
def make_command(module):
name = module.__name__.split('.')[-1].lower()
return name, Command(getattr(module, name, None) or
getattr(module, 'run_' + name),
getattr(module, 'HELP'),
getattr(module, 'SAFE', False))
class CommandList(object):
def __init__(self, *args, **kwds):
self.registry = {}
self.register(*args, **kwds)
def register(self, *modules, **kwds):
for module in modules:
name, command = make_command(module)
self.registry[name] = command
for k, v in kwds.items():
if not isinstance(v, (list, tuple)):
v = [v]
self.register_one(k, *v)
def keys(self):
return self.registry.keys()
def register_one(self, name, function, help='', safe=False):
assert name not in self.registry
self.registry[name] = Command(function, help, safe)
def _get(self, command):
command = command.lower()
c = self.registry.get(command)
if c:
return command, c
commands = [c for c in self.registry if c.startswith(command)]
if len(commands) == 1:
command = commands[0]
return command, self.registry[command]
if not commands:
raise ValueError('No such command: %s. Commands are %s.' %
(command, ', '.join(sorted(self.registry))))
if len(commands) > 1:
raise ValueError('Command %s was ambiguous: %s.' %
(command, ', '.join(commands)))
def get(self, command):
return self._get(command)[1]
def run(self, command, *args):
return self.get(command).function(*args)
def run_safe(self, command, *args):
name, cmd = self._get(command)
if not (ARGS.yes or cmd.safe):
confirm = raw_input('OK to execute "rl %s %s"? (y/N) ' %
(name, ' '.join(args)))
if not confirm.lower().startswith('y'):
Log.error('Cancelled.')
return
cmd.function(*args)
def help(self, command):
return self.get(command).help()

View File

@@ -0,0 +1,54 @@
from __future__ import absolute_import, division, print_function, unicode_literals
import json
"""Ripple has a proprietary format for their .cfg files, so we need a reader for
them."""
def read(lines):
sections = []
section = []
for line in lines:
line = line.strip()
if (not line) or line[0] == '#':
continue
if line.startswith('['):
if section:
sections.append(section)
section = []
section.append(line)
if section:
sections.append(section)
result = {}
for section in sections:
option = section.pop(0)
assert section, ('No value for option "%s".' % option)
assert option.startswith('[') and option.endswith(']'), (
'No option name in block "%s"' % p[0])
option = option[1:-1]
assert option not in result, 'Duplicate option "%s".' % option
subdict = {}
items = []
for part in section:
if '=' in part:
assert not items, 'Dictionary mixed with list.'
k, v = part.split('=', 1)
assert k not in subdict, 'Repeated dictionary entry ' + k
subdict[k] = v
else:
assert not subdict, 'List mixed with dictionary.'
if part.startswith('{'):
items.append(json.loads(part))
else:
words = part.split()
if len(words) > 1:
items.append(words)
else:
items.append(part)
if len(items) == 1:
result[option] = items[0]
else:
result[option] = items or subdict
return result

View File

@@ -0,0 +1,12 @@
from __future__ import absolute_import, division, print_function, unicode_literals
import sqlite3
def fetchall(database, query, kwds):
conn = sqlite3.connect(database)
try:
cursor = conn.execute(query, kwds)
return cursor.fetchall()
finally:
conn.close()

View File

@@ -0,0 +1,46 @@
from __future__ import absolute_import, division, print_function, unicode_literals
"""Fixed point numbers."""
POSITIONS = 10
POSITIONS_SHIFT = 10 ** POSITIONS
class Decimal(object):
def __init__(self, desc='0'):
if isinstance(desc, int):
self.value = desc
return
if desc.startswith('-'):
sign = -1
desc = desc[1:]
else:
sign = 1
parts = desc.split('.')
if len(parts) == 1:
parts.append('0')
elif len(parts) > 2:
raise Exception('Too many decimals in "%s"' % desc)
number, decimal = parts
# Fix the number of positions.
decimal = (decimal + POSITIONS * '0')[:POSITIONS]
self.value = sign * int(number + decimal)
def accumulate(self, item):
if not isinstance(item, Decimal):
item = Decimal(item)
self.value += item.value
def __str__(self):
if self.value >= 0:
sign = ''
value = self.value
else:
sign = '-'
value = -self.value
number = value // POSITIONS_SHIFT
decimal = (value % POSITIONS_SHIFT) * POSITIONS_SHIFT
if decimal:
return '%s%s.%s' % (sign, number, str(decimal).rstrip('0'))
else:
return '%s%s' % (sign, number)

View File

@@ -0,0 +1,33 @@
from __future__ import absolute_import, division, print_function, unicode_literals
def count_all_subitems(x):
"""Count the subitems of a Python object, including the object itself."""
if isinstance(x, list):
return 1 + sum(count_all_subitems(i) for i in x)
if isinstance(x, dict):
return 1 + sum(count_all_subitems(i) for i in x.itervalues())
return 1
def prune(item, level, count_recursively=True):
def subitems(x):
i = count_all_subitems(x) - 1 if count_recursively else len(x)
return '1 subitem' if i == 1 else '%d subitems' % i
assert level >= 0
if not item:
return item
if isinstance(item, list):
if level:
return [prune(i, level - 1, count_recursively) for i in item]
else:
return '[list with %s]' % subitems(item)
if isinstance(item, dict):
if level:
return dict((k, prune(v, level - 1, count_recursively))
for k, v in item.iteritems())
else:
return '{dict with %s}' % subitems(item)
return item

View File

@@ -0,0 +1,7 @@
from __future__ import absolute_import, division, print_function, unicode_literals
import os
def normalize(f):
f = os.path.join(*f.split('/')) # For Windows users.
return os.path.abspath(os.path.expanduser(f))

View File

@@ -0,0 +1,56 @@
from __future__ import absolute_import, division, print_function, unicode_literals
import gzip
import json
import os
_NONE = object()
class FileCache(object):
"""A two-level cache, which stores expensive results in memory and on disk.
"""
def __init__(self, cache_directory, creator, open=gzip.open, suffix='.gz'):
self.cache_directory = cache_directory
self.creator = creator
self.open = open
self.suffix = suffix
self.cached_data = {}
if not os.path.exists(self.cache_directory):
os.makedirs(self.cache_directory)
def get_file_data(self, name):
if os.path.exists(filename):
return json.load(self.open(filename))
result = self.creator(name)
return result
def get_data(self, name, save_in_cache, can_create, default=None):
name = str(name)
result = self.cached_data.get(name, _NONE)
if result is _NONE:
filename = os.path.join(self.cache_directory, name) + self.suffix
if os.path.exists(filename):
result = json.load(self.open(filename)) or _NONE
if result is _NONE and can_create:
result = self.creator(name)
if save_in_cache:
json.dump(result, self.open(filename, 'w'))
return default if result is _NONE else result
def _files(self):
return os.listdir(self.cache_directory)
def cache_list(self):
for f in self._files():
if f.endswith(self.suffix):
yield f[:-len(self.suffix)]
def file_count(self):
return len(self._files())
def clear(self):
"""Clears both local files and memory."""
self.cached_data = {}
for f in self._files():
os.remove(os.path.join(self.cache_directory, f))

View File

@@ -0,0 +1,82 @@
from __future__ import absolute_import, division, print_function, unicode_literals
"""A function that can be specified at the command line, with an argument."""
import importlib
import re
import tokenize
from StringIO import StringIO
MATCHER = re.compile(r'([\w.]+)(.*)')
REMAPPINGS = {
'false': False,
'true': True,
'null': None,
'False': False,
'True': True,
'None': None,
}
def eval_arguments(args):
args = args.strip()
if not args or (args == '()'):
return ()
tokens = list(tokenize.generate_tokens(StringIO(args).readline))
def remap():
for type, name, _, _, _ in tokens:
if type == tokenize.NAME and name not in REMAPPINGS:
yield tokenize.STRING, '"%s"' % name
else:
yield type, name
untok = tokenize.untokenize(remap())
if untok[1:-1].strip():
untok = untok[:-1] + ',)' # Force a tuple.
try:
return eval(untok, REMAPPINGS)
except Exception as e:
raise ValueError('Couldn\'t evaluate expression "%s" (became "%s"), '
'error "%s"' % (args, untok, str(e)))
class Function(object):
def __init__(self, desc='', default_path=''):
self.desc = desc.strip()
if not self.desc:
# Make an empty function that does nothing.
self.args = ()
self.function = lambda *args, **kwds: None
return
m = MATCHER.match(desc)
if not m:
raise ValueError('"%s" is not a function' % desc)
self.function, self.args = (g.strip() for g in m.groups())
self.args = eval_arguments(self.args)
if '.' not in self.function:
if default_path and not default_path.endswith('.'):
default_path += '.'
self.function = default_path + self.function
p, m = self.function.rsplit('.', 1)
mod = importlib.import_module(p)
# Errors in modules are swallowed here.
# except:
# raise ValueError('Can\'t find Python module "%s"' % p)
try:
self.function = getattr(mod, m)
except:
raise ValueError('No function "%s" in module "%s"' % (m, p))
def __str__(self):
return self.desc
def __call__(self, *args, **kwds):
return self.function(*(args + self.args), **kwds)
def __eq__(self, other):
return self.function == other.function and self.args == other.args
def __ne__(self, other):
return not (self == other)

View File

@@ -0,0 +1,21 @@
from __future__ import absolute_import, division, print_function, unicode_literals
import sys
VERBOSE = False
def out(*args, **kwds):
kwds.get('print', print)(*args, file=sys.stdout, **kwds)
def info(*args, **kwds):
if VERBOSE:
out(*args, **kwds)
def warn(*args, **kwds):
out('WARNING:', *args, **kwds)
def error(*args, **kwds):
out('ERROR:', *args, **kwds)
def fatal(*args, **kwds):
raise Exception('FATAL: ' + ' '.join(str(a) for a in args))

View File

@@ -0,0 +1,42 @@
from __future__ import absolute_import, division, print_function, unicode_literals
from functools import wraps
import json
SEPARATORS = ',', ': '
INDENT = ' '
def pretty_print(item):
return json.dumps(item,
sort_keys=True,
indent=len(INDENT),
separators=SEPARATORS)
class Streamer(object):
def __init__(self, printer=print):
# No automatic spacing or carriage returns.
self.printer = lambda *args: printer(*args, end='', sep='')
self.first_key = True
def add(self, key, value):
if self.first_key:
self.first_key = False
self.printer('{')
else:
self.printer(',')
self.printer('\n', INDENT, '"', str(key), '": ')
pp = pretty_print(value).splitlines()
if len(pp) > 1:
for i, line in enumerate(pp):
if i > 0:
self.printer('\n', INDENT)
self.printer(line)
else:
self.printer(pp[0])
def finish(self):
if not self.first_key:
self.first_key = True
self.printer('\n}')

View File

@@ -0,0 +1,53 @@
from __future__ import absolute_import, division, print_function, unicode_literals
"""
Convert a discontiguous range of integers to and from a human-friendly form.
Real world example is the server_info.complete_ledgers:
8252899-8403772,8403824,8403827-8403830,8403834-8403876
"""
def from_string(desc, **aliases):
if not desc:
return []
result = set()
for d in desc.split(','):
nums = [int(aliases.get(x) or x) for x in d.split('-')]
if len(nums) == 1:
result.add(nums[0])
elif len(nums) == 2:
result.update(range(nums[0], nums[1] + 1))
return result
def to_string(r):
groups = []
next_group = []
for i, x in enumerate(sorted(r)):
if next_group and (x - next_group[-1]) > 1:
groups.append(next_group)
next_group = []
next_group.append(x)
if next_group:
groups.append(next_group)
def display(g):
if len(g) == 1:
return str(g[0])
else:
return '%s-%s' % (g[0], g[-1])
return ','.join(display(g) for g in groups)
def is_range(desc, *names):
try:
from_string(desc, **dict((n, 1) for n in names))
return True;
except ValueError:
return False
def join_ranges(*ranges, **aliases):
result = set()
for r in ranges:
result.update(from_string(r, **aliases))
return result

View File

@@ -0,0 +1,46 @@
from __future__ import absolute_import, division, print_function, unicode_literals
FIRST, LAST = range(2)
def binary_search(begin, end, condition, location=FIRST):
"""Search for an i in the interval [begin, end] where condition(i) is true.
If location is FIRST, return the first such i.
If location is LAST, return the last such i.
If there is no such i, then throw an exception.
"""
b = condition(begin)
e = condition(end)
if b and e:
return begin if location == FIRST else end
if not (b or e):
raise ValueError('%d/%d' % (begin, end))
if b and location is FIRST:
return begin
if e and location is LAST:
return end
width = end - begin + 1
if width == 1:
if not b:
raise ValueError('%d/%d' % (begin, end))
return begin
if width == 2:
return begin if b else end
mid = (begin + end) // 2
m = condition(mid)
if m == b:
return binary_search(mid, end, condition, location)
else:
return binary_search(begin, mid, condition, location)
def linear_search(items, condition):
"""Yields each i in the interval [begin, end] where condition(i) is true.
"""
for i in items:
if condition(i):
yield i

View File

@@ -0,0 +1,21 @@
from __future__ import absolute_import, division, print_function, unicode_literals
import datetime
# Format for human-readable dates in rippled
_DATE_FORMAT = '%Y-%b-%d'
_TIME_FORMAT = '%H:%M:%S'
_DATETIME_FORMAT = '%s %s' % (_DATE_FORMAT, _TIME_FORMAT)
_FORMATS = _DATE_FORMAT, _TIME_FORMAT, _DATETIME_FORMAT
def parse_datetime(desc):
for fmt in _FORMATS:
try:
return datetime.date.strptime(desc, fmt)
except:
pass
raise ValueError("Can't understand date '%s'." % date)
def format_datetime(dt):
return dt.strftime(_DATETIME_FORMAT)

View File

View File

@@ -0,0 +1,12 @@
from __future__ import absolute_import, division, print_function, unicode_literals
from ripple.util.Cache import NamedCache
from unittest import TestCase
class test_Cache(TestCase):
def setUp(self):
self.cache = NamedCache()
def test_trivial(self):
pass

View File

@@ -0,0 +1,163 @@
from __future__ import absolute_import, division, print_function, unicode_literals
from ripple.util import ConfigFile
from unittest import TestCase
class test_ConfigFile(TestCase):
def test_trivial(self):
self.assertEquals(ConfigFile.read(''), {})
def test_full(self):
self.assertEquals(ConfigFile.read(FULL.splitlines()), RESULT)
RESULT = {
'websocket_port': '6206',
'database_path': '/development/alpha/db',
'sntp_servers':
['time.windows.com', 'time.apple.com', 'time.nist.gov', 'pool.ntp.org'],
'validation_seed': 'sh1T8T9yGuV7Jb6DPhqSzdU2s5LcV',
'node_size': 'medium',
'rpc_startup': {
'command': 'log_level',
'severity': 'debug'},
'ips': ['r.ripple.com', '51235'],
'node_db': {
'file_size_mult': '2',
'file_size_mb': '8',
'cache_mb': '256',
'path': '/development/alpha/db/rocksdb',
'open_files': '2000',
'type': 'RocksDB',
'filter_bits': '12'},
'peer_port': '53235',
'ledger_history': 'full',
'rpc_ip': '127.0.0.1',
'websocket_public_ip': '0.0.0.0',
'rpc_allow_remote': '0',
'validators':
[['n949f75evCHwgyP4fPVgaHqNHxUVN15PsJEZ3B3HnXPcPjcZAoy7', 'RL1'],
['n9MD5h24qrQqiyBC8aeqqCWvpiBiYQ3jxSr91uiDvmrkyHRdYLUj', 'RL2'],
['n9L81uNCaPgtUJfaHh89gmdvXKAmSt5Gdsw2g1iPWaPkAHW5Nm4C', 'RL3'],
['n9KiYM9CgngLvtRCQHZwgC2gjpdaZcCcbt3VboxiNFcKuwFVujzS', 'RL4'],
['n9LdgEtkmGB9E2h3K4Vp7iGUaKuq23Zr32ehxiU8FWY7xoxbWTSA', 'RL5']],
'debug_logfile': '/development/alpha/debug.log',
'websocket_public_port': '5206',
'peer_ip': '0.0.0.0',
'rpc_port': '5205',
'validation_quorum': '3',
'websocket_ip': '127.0.0.1'}
FULL = """
[ledger_history]
full
# Allow other peers to connect to this server.
#
[peer_ip]
0.0.0.0
[peer_port]
53235
# Allow untrusted clients to connect to this server.
#
[websocket_public_ip]
0.0.0.0
[websocket_public_port]
5206
# Provide trusted websocket ADMIN access to the localhost.
#
[websocket_ip]
127.0.0.1
[websocket_port]
6206
# Provide trusted json-rpc ADMIN access to the localhost.
#
[rpc_ip]
127.0.0.1
[rpc_port]
5205
[rpc_allow_remote]
0
[node_size]
medium
# This is primary persistent datastore for rippled. This includes transaction
# metadata, account states, and ledger headers. Helpful information can be
# found here: https://ripple.com/wiki/NodeBackEnd
[node_db]
type=RocksDB
path=/development/alpha/db/rocksdb
open_files=2000
filter_bits=12
cache_mb=256
file_size_mb=8
file_size_mult=2
[database_path]
/development/alpha/db
# This needs to be an absolute directory reference, not a relative one.
# Modify this value as required.
[debug_logfile]
/development/alpha/debug.log
[sntp_servers]
time.windows.com
time.apple.com
time.nist.gov
pool.ntp.org
# Where to find some other servers speaking the Ripple protocol.
#
[ips]
r.ripple.com 51235
# The latest validators can be obtained from
# https://ripple.com/ripple.txt
#
[validators]
n949f75evCHwgyP4fPVgaHqNHxUVN15PsJEZ3B3HnXPcPjcZAoy7 RL1
n9MD5h24qrQqiyBC8aeqqCWvpiBiYQ3jxSr91uiDvmrkyHRdYLUj RL2
n9L81uNCaPgtUJfaHh89gmdvXKAmSt5Gdsw2g1iPWaPkAHW5Nm4C RL3
n9KiYM9CgngLvtRCQHZwgC2gjpdaZcCcbt3VboxiNFcKuwFVujzS RL4
n9LdgEtkmGB9E2h3K4Vp7iGUaKuq23Zr32ehxiU8FWY7xoxbWTSA RL5
# Ditto.
[validation_quorum]
3
[validation_seed]
sh1T8T9yGuV7Jb6DPhqSzdU2s5LcV
# Turn down default logging to save disk space in the long run.
# Valid values here are trace, debug, info, warning, error, and fatal
[rpc_startup]
{ "command": "log_level", "severity": "debug" }
# Configure SSL for WebSockets. Not enabled by default because not everybody
# has an SSL cert on their server, but if you uncomment the following lines and
# set the path to the SSL certificate and private key the WebSockets protocol
# will be protected by SSL/TLS.
#[websocket_secure]
#1
#[websocket_ssl_cert]
#/etc/ssl/certs/server.crt
#[websocket_ssl_key]
#/etc/ssl/private/server.key
# Defaults to 0 ("no") so that you can use self-signed SSL certificates for
# development, or internally.
#[ssl_verify]
#0
""".strip()

View File

@@ -0,0 +1,20 @@
from __future__ import absolute_import, division, print_function, unicode_literals
from ripple.util.Decimal import Decimal
from unittest import TestCase
class test_Decimal(TestCase):
def test_construct(self):
self.assertEquals(str(Decimal('')), '0')
self.assertEquals(str(Decimal('0')), '0')
self.assertEquals(str(Decimal('0.2')), '0.2')
self.assertEquals(str(Decimal('-0.2')), '-0.2')
self.assertEquals(str(Decimal('3.1416')), '3.1416')
def test_accumulate(self):
d = Decimal()
d.accumulate('0.5')
d.accumulate('3.1416')
d.accumulate('-23.34234')
self.assertEquals(str(d), '-19.70074')

View File

@@ -0,0 +1,56 @@
from __future__ import absolute_import, division, print_function, unicode_literals
from ripple.util import Dict
from unittest import TestCase
class test_Dict(TestCase):
def test_count_all_subitems(self):
self.assertEquals(Dict.count_all_subitems({}), 1)
self.assertEquals(Dict.count_all_subitems({'a': {}}), 2)
self.assertEquals(Dict.count_all_subitems([1]), 2)
self.assertEquals(Dict.count_all_subitems([1, 2]), 3)
self.assertEquals(Dict.count_all_subitems([1, {2: 3}]), 4)
self.assertEquals(Dict.count_all_subitems([1, {2: [3]}]), 5)
self.assertEquals(Dict.count_all_subitems([1, {2: [3, 4]}]), 6)
def test_prune(self):
self.assertEquals(Dict.prune({}, 0), {})
self.assertEquals(Dict.prune({}, 1), {})
self.assertEquals(Dict.prune({1: 2}, 0), '{dict with 1 subitem}')
self.assertEquals(Dict.prune({1: 2}, 1), {1: 2})
self.assertEquals(Dict.prune({1: 2}, 2), {1: 2})
self.assertEquals(Dict.prune([1, 2, 3], 0), '[list with 3 subitems]')
self.assertEquals(Dict.prune([1, 2, 3], 1), [1, 2, 3])
self.assertEquals(Dict.prune([{1: [2, 3]}], 0),
'[list with 4 subitems]')
self.assertEquals(Dict.prune([{1: [2, 3]}], 1),
['{dict with 3 subitems}'])
self.assertEquals(Dict.prune([{1: [2, 3]}], 2),
[{1: u'[list with 2 subitems]'}])
self.assertEquals(Dict.prune([{1: [2, 3]}], 3),
[{1: [2, 3]}])
def test_prune_nosub(self):
self.assertEquals(Dict.prune({}, 0, False), {})
self.assertEquals(Dict.prune({}, 1, False), {})
self.assertEquals(Dict.prune({1: 2}, 0, False), '{dict with 1 subitem}')
self.assertEquals(Dict.prune({1: 2}, 1, False), {1: 2})
self.assertEquals(Dict.prune({1: 2}, 2, False), {1: 2})
self.assertEquals(Dict.prune([1, 2, 3], 0, False),
'[list with 3 subitems]')
self.assertEquals(Dict.prune([1, 2, 3], 1, False), [1, 2, 3])
self.assertEquals(Dict.prune([{1: [2, 3]}], 0, False),
'[list with 1 subitem]')
self.assertEquals(Dict.prune([{1: [2, 3]}], 1, False),
['{dict with 1 subitem}'])
self.assertEquals(Dict.prune([{1: [2, 3]}], 2, False),
[{1: u'[list with 2 subitems]'}])
self.assertEquals(Dict.prune([{1: [2, 3]}], 3, False),
[{1: [2, 3]}])

View File

@@ -0,0 +1,37 @@
from __future__ import absolute_import, division, print_function, unicode_literals
from ripple.util.Function import Function, MATCHER
from unittest import TestCase
def FN(*args, **kwds):
return args, kwds
class test_Function(TestCase):
def match_test(self, item, *results):
self.assertEquals(MATCHER.match(item).groups(), results)
def test_simple(self):
self.match_test('function', 'function', '')
self.match_test('f(x)', 'f', '(x)')
def test_empty_function(self):
self.assertEquals(Function()(), None)
def test_empty_args(self):
f = Function('ripple.util.test_Function.FN()')
self.assertEquals(f(), ((), {}))
def test_function(self):
f = Function('ripple.util.test_Function.FN(True, {1: 2}, None)')
self.assertEquals(f(), ((True, {1: 2}, None), {}))
self.assertEquals(f('hello', foo='bar'),
(('hello', True, {1: 2}, None), {'foo':'bar'}))
self.assertEquals(
f, Function('ripple.util.test_Function.FN(true, {1: 2}, null)'))
def test_quoting(self):
f = Function('ripple.util.test_Function.FN(testing)')
self.assertEquals(f(), (('testing',), {}))
f = Function('ripple.util.test_Function.FN(testing, true, false, null)')
self.assertEquals(f(), (('testing', True, False, None), {}))

View File

@@ -0,0 +1,56 @@
from __future__ import absolute_import, division, print_function, unicode_literals
from ripple.util import PrettyPrint
from unittest import TestCase
class test_PrettyPrint(TestCase):
def setUp(self):
self._results = []
self.printer = PrettyPrint.Streamer(printer=self.printer)
def printer(self, *args, **kwds):
self._results.extend(args)
def run_test(self, expected, *args):
for i in range(0, len(args), 2):
self.printer.add(args[i], args[i + 1])
self.printer.finish()
self.assertEquals(''.join(self._results), expected)
def test_simple_printer(self):
self.run_test(
'{\n "foo": "bar"\n}',
'foo', 'bar')
def test_multiple_lines(self):
self.run_test(
'{\n "foo": "bar",\n "baz": 5\n}',
'foo', 'bar', 'baz', 5)
def test_multiple_lines(self):
self.run_test(
"""
{
"foo": {
"bar": 1,
"baz": true
},
"bang": "bing"
}
""".strip(), 'foo', {'bar': 1, 'baz': True}, 'bang', 'bing')
def test_multiple_lines_with_list(self):
self.run_test(
"""
{
"foo": [
"bar",
1
],
"baz": [
23,
42
]
}
""".strip(), 'foo', ['bar', 1], 'baz', [23, 42])

View File

@@ -0,0 +1,28 @@
from __future__ import absolute_import, division, print_function, unicode_literals
from ripple.util import Range
from unittest import TestCase
class test_Range(TestCase):
def round_trip(self, s, *items):
self.assertEquals(Range.from_string(s), set(items))
self.assertEquals(Range.to_string(items), s)
def test_complete(self):
self.round_trip('10,19', 10, 19)
self.round_trip('10', 10)
self.round_trip('10-12', 10, 11, 12)
self.round_trip('10,19,42-45', 10, 19, 42, 43, 44, 45)
def test_names(self):
self.assertEquals(
Range.from_string('first,last,current', first=1, last=3, current=5),
set([1, 3, 5]))
def test_is_range(self):
self.assertTrue(Range.is_range(''))
self.assertTrue(Range.is_range('10'))
self.assertTrue(Range.is_range('10,12'))
self.assertFalse(Range.is_range('10,12,fred'))
self.assertTrue(Range.is_range('10,12,fred', 'fred'))

View File

@@ -0,0 +1,44 @@
from __future__ import absolute_import, division, print_function, unicode_literals
from ripple.util.Search import binary_search, linear_search, FIRST, LAST
from unittest import TestCase
class test_Search(TestCase):
def condition(self, i):
return 10 <= i < 15;
def test_linear_full(self):
self.assertEquals(list(linear_search(range(21), self.condition)),
[10, 11, 12, 13, 14])
def test_linear_partial(self):
self.assertEquals(list(linear_search(range(8, 14), self.condition)),
[10, 11, 12, 13])
self.assertEquals(list(linear_search(range(11, 14), self.condition)),
[11, 12, 13])
self.assertEquals(list(linear_search(range(12, 18), self.condition)),
[12, 13, 14])
def test_linear_empty(self):
self.assertEquals(list(linear_search(range(1, 4), self.condition)), [])
def test_binary_first(self):
self.assertEquals(binary_search(0, 14, self.condition, FIRST), 10)
self.assertEquals(binary_search(10, 19, self.condition, FIRST), 10)
self.assertEquals(binary_search(14, 14, self.condition, FIRST), 14)
self.assertEquals(binary_search(14, 15, self.condition, FIRST), 14)
self.assertEquals(binary_search(13, 15, self.condition, FIRST), 13)
def test_binary_last(self):
self.assertEquals(binary_search(10, 20, self.condition, LAST), 14)
self.assertEquals(binary_search(0, 14, self.condition, LAST), 14)
self.assertEquals(binary_search(14, 14, self.condition, LAST), 14)
self.assertEquals(binary_search(14, 15, self.condition, LAST), 14)
self.assertEquals(binary_search(13, 15, self.condition, LAST), 14)
def test_binary_throws(self):
self.assertRaises(
ValueError, binary_search, 0, 20, self.condition, LAST)
self.assertRaises(
ValueError, binary_search, 0, 20, self.condition, FIRST)