#!/usr/bin/env python3
import curses
-import socket
import queue
import threading
+from plomrogue.game import GameBase
+from plomrogue.parser import Parser
+from plomrogue.mapping import YX, MapGeometrySquare, MapGeometryHex
+from plomrogue.things import ThingBase
+from plomrogue.misc import quote
+from plomrogue.errors import BrokenSocketConnection
+
+from ws4py.client import WebSocketBaseClient
+class WebSocketClient(WebSocketBaseClient):
+
+ def __init__(self, recv_handler, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.recv_handler = recv_handler
+ self.connect()
+
+ def received_message(self, message):
+ if message.is_text:
+ message = str(message)
+ self.recv_handler(message)
+
+ @property
+ def plom_closed(self):
+ return self.client_terminated
+
from plomrogue.io_tcp import PlomSocket
+class PlomSocketClient(PlomSocket):
+
+ def __init__(self, recv_handler, url):
+ import socket
+ self.recv_handler = recv_handler
+ host, port = url.split(':')
+ super().__init__(socket.create_connection((host, port)))
+
+ def close(self):
+ self.socket.close()
+
+ def run(self):
+ try:
+ for msg in self.recv():
+ self.recv_handler(msg)
+ except BrokenSocketConnection:
+ pass # we assume socket will be known as dead by now
+
+def cmd_TURN(game, n):
+ game.turn = n
+ game.things = []
+ game.portals = {}
+ game.turn_complete = False
+cmd_TURN.argtypes = 'int:nonneg'
+
+def cmd_LOGIN_OK(game):
+ game.tui.switch_mode('post_login_wait')
+ game.tui.send('GET_GAMESTATE')
+ game.tui.log_msg('@ welcome')
+cmd_LOGIN_OK.argtypes = ''
+
+def cmd_CHAT(game, msg):
+ game.tui.log_msg('# ' + msg)
+ game.tui.do_refresh = True
+cmd_CHAT.argtypes = 'string'
+
+def cmd_PLAYER_ID(game, player_id):
+ game.player_id = player_id
+cmd_PLAYER_ID.argtypes = 'int:nonneg'
+
+def cmd_THING_POS(game, thing_id, position):
+ t = game.get_thing(thing_id, True)
+ t.position = position
+cmd_THING_POS.argtypes = 'int:nonneg yx_tuple:nonneg'
+
+def cmd_THING_NAME(game, thing_id, name):
+ t = game.get_thing(thing_id, True)
+ t.name = name
+cmd_THING_NAME.argtypes = 'int:nonneg string'
+
+def cmd_MAP(game, geometry, size, content):
+ map_geometry_class = globals()['MapGeometry' + geometry]
+ game.map_geometry = map_geometry_class(size)
+ game.map_content = content
+ if type(game.map_geometry) == MapGeometrySquare:
+ game.tui.movement_keys = {
+ game.tui.keys['square_move_up']: 'UP',
+ game.tui.keys['square_move_left']: 'LEFT',
+ game.tui.keys['square_move_down']: 'DOWN',
+ game.tui.keys['square_move_right']: 'RIGHT',
+ }
+ elif type(game.map_geometry) == MapGeometryHex:
+ game.tui.movement_keys = {
+ game.tui.keys['hex_move_upleft']: 'UPLEFT',
+ game.tui.keys['hex_move_upright']: 'UPRIGHT',
+ game.tui.keys['hex_move_right']: 'RIGHT',
+ game.tui.keys['hex_move_downright']: 'DOWNRIGHT',
+ game.tui.keys['hex_move_downleft']: 'DOWNLEFT',
+ game.tui.keys['hex_move_left']: 'LEFT',
+ }
+cmd_MAP.argtypes = 'string:map_geometry yx_tuple:pos string'
+
+def cmd_GAME_STATE_COMPLETE(game):
+ game.info_db = {}
+ if game.tui.mode.name == 'post_login_wait':
+ game.tui.switch_mode('play')
+ game.tui.help()
+ if game.tui.mode.shows_info:
+ game.tui.query_info()
+ player = game.get_thing(game.player_id, False)
+ if player.position in game.portals:
+ game.tui.teleport_target_host = game.portals[player.position]
+ game.tui.switch_mode('teleport')
+ game.turn_complete = True
+ game.tui.do_refresh = True
+cmd_GAME_STATE_COMPLETE.argtypes = ''
+
+def cmd_PORTAL(game, position, msg):
+ game.portals[position] = msg
+cmd_PORTAL.argtypes = 'yx_tuple:nonneg string'
+
+def cmd_PLAY_ERROR(game, msg):
+ game.tui.flash()
+ game.tui.do_refresh = True
+cmd_PLAY_ERROR.argtypes = 'string'
-def recv_loop(plom_socket, q):
- for msg in plom_socket.recv():
- q.put(msg)
+def cmd_GAME_ERROR(game, msg):
+ game.tui.log_msg('? game error: ' + msg)
+ game.tui.do_refresh = True
+cmd_GAME_ERROR.argtypes = 'string'
+
+def cmd_ARGUMENT_ERROR(game, msg):
+ game.tui.log_msg('? syntax error: ' + msg)
+ game.tui.do_refresh = True
+cmd_ARGUMENT_ERROR.argtypes = 'string'
+
+def cmd_ANNOTATION(game, position, msg):
+ game.info_db[position] = msg
+ if game.tui.mode.shows_info:
+ game.tui.do_refresh = True
+cmd_ANNOTATION.argtypes = 'yx_tuple:nonneg string'
+
+class Game(GameBase):
+ commands = {'LOGIN_OK': cmd_LOGIN_OK,
+ 'CHAT': cmd_CHAT,
+ 'PLAYER_ID': cmd_PLAYER_ID,
+ 'TURN': cmd_TURN,
+ 'THING_POS': cmd_THING_POS,
+ 'THING_NAME': cmd_THING_NAME,
+ 'MAP': cmd_MAP,
+ 'PORTAL': cmd_PORTAL,
+ 'ANNOTATION': cmd_ANNOTATION,
+ 'GAME_STATE_COMPLETE': cmd_GAME_STATE_COMPLETE,
+ 'ARGUMENT_ERROR': cmd_ARGUMENT_ERROR,
+ 'GAME_ERROR': cmd_GAME_ERROR,
+ 'PLAY_ERROR': cmd_PLAY_ERROR}
+ thing_type = ThingBase
+ turn_complete = False
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.map_content = ''
+ self.player_id = -1
+ self.info_db = {}
+ self.portals = {}
+
+ def get_string_options(self, string_option_type):
+ if string_option_type == 'map_geometry':
+ return ['Hex', 'Square']
+ return None
+
+ def get_command(self, command_name):
+ from functools import partial
+ f = partial(self.commands[command_name], self)
+ f.argtypes = self.commands[command_name].argtypes
+ return f
class TUI:
- def __init__(self, socket, q):
- self.queue = q
- self.socket = socket
+ class Mode:
+
+ def __init__(self, name, has_input_prompt=False, shows_info=False,
+ is_intro = False):
+ self.name = name
+ self.has_input_prompt = has_input_prompt
+ self.shows_info = shows_info
+ self.is_intro = is_intro
+
+ def __init__(self, host):
+ import os
+ import json
+ self.host = host
+ self.mode_play = self.Mode('play')
+ self.mode_study = self.Mode('study', shows_info=True)
+ self.mode_edit = self.Mode('edit')
+ self.mode_annotate = self.Mode('annotate', has_input_prompt=True, shows_info=True)
+ self.mode_portal = self.Mode('portal', has_input_prompt=True, shows_info=True)
+ self.mode_chat = self.Mode('chat', has_input_prompt=True)
+ self.mode_waiting_for_server = self.Mode('waiting_for_server', is_intro=True)
+ self.mode_login = self.Mode('login', has_input_prompt=True, is_intro=True)
+ self.mode_post_login_wait = self.Mode('post_login_wait', is_intro=True)
+ self.mode_teleport = self.Mode('teleport', has_input_prompt=True)
+ self.game = Game()
+ self.game.tui = self
+ self.parser = Parser(self.game)
self.log = []
- self.log_msg("hallo")
- self.log_msg("welt")
self.do_refresh = True
+ self.queue = queue.Queue()
+ self.login_name = None
+ self.switch_mode('waiting_for_server')
+ self.keys = {
+ 'switch_to_chat': 't',
+ 'switch_to_play': 'p',
+ 'switch_to_annotate': 'm',
+ 'switch_to_portal': 'P',
+ 'switch_to_study': '?',
+ 'switch_to_edit': 'm',
+ 'flatten': 'F',
+ 'hex_move_upleft': 'w',
+ 'hex_move_upright': 'e',
+ 'hex_move_right': 'd',
+ 'hex_move_downright': 'x',
+ 'hex_move_downleft': 'y',
+ 'hex_move_left': 'a',
+ 'square_move_up': 'w',
+ 'square_move_left': 'a',
+ 'square_move_down': 's',
+ 'square_move_right': 'd',
+ }
+ if os.path.isfile('config.json'):
+ with open('config.json', 'r') as f:
+ keys_conf = json.loads(f.read())
+ for k in keys_conf:
+ self.keys[k] = keys_conf[k]
curses.wrapper(self.loop)
+ def flash(self):
+ curses.flash()
+
+ def send(self, msg):
+ try:
+ if hasattr(self.socket, 'plom_closed') and self.socket.plom_closed:
+ raise BrokenSocketConnection
+ self.socket.send(msg)
+ except (BrokenPipeError, BrokenSocketConnection):
+ self.log_msg('@ server disconnected :(')
+ self.do_refresh = True
+
def log_msg(self, msg):
self.log += [msg]
if len(self.log) > 100:
self.log = self.log[-100:]
+ def query_info(self):
+ self.send('GET_ANNOTATION ' + str(self.explorer))
+
+ def switch_mode(self, mode_name, keep_position = False):
+ self.mode = getattr(self, 'mode_' + mode_name)
+ if self.mode.shows_info and not keep_position:
+ player = self.game.get_thing(self.game.player_id, False)
+ self.explorer = YX(player.position.y, player.position.x)
+ if self.mode.name == 'waiting_for_server':
+ self.log_msg('@ waiting for server …')
+ elif self.mode.name == 'login':
+ if self.login_name:
+ self.send('LOGIN ' + quote(self.login_name))
+ else:
+ self.log_msg('@ enter username')
+ elif self.mode.name == 'teleport':
+ self.log_msg("@ May teleport to %s" % (self.teleport_target_host)),
+ self.log_msg("@ Enter 'YES!' to enthusiastically affirm.");
+ elif self.mode.name == 'annotate' and self.explorer in self.game.info_db:
+ info = self.game.info_db[self.explorer]
+ if info != '(none)':
+ self.input_ = info
+ elif self.mode.name == 'portal' and self.explorer in self.game.portals:
+ self.input_ = self.game.portals[self.explorer]
+
+ def help(self):
+ self.log_msg("HELP:");
+ self.log_msg("chat mode commands:");
+ self.log_msg(" /nick NAME - re-name yourself to NAME");
+ self.log_msg(" /msg USER TEXT - send TEXT to USER");
+ self.log_msg(" /help - show this help");
+ self.log_msg(" /%s or /play - switch to play mode" % self.keys['switch_to_play']);
+ self.log_msg(" /%s or /study - switch to study mode" % self.keys['switch_to_study']);
+ self.log_msg("commands common to study and play mode:");
+ self.log_msg(" %s - move" % ','.join(self.movement_keys));
+ self.log_msg(" %s - switch to chat mode" % self.keys['switch_to_chat']);
+ self.log_msg("commands specific to play mode:");
+ self.log_msg(" %s - write following ASCII character" % self.keys['switch_to_edit']);
+ self.log_msg(" %s - flatten surroundings" % self.keys['flatten']);
+ self.log_msg(" %s - switch to study mode" % self.keys['switch_to_study']);
+ self.log_msg("commands specific to study mode:");
+ self.log_msg(" %s - annotate terrain" % self.keys['switch_to_annotate']);
+ self.log_msg(" %s - switch to play mode" % self.keys['switch_to_play']);
+
def loop(self, stdscr):
+ import time
+
+ def safe_addstr(y, x, line):
+ if y < self.size.y - 1 or x + len(line) < self.size.x:
+ stdscr.addstr(y, x, line)
+ else: # workaround to <https://stackoverflow.com/q/7063128>
+ cut_i = self.size.x - x - 1
+ cut = line[:cut_i]
+ last_char = line[cut_i]
+ stdscr.addstr(y, self.size.x - 2, last_char)
+ stdscr.insstr(y, self.size.x - 2, ' ')
+ stdscr.addstr(y, x, cut)
+
+ def connect():
+
+ def handle_recv(msg):
+ if msg == 'BYE':
+ self.socket.close()
+ else:
+ self.queue.put(msg)
+
+ socket_client_class = PlomSocketClient
+ if self.host.startswith('ws://') or self.host.startswith('wss://'):
+ socket_client_class = WebSocketClient
+ while True:
+ try:
+ self.socket = socket_client_class(handle_recv, self.host)
+ self.socket_thread = threading.Thread(target=self.socket.run)
+ self.socket_thread.start()
+ self.switch_mode('login')
+ return
+ except ConnectionRefusedError:
+ self.log_msg('@ server connect failure, trying again …')
+ draw_screen()
+ stdscr.refresh()
+ time.sleep(1)
+
+ def reconnect():
+ self.send('QUIT')
+ time.sleep(0.1) # FIXME necessitated by some some strange SSL race
+ # conditions with ws4py, find out what exactly
+ self.switch_mode('waiting_for_server')
+ connect()
+
+ def handle_input(msg):
+ command, args = self.parser.parse(msg)
+ command(*args)
def msg_into_lines_of_width(msg, width):
chunk = ''
lines = []
x = 0
for i in range(len(msg)):
- x += 1
if x >= width or msg[i] == "\n":
lines += [chunk]
chunk = ''
x = 0
if msg[i] != "\n":
chunk += msg[i]
+ x += 1
lines += [chunk]
return lines
def reset_screen_size():
- self.rows, self.cols = stdscr.getmaxyx()
- self.cols -= 1 # ugly TODO ncurses bug workaround, FIXME
- self.window_width = self.cols // 2
+ self.size = YX(*stdscr.getmaxyx())
+ self.size = self.size - YX(self.size.y % 4, 0)
+ self.size = self.size - YX(0, self.size.x % 4)
+ self.window_width = int(self.size.x / 2)
def recalc_input_lines():
- self.input_lines = msg_into_lines_of_width(input_prompt +input_,
- self.window_width)
+ if not self.mode.has_input_prompt:
+ self.input_lines = []
+ else:
+ self.input_lines = msg_into_lines_of_width(input_prompt + self.input_,
+ self.window_width)
+
+ def move_explorer(direction):
+ target = self.game.map_geometry.move(self.explorer, direction)
+ if target:
+ self.explorer = target
+ self.query_info()
+ else:
+ self.flash()
def draw_history():
lines = []
for line in self.log:
lines += msg_into_lines_of_width(line, self.window_width)
lines.reverse()
- max_y = self.rows - len(self.input_lines)
+ height_header = 2
+ max_y = self.size.y - len(self.input_lines)
+ for i in range(len(lines)):
+ if (i >= max_y - height_header):
+ break
+ safe_addstr(max_y - i - 1, self.window_width, lines[i])
+
+ def draw_info():
+ if not self.game.turn_complete:
+ return
+ if self.explorer in self.game.portals:
+ info = 'PORTAL: ' + self.game.portals[self.explorer] + '\n'
+ else:
+ info = 'PORTAL: (none)\n'
+ if self.explorer in self.game.info_db:
+ info += 'ANNOTATION: ' + self.game.info_db[self.explorer]
+ else:
+ info += 'ANNOTATION: waiting …'
+ lines = msg_into_lines_of_width(info, self.window_width)
+ height_header = 2
for i in range(len(lines)):
- if (i >= max_y):
+ y = height_header + i
+ if y >= self.size.y - len(self.input_lines):
break
- stdscr.addstr(max_y - i - 1, self.window_width, lines[i])
+ safe_addstr(y, self.window_width, lines[i])
def draw_input():
- y = self.rows - len(self.input_lines)
+ y = self.size.y - len(self.input_lines)
for i in range(len(self.input_lines)):
- stdscr.addstr(y, self.window_width, self.input_lines[i])
+ safe_addstr(y, self.window_width, self.input_lines[i])
y += 1
+ def draw_turn():
+ if not self.game.turn_complete:
+ return
+ safe_addstr(0, self.window_width, 'TURN: ' + str(self.game.turn))
+
+ def draw_mode():
+ safe_addstr(1, self.window_width, 'MODE: ' + self.mode.name)
+
+ def draw_map():
+ if not self.game.turn_complete:
+ return
+ map_lines_split = []
+ for y in range(self.game.map_geometry.size.y):
+ start = self.game.map_geometry.size.x * y
+ end = start + self.game.map_geometry.size.x
+ map_lines_split += [list(self.game.map_content[start:end])]
+ for t in self.game.things:
+ map_lines_split[t.position.y][t.position.x] = '@'
+ if self.mode.shows_info:
+ map_lines_split[self.explorer.y][self.explorer.x] = '?'
+ map_lines = []
+ if type(self.game.map_geometry) == MapGeometryHex:
+ indent = 0
+ for line in map_lines_split:
+ map_lines += [indent*' ' + ' '.join(line)]
+ indent = 0 if indent else 1
+ else:
+ for line in map_lines_split:
+ map_lines += [' '.join(line)]
+ window_center = YX(int(self.size.y / 2),
+ int(self.window_width / 2))
+ player = self.game.get_thing(self.game.player_id, False)
+ center = player.position
+ if self.mode.shows_info:
+ center = self.explorer
+ center = YX(center.y, center.x * 2)
+ offset = center - window_center
+ if type(self.game.map_geometry) == MapGeometryHex and offset.y % 2:
+ offset += YX(0, 1)
+ term_y = max(0, -offset.y)
+ term_x = max(0, -offset.x)
+ map_y = max(0, offset.y)
+ map_x = max(0, offset.x)
+ while (term_y < self.size.y and map_y < self.game.map_geometry.size.y):
+ to_draw = map_lines[map_y][map_x:self.window_width + offset.x]
+ safe_addstr(term_y, term_x, to_draw)
+ term_y += 1
+ map_y += 1
+
+ def draw_screen():
+ stdscr.clear()
+ recalc_input_lines()
+ if self.mode.has_input_prompt:
+ draw_input()
+ if self.mode.shows_info:
+ draw_info()
+ else:
+ draw_history()
+ draw_mode()
+ if not self.mode.is_intro:
+ draw_turn()
+ draw_map()
+
curses.curs_set(False) # hide cursor
stdscr.timeout(10)
reset_screen_size()
- input_ = ''
+ self.explorer = YX(0, 0)
+ self.input_ = ''
input_prompt = '> '
+ connect()
while True:
-
if self.do_refresh:
+ draw_screen()
self.do_refresh = False
- stdscr.clear()
- recalc_input_lines()
-
- draw_input()
- draw_history()
-
while True:
try:
msg = self.queue.get(block=False)
- self.log_msg(msg)
- self.do_refresh = True
+ handle_input(msg)
except queue.Empty:
break
-
try:
key = stdscr.getkey()
+ self.do_refresh = True
except curses.error:
continue
- self.do_refresh = True
-
if key == 'KEY_RESIZE':
reset_screen_size()
- elif key == 'KEY_BACKSPACE':
- input_ = input_[:-1]
- elif key == '\n': # Return key
- self.socket.send(input_)
- input_ = ""
- else:
- input_ += key
- # TODO find out why - 1 is necessary here
- max_length = self.window_width * self.rows - len(input_prompt) - 1
- if len(input_) > max_length:
- input_ = input_[:max_length]
-
-s = socket.create_connection(('127.0.0.1', 5000))
-plom_socket = PlomSocket(s)
-q = queue.Queue()
-t = threading.Thread(target=recv_loop, args=(plom_socket, q))
-t.start()
-TUI(plom_socket, q)
+ elif self.mode.has_input_prompt and key == 'KEY_BACKSPACE':
+ self.input_ = self.input_[:-1]
+ elif self.mode.has_input_prompt and key != '\n': # Return key
+ self.input_ += key
+ max_length = self.window_width * self.size.y - len(input_prompt) - 1
+ if len(self.input_) > max_length:
+ self.input_ = self.input_[:max_length]
+ elif self.mode == self.mode_login and key == '\n':
+ self.login_name = self.input_
+ self.send('LOGIN ' + quote(self.input_))
+ self.input_ = ""
+ elif self.mode == self.mode_chat and key == '\n':
+ if self.input_[0] == '/':
+ if self.input_ in {'/P', '/play'}:
+ self.switch_mode('play')
+ elif self.input_ in {'/?', '/study'}:
+ self.switch_mode('study')
+ elif self.input_ == '/help':
+ self.help()
+ elif self.input_ == '/reconnect':
+ reconnect()
+ elif self.input_.startswith('/nick'):
+ tokens = self.input_.split(maxsplit=1)
+ if len(tokens) == 2:
+ self.send('LOGIN ' + quote(tokens[1]))
+ else:
+ self.log_msg('? need login name')
+ elif self.input_.startswith('/msg'):
+ tokens = self.input_.split(maxsplit=2)
+ if len(tokens) == 3:
+ self.send('QUERY %s %s' % (quote(tokens[1]),
+ quote(tokens[2])))
+ else:
+ self.log_msg('? need message target and message')
+ else:
+ self.log_msg('? unknown command')
+ else:
+ self.send('ALL ' + quote(self.input_))
+ self.input_ = ""
+ elif self.mode == self.mode_annotate and key == '\n':
+ if self.input_ == '':
+ self.input_ = ' '
+ self.send('ANNOTATE %s %s' % (self.explorer, quote(self.input_)))
+ self.input_ = ""
+ self.switch_mode('study', keep_position=True)
+ elif self.mode == self.mode_portal and key == '\n':
+ if self.input_ == '':
+ self.input_ = ' '
+ self.send('PORTAL %s %s' % (self.explorer, quote(self.input_)))
+ self.input_ = ""
+ self.switch_mode('study', keep_position=True)
+ elif self.mode == self.mode_teleport and key == '\n':
+ if self.input_ == 'YES!':
+ self.host = self.teleport_target_host
+ reconnect()
+ else:
+ self.log_msg('@ teleport aborted')
+ self.switch_mode('play')
+ self.input_ = ''
+ elif self.mode == self.mode_study:
+ if key == self.keys['switch_to_chat']:
+ self.switch_mode('chat')
+ elif key == self.keys['switch_to_play']:
+ self.switch_mode('play')
+ elif key == self.keys['switch_to_annotate']:
+ self.switch_mode('annotate', keep_position=True)
+ elif key == self.keys['switch_to_portal']:
+ self.switch_mode('portal', keep_position=True)
+ elif key in self.movement_keys:
+ move_explorer(self.movement_keys[key])
+ elif self.mode == self.mode_play:
+ if key == self.keys['switch_to_chat']:
+ self.switch_mode('chat')
+ elif key == self.keys['switch_to_study']:
+ self.switch_mode('study')
+ if key == self.keys['switch_to_edit']:
+ self.switch_mode('edit')
+ elif key == self.keys['flatten']:
+ self.send('TASK:FLATTEN_SURROUNDINGS')
+ elif key in self.movement_keys:
+ self.send('TASK:MOVE ' + self.movement_keys[key])
+ elif self.mode == self.mode_edit:
+ self.send('TASK:WRITE ' + key)
+ self.switch_mode('play')
+
+TUI('127.0.0.1:5000')