import plom_socket_io
import socket
import threading
-from functools import partial
-import unittest
-
-
-class ArgumentError(Exception):
- pass
-
-
-class ParseError(Exception):
- pass
-
-
-class Parser:
-
- def __init__(self, game):
- self.game = game
-
- def tokenize(self, msg):
- """Parse msg string into tokens.
-
- Separates by ' ' and '\n', but allows whitespace in tokens quoted by
- '"', and allows escaping within quoted tokens by a prefixed backslash.
- """
- tokens = []
- token = ''
- quoted = False
- escaped = False
- for c in msg:
- if quoted:
- if escaped:
- token += c
- escaped = False
- elif c == '\\':
- escaped = True
- elif c == '"':
- quoted = False
- else:
- token += c
- elif c == '"':
- quoted = True
- elif c in {' ', '\n'}:
- if len(token) > 0:
- tokens += [token]
- token = ''
- else:
- token += c
- if len(token) > 0:
- tokens += [token]
- return tokens
-
- def parse(self, msg):
- """Parse msg as call to self.game method, return method with arguments.
-
- Respects method signatures defined in methods' .argtypes attributes.
- """
- tokens = self.tokenize(msg)
- if len(tokens) == 0:
- return None
- method_candidate = 'cmd_' + tokens[0]
- if not hasattr(self.game, method_candidate):
- return None
- method = getattr(self.game, method_candidate)
- if len(tokens) == 1:
- if not hasattr(method, 'argtypes'):
- return method
- else:
- raise ParseError('Command expects argument(s).')
- args_candidates = tokens[1:]
- if not hasattr(method, 'argtypes'):
- raise ParseError('Command expects no argument(s).')
- args, kwargs = self.argsparse(method.argtypes, args_candidates)
- return partial(method, *args, **kwargs)
-
- def parse_yx_tuple(self, yx_string):
- """Parse yx_string as yx_tuple:nonneg argtype, return result."""
-
- def get_axis_position_from_argument(axis, token):
- if len(token) < 3 or token[:2] != axis + ':' or \
- not token[2:].isdigit():
- raise ParseError('Non-int arg for ' + axis + ' position.')
- n = int(token[2:])
- if n < 1:
- raise ParseError('Arg for ' + axis + ' position < 1.')
- return n
-
- tokens = yx_string.split(',')
- if len(tokens) != 2:
- raise ParseError('Wrong number of yx-tuple arguments.')
- y = get_axis_position_from_argument('Y', tokens[0])
- x = get_axis_position_from_argument('X', tokens[1])
- return (y, x)
-
- def argsparse(self, signature, args_tokens):
- """Parse into / return args_tokens as args/kwargs defined by signature.
-
- Expects signature to be a ' '-delimited sequence of any of the strings
- 'int:nonneg', 'yx_tuple:nonneg', 'string', defining the respective
- argument types.
- """
- tmpl_tokens = signature.split()
- if len(tmpl_tokens) != len(args_tokens):
- raise ParseError('Number of arguments (' + str(len(args_tokens)) +
- ') not expected number (' + str(len(tmpl_tokens))
- + ').')
- args = []
- for i in range(len(tmpl_tokens)):
- tmpl = tmpl_tokens[i]
- arg = args_tokens[i]
- if tmpl == 'int:nonneg':
- if not arg.isdigit():
- raise ParseError('Argument must be non-negative integer.')
- args += [int(arg)]
- elif tmpl == 'yx_tuple:nonneg':
- args += [self.parse_yx_tuple(arg)]
- elif tmpl == 'string':
- args += [arg]
- else:
- raise ParseError('Unknown argument type.')
- return args, {}
+from parser import ArgError, Parser
class Game:
"""Reset self.terrain_map from terrain_map."""
lines = terrain_map.split('\n')
if len(lines) != self.map_size[0]:
- raise ArgumentError('wrong map height')
+ raise ArgError('wrong map height')
for line in lines:
if len(line) != self.map_size[1]:
- raise ArgumentError('wrong map width')
+ raise ArgError('wrong map width')
self.terrain_map = terrain_map
cmd_TERRAIN.argtypes = 'string'
self.game.log('UNHANDLED INPUT: ' + msg)
else:
command()
- except (ArgumentError, ParseError) as e:
+ except ArgError as e:
self.game.log('ARGUMENT ERROR: ' + msg + '\n' + str(e))
self.widget_manager.update()
del self.server_output[0]
self.recv_loop_thread.join()
-class TestParser(unittest.TestCase):
-
- def test_tokenizer(self):
- p = Parser(Game())
- self.assertEqual(p.tokenize(''), [])
- self.assertEqual(p.tokenize(' '), [])
- self.assertEqual(p.tokenize('abc'), ['abc'])
- self.assertEqual(p.tokenize('a b\nc "d"'), ['a', 'b', 'c', 'd'])
- self.assertEqual(p.tokenize('a "b\nc d"'), ['a', 'b\nc d'])
- self.assertEqual(p.tokenize('a"b"c'), ['abc'])
- self.assertEqual(p.tokenize('a\\b'), ['a\\b'])
- self.assertEqual(p.tokenize('"a\\b"'), ['ab'])
- self.assertEqual(p.tokenize('a"b'), ['ab'])
- self.assertEqual(p.tokenize('a"\\"b'), ['a"b'])
-
- def test_unhandled(self):
- p = Parser(Game())
- self.assertEqual(p.parse(''), None)
- self.assertEqual(p.parse(' '), None)
- self.assertEqual(p.parse('x'), None)
-
- def test_argsparse(self):
- p = Parser(Game())
- assertErr = partial(self.assertRaises, ParseError, p.argsparse)
- assertErr('', ['foo'])
- assertErr('string', [])
- assertErr('string string', ['foo'])
- self.assertEqual(p.argsparse('string', ('foo',)),
- (['foo'], {}))
- self.assertEqual(p.argsparse('string string', ('foo', 'bar')),
- (['foo', 'bar'], {}))
- assertErr('int:nonneg', [''])
- assertErr('int:nonneg', ['x'])
- assertErr('int:nonneg', ['-1'])
- assertErr('int:nonneg', ['0.1'])
- self.assertEqual(p.argsparse('int:nonneg', ('0',)),
- ([0], {}))
- assertErr('yx_tuple:nonneg', ['x'])
- assertErr('yx_tuple:nonneg', ['Y:0,X:1'])
- assertErr('yx_tuple:nonneg', ['Y:1,X:0'])
- assertErr('yx_tuple:nonneg', ['Y:1.1,X:1'])
- assertErr('yx_tuple:nonneg', ['Y:1,X:1.1'])
- self.assertEqual(p.argsparse('yx_tuple:nonneg', ('Y:1,X:2',)),
- ([(1, 2)], {}))
-
-
if __name__ == '__main__':
game = Game()
s = socket.create_connection(('127.0.0.1', 5000))
--- /dev/null
+import unittest
+from functools import partial
+
+
+class ArgError(Exception):
+ pass
+
+
+class Parser:
+
+ def __init__(self, game=None):
+ self.game = game
+
+ def tokenize(self, msg):
+ """Parse msg string into tokens.
+
+ Separates by ' ' and '\n', but allows whitespace in tokens quoted by
+ '"', and allows escaping within quoted tokens by a prefixed backslash.
+ """
+ tokens = []
+ token = ''
+ quoted = False
+ escaped = False
+ for c in msg:
+ if quoted:
+ if escaped:
+ token += c
+ escaped = False
+ elif c == '\\':
+ escaped = True
+ elif c == '"':
+ quoted = False
+ else:
+ token += c
+ elif c == '"':
+ quoted = True
+ elif c in {' ', '\n'}:
+ if len(token) > 0:
+ tokens += [token]
+ token = ''
+ else:
+ token += c
+ if len(token) > 0:
+ tokens += [token]
+ return tokens
+
+ def parse(self, msg):
+ """Parse msg as call to self.game method, return method with arguments.
+
+ Respects method signatures defined in methods' .argtypes attributes.
+ """
+ tokens = self.tokenize(msg)
+ if len(tokens) == 0:
+ return None
+ method_candidate = 'cmd_' + tokens[0]
+ if not hasattr(self.game, method_candidate):
+ return None
+ method = getattr(self.game, method_candidate)
+ if len(tokens) == 1:
+ if not hasattr(method, 'argtypes'):
+ return method
+ else:
+ raise ArgError('Command expects argument(s).')
+ args_candidates = tokens[1:]
+ if not hasattr(method, 'argtypes'):
+ raise ArgError('Command expects no argument(s).')
+ args, kwargs = self.argsparse(method.argtypes, args_candidates)
+ return partial(method, *args, **kwargs)
+
+ def parse_yx_tuple(self, yx_string):
+ """Parse yx_string as yx_tuple:nonneg argtype, return result."""
+
+ def get_axis_position_from_argument(axis, token):
+ if len(token) < 3 or token[:2] != axis + ':' or \
+ not token[2:].isdigit():
+ raise ArgError('Non-int arg for ' + axis + ' position.')
+ n = int(token[2:])
+ if n < 1:
+ raise ArgError('Arg for ' + axis + ' position < 1.')
+ return n
+
+ tokens = yx_string.split(',')
+ if len(tokens) != 2:
+ raise ArgError('Wrong number of yx-tuple arguments.')
+ y = get_axis_position_from_argument('Y', tokens[0])
+ x = get_axis_position_from_argument('X', tokens[1])
+ return (y, x)
+
+ def argsparse(self, signature, args_tokens):
+ """Parse into / return args_tokens as args/kwargs defined by signature.
+
+ Expects signature to be a ' '-delimited sequence of any of the strings
+ 'int:nonneg', 'yx_tuple:nonneg', 'string', 'seq:int:nonneg', defining
+ the respective argument types.
+ """
+ tmpl_tokens = signature.split()
+ if len(tmpl_tokens) != len(args_tokens):
+ raise ArgError('Number of arguments (' + str(len(args_tokens)) +
+ ') not expected number (' + str(len(tmpl_tokens))
+ + ').')
+ args = []
+ for i in range(len(tmpl_tokens)):
+ tmpl = tmpl_tokens[i]
+ arg = args_tokens[i]
+ if tmpl == 'int:nonneg':
+ if not arg.isdigit():
+ raise ArgError('Argument must be non-negative integer.')
+ args += [int(arg)]
+ elif tmpl == 'yx_tuple:nonneg':
+ args += [self.parse_yx_tuple(arg)]
+ elif tmpl == 'string':
+ args += [arg]
+ elif tmpl == 'seq:int:nonneg':
+ sub_tokens = arg.split(',')
+ if len(sub_tokens) < 1:
+ raise ArgError('Argument must be non-empty sequence.')
+ seq = []
+ for tok in sub_tokens:
+ if not tok.isdigit():
+ raise ArgError('Argument sequence must only contain '
+ 'non-negative integers.')
+ seq += [int(tok)]
+ args += [seq]
+ else:
+ raise ArgError('Unknown argument type.')
+ return args, {}
+
+
+class TestParser(unittest.TestCase):
+
+ def test_tokenizer(self):
+ p = Parser()
+ self.assertEqual(p.tokenize(''), [])
+ self.assertEqual(p.tokenize(' '), [])
+ self.assertEqual(p.tokenize('abc'), ['abc'])
+ self.assertEqual(p.tokenize('a b\nc "d"'), ['a', 'b', 'c', 'd'])
+ self.assertEqual(p.tokenize('a "b\nc d"'), ['a', 'b\nc d'])
+ self.assertEqual(p.tokenize('a"b"c'), ['abc'])
+ self.assertEqual(p.tokenize('a\\b'), ['a\\b'])
+ self.assertEqual(p.tokenize('"a\\b"'), ['ab'])
+ self.assertEqual(p.tokenize('a"b'), ['ab'])
+ self.assertEqual(p.tokenize('a"\\"b'), ['a"b'])
+
+ def test_unhandled(self):
+ p = Parser()
+ self.assertEqual(p.parse(''), None)
+ self.assertEqual(p.parse(' '), None)
+ self.assertEqual(p.parse('x'), None)
+
+ def test_argsparse(self):
+ from functools import partial
+ p = Parser()
+ assertErr = partial(self.assertRaises, ArgError, p.argsparse)
+ assertErr('', ['foo'])
+ assertErr('string', [])
+ assertErr('string string', ['foo'])
+ self.assertEqual(p.argsparse('string', ('foo',)),
+ (['foo'], {}))
+ self.assertEqual(p.argsparse('string string', ('foo', 'bar')),
+ (['foo', 'bar'], {}))
+ assertErr('int:nonneg', [''])
+ assertErr('int:nonneg', ['x'])
+ assertErr('int:nonneg', ['-1'])
+ assertErr('int:nonneg', ['0.1'])
+ self.assertEqual(p.argsparse('int:nonneg', ('0',)),
+ ([0], {}))
+ assertErr('yx_tuple:nonneg', ['x'])
+ assertErr('yx_tuple:nonneg', ['Y:0,X:1'])
+ assertErr('yx_tuple:nonneg', ['Y:1,X:0'])
+ assertErr('yx_tuple:nonneg', ['Y:1.1,X:1'])
+ assertErr('yx_tuple:nonneg', ['Y:1,X:1.1'])
+ self.assertEqual(p.argsparse('yx_tuple:nonneg', ('Y:1,X:2',)),
+ ([(1, 2)], {}))
+ assertErr('seq:int:nonneg', [''])
+ assertErr('seq:int:nonneg', [','])
+ assertErr('seq:int:nonneg', ['a'])
+ assertErr('seq:int:nonneg', ['a,1'])
+ assertErr('seq:int:nonneg', [',1'])
+ assertErr('seq:int:nonneg', ['1,'])
+ self.assertEqual(p.argsparse('seq:int:nonneg', ('1,2,3',)),
+ ([[1, 2, 3]], {}))
import socketserver
import threading
import queue
+from parser import ArgError, Parser
+
# Avoid "Address already in use" errors.
socketserver.TCPServer.allow_reuse_address = True
return fib(n-1) + fib(n-2)
-class ArgumentError(Exception):
- pass
-
-
class CommandHandler:
def __init__(self, queues_out):
from multiprocessing import Pool
self.queues_out = queues_out
self.world = World()
+ self.parser = Parser(self)
# self.pool and self.pool_result are currently only needed by the FIB
# command and the demo of a parallelized game loop in cmd_inc_p.
self.pool = Pool()
self.send_all('THING TYPE:' + thing.type + ' '
+ self.stringify_yx(thing.position))
- def cmd_fib(self, tokens, connection_id):
+ def cmd_FIB(self, numbers, connection_id):
"""Reply with n-th Fibonacci numbers, n taken from tokens[1:].
Numbers are calculated in parallel as far as possible, using fib().
A 'CALCULATING …' message is sent to caller before the result.
"""
- if len(tokens) < 2:
- raise ArgumentError('FIB NEEDS AT LEAST ONE ARGUMENT')
- numbers = []
- for token in tokens[1:]:
- if token == '0' or not token.isdigit():
- raise ArgumentError('FIB ARGUMENTS MUST BE INTEGERS > 0')
- numbers += [int(token)]
self.send_to(connection_id, 'CALCULATING …')
results = self.pool.map(fib, numbers)
reply = ' '.join([str(r) for r in results])
self.send_to(connection_id, reply)
+ cmd_FIB.argtypes = 'seq:int:nonneg'
- def cmd_inc_p(self, connection_id):
+ def cmd_INC_P(self, connection_id):
"""Increment world.turn, send game turn data to everyone.
To simulate game processing waiting times, a one second delay between
+ self.stringify_yx(thing.position))
self.pool_result = self.pool.map_async(fib, (35, 35))
- def cmd_get_turn(self, connection_id):
+ def cmd_GET_TURN(self, connection_id):
"""Send world.turn to caller."""
self.send_to(connection_id, str(self.world.turn))
- def cmd_move(self, direction, connection_id):
+ def cmd_MOVE(self, direction, connection_id):
"""Set player task to 'move' with direction arg, finish player turn."""
if direction not in {'UP', 'DOWN', 'RIGHT', 'LEFT'}:
- raise ArgumentError('MOVE ARGUMENT MUST BE ONE OF: '
- 'UP, DOWN, RIGHT, LEFT')
+ raise ArgError('Move argument must be one of: '
+ 'UP, DOWN, RIGHT, LEFT')
self.world.player.set_task('move', direction=direction)
self.proceed_to_next_player_turn(connection_id)
+ cmd_MOVE.argtypes = 'string'
- def cmd_wait(self, connection_id):
+ def cmd_WAIT(self, connection_id):
"""Set player task to 'wait', finish player turn."""
self.world.player.set_task('wait')
self.proceed_to_next_player_turn(connection_id)
- def cmd_echo(self, tokens, input_, connection_id):
- """Send message in input_ beyond tokens[0] to caller."""
- msg = input_[len(tokens[0]) + 1:]
+ def cmd_ECHO(self, msg, connection_id):
+ """Send msg to caller."""
self.send_to(connection_id, msg)
+ cmd_ECHO.argtypes = 'string'
- def cmd_all(self, tokens, input_):
- """Send message in input_ beyond tokens[0] to all clients."""
- msg = input_[len(tokens[0]) + 1:]
+ def cmd_ALL(self, msg, connection_id):
+ """Send msg to all clients."""
self.send_all(msg)
+ cmd_ALL.argtypes = 'string'
def handle_input(self, input_, connection_id):
"""Process input_ to command grammar, call command handler if found."""
- tokens = [token for token in input_.split(' ') if len(token) > 0]
try:
- if len(tokens) == 0:
- self.send_to(connection_id, 'EMPTY COMMAND')
- elif len(tokens) == 1 and tokens[0] == 'INC_P':
- self.cmd_inc_p(connection_id)
- elif len(tokens) == 1 and tokens[0] == 'GET_TURN':
- self.cmd_get_turn(connection_id)
- elif len(tokens) == 1 and tokens[0] == 'WAIT':
- self.cmd_wait(connection_id)
- elif len(tokens) == 2 and tokens[0] == 'MOVE':
- self.cmd_move(tokens[1], connection_id)
- elif len(tokens) >= 1 and tokens[0] == 'ECHO':
- self.cmd_echo(tokens, input_, connection_id)
- elif len(tokens) >= 1 and tokens[0] == 'ALL':
- self.cmd_all(tokens, input_)
- elif len(tokens) >= 1 and tokens[0] == 'FIB':
- # TODO: Should this really block the whole loop?
- self.cmd_fib(tokens, connection_id)
+ command = self.parser.parse(input_)
+ if command is None:
+ self.send_to(connection_id, 'UNHANDLED INPUT')
else:
- self.send_to(connection_id, 'UNKNOWN COMMAND')
- except ArgumentError as e:
+ command(connection_id=connection_id)
+ except ArgError as e:
self.send_to(connection_id, 'ARGUMENT ERROR: ' + str(e))