#!/usr/bin/python3 # This file is part of PlomRogue. PlomRogue is licensed under the GPL version 3 # or any later version. For details on its copyright, license, and warranties, # see the file NOTICE in the root directory of the PlomRogue source package. import argparse import errno import os import shlex import shutil import time import ctypes import math class RandomnessIO: """"Interface to libplomrogue's pseudo-randomness generator.""" def set_seed(self, seed): libpr.seed_rrand(1, seed) def get_seed(self): return libpr.seed_rrand(0, 0) def next(self): return libpr.rrand() seed = property(get_seed, set_seed) def prep_library(): """Prepare ctypes library at ./libplomrogue.so""" libpath = ("./libplomrogue.so") if not os.access(libpath, os.F_OK): raise SystemExit("No library " + libpath + ", run ./redo first?") libpr = ctypes.cdll.LoadLibrary(libpath) libpr.seed_rrand.restype = ctypes.c_uint32 return libpr def c_pointer_to_bytearray(ba): """Return C char * pointer to ba.""" type = ctypes.c_char * len(ba) return type.from_buffer(ba) def strong_write(file, string): """Apply write(string), then flush().""" file.write(string) file.flush() def setup_server_io(): """Fill IO files DB with proper file( path)s. Write process IO test string. Ensure IO files directory at server/. Remove any old input file if found. Set up new input file for reading, and new output file for appending. Start output file with process hash line of format PID + " " + floated UNIX time (io_db["teststring"]). Raise SystemExit if file is found at path of either record or save file plus io_db["tmp_suffix"]. """ def detect_atomic_leftover(path, tmp_suffix): path_tmp = path + tmp_suffix msg = "Found file '" + path_tmp + "' that may be a leftover from an " \ "aborted previous attempt to write '" + path + "'. Aborting " \ "until matter is resolved by removing it from its current path." if os.access(path_tmp, os.F_OK): raise SystemExit(msg) io_db["teststring"] = str(os.getpid()) + " " + str(time.time()) io_db["save_wait"] = 0 io_db["verbose"] = False io_db["record_chunk"] = "" os.makedirs(io_db["path_server"], exist_ok=True) io_db["file_out"] = open(io_db["path_out"], "a") strong_write(io_db["file_out"], io_db["teststring"] + "\n") if os.access(io_db["path_in"], os.F_OK): os.remove(io_db["path_in"]) io_db["file_in"] = open(io_db["path_in"], "w") io_db["file_in"].close() io_db["file_in"] = open(io_db["path_in"], "r") detect_atomic_leftover(io_db["path_save"], io_db["tmp_suffix"]) detect_atomic_leftover(io_db["path_record"], io_db["tmp_suffix"]) def cleanup_server_io(): """Close and (if io_db["kicked_by_rival"] false) remove files in io_db.""" def helper(file_key, path_key): if file_key in io_db: io_db[file_key].close() if not io_db["kicked_by_rival"] \ and os.access(io_db[path_key], os.F_OK): os.remove(io_db[path_key]) helper("file_in", "path_in") helper("file_out", "path_out") helper("file_worldstate", "path_worldstate") if "file_record" in io_db: io_db["file_record"].close() def log(msg): """Send "msg" to log.""" strong_write(io_db["file_out"], "LOG " + msg + "\n") def obey(command, prefix, replay=False, do_record=False): """Call function from commands_db mapped to command's first token. Tokenize command string with shlex.split(comments=True). If replay is set, a non-meta command from the commands_db merely triggers obey() on the next command from the records file. If not, non-meta commands set io_db["worldstate_updateable"] to world_db["WORLD_ACTIVE"], and, if do_record is set, are recorded to io_db["record_chunk"], and save_world() is called (and io_db["record_chunk"] written) if 15 seconds have passed since the last time it was called. The prefix string is inserted into the server's input message between its beginning 'input ' and ':'. All activity is preceded by a server_test() call. Commands that start with a lowercase letter are ignored when world_db["WORLD_ACTIVE"] is False/0. """ server_test() if io_db["verbose"]: print("input " + prefix + ": " + command) try: tokens = shlex.split(command, comments=True) except ValueError as err: print("Can't tokenize command string: " + str(err) + ".") return if len(tokens) > 0 and tokens[0] in commands_db \ and len(tokens) == commands_db[tokens[0]][0] + 1: if commands_db[tokens[0]][1]: commands_db[tokens[0]][2](*tokens[1:]) elif tokens[0][0].islower() and not world_db["WORLD_ACTIVE"]: print("Ignoring lowercase-starting commands when world inactive.") elif replay: print("Due to replay mode, reading command as 'go on in record'.") line = io_db["file_record"].readline() if len(line) > 0: obey(line.rstrip(), io_db["file_record"].prefix + str(io_db["file_record"].line_n)) io_db["file_record"].line_n = io_db["file_record"].line_n + 1 else: print("Reached end of record file.") else: commands_db[tokens[0]][2](*tokens[1:]) if do_record: io_db["record_chunk"] += command + "\n" if time.time() > io_db["save_wait"] + 15: atomic_write(io_db["path_record"], io_db["record_chunk"], do_append=True) if world_db["WORLD_ACTIVE"]: save_world() io_db["record_chunk"] = "" io_db["save_wait"] = time.time() io_db["worldstate_updateable"] = world_db["WORLD_ACTIVE"] elif 0 != len(tokens): print("Invalid command/argument, or bad number of tokens.") def atomic_write(path, text, do_append=False, delete=True): """Atomic write of text to file at path, appended if do_append is set.""" path_tmp = path + io_db["tmp_suffix"] mode = "w" if do_append: mode = "a" if os.access(path, os.F_OK): shutil.copyfile(path, path_tmp) file = open(path_tmp, mode) strong_write(file, text) file.close() if delete and os.access(path, os.F_OK): os.remove(path) os.rename(path_tmp, path) def save_world(): """Save all commands needed to reconstruct current world state.""" def quote(string): string = string.replace("\u005C", '\u005C\u005C') return '"' + string.replace('"', '\u005C"') + '"' def mapsetter(key): def helper(id=None): string = "" if key == "MAP" or world_db["Things"][id][key]: map = world_db["MAP"] if key == "MAP" \ else world_db["Things"][id][key] length = world_db["MAP_LENGTH"] for i in range(length): line = map[i * length:(i * length) + length].decode() string = string + key + " " + str(i) + " " + quote(line) \ + "\n" return string return helper def memthing(id): string = "" for memthing in world_db["Things"][id]["T_MEMTHING"]: string = string + "T_MEMTHING " + str(memthing[0]) + " " + \ str(memthing[1]) + " " + str(memthing[2]) + "\n" return string def helper(category, id_string, special_keys={}): string = "" for id in sorted(world_db[category].keys()): string = string + id_string + " " + str(id) + "\n" for key in sorted(world_db[category][id].keys()): if not key in special_keys: x = world_db[category][id][key] argument = quote(x) if str == type(x) else str(x) string = string + key + " " + argument + "\n" elif special_keys[key]: string = string + special_keys[key](id) return string string = "" for key in sorted(world_db.keys()): if (not isinstance(world_db[key], dict)) and key != "MAP" and \ key != "WORLD_ACTIVE": string = string + key + " " + str(world_db[key]) + "\n" string = string + mapsetter("MAP")() string = string + helper("ThingActions", "TA_ID") string = string + helper("ThingTypes", "TT_ID", {"TT_CORPSE_ID": False}) for id in sorted(world_db["ThingTypes"].keys()): string = string + "TT_ID " + str(id) + "\n" + "TT_CORPSE_ID " + \ str(world_db["ThingTypes"][id]["TT_CORPSE_ID"]) + "\n" string = string + helper("Things", "T_ID", {"T_CARRIES": False, "carried": False, "T_MEMMAP": mapsetter("T_MEMMAP"), "T_MEMTHING": memthing, "fovmap": False, "T_MEMDEPTHMAP": mapsetter("T_MEMDEPTHMAP")}) for id in sorted(world_db["Things"].keys()): if [] != world_db["Things"][id]["T_CARRIES"]: string = string + "T_ID " + str(id) + "\n" for carried in sorted(world_db["Things"][id]["T_CARRIES"]): string = string + "T_CARRIES " + str(carried) + "\n" string = string + "SEED_RANDOMNESS " + str(rand.seed) + "\n" + \ "WORLD_ACTIVE " + str(world_db["WORLD_ACTIVE"]) atomic_write(io_db["path_save"], string) def obey_lines_in_file(path, name, do_record=False): """Call obey() on each line of path's file, use name in input prefix.""" file = open(path, "r") line_n = 1 for line in file.readlines(): obey(line.rstrip(), name + "file line " + str(line_n), do_record=do_record) line_n = line_n + 1 file.close() def parse_command_line_arguments(): """Return settings values read from command line arguments.""" parser = argparse.ArgumentParser() parser.add_argument('-s', nargs='?', type=int, dest='replay', const=1, action='store') parser.add_argument('-l', nargs="?", const="save", dest='savefile', action="store") parser.add_argument('-v', dest='verbose', action='store_true') opts, unknown = parser.parse_known_args() return opts def server_test(): """Ensure valid server out file belonging to current process. This is done by comparing io_db["teststring"] to what's found at the start of the current file at io_db["path_out"]. On failure, set io_db["kicked_by_rival"] and raise SystemExit. """ if not os.access(io_db["path_out"], os.F_OK): raise SystemExit("Server output file has disappeared.") file = open(io_db["path_out"], "r") test = file.readline().rstrip("\n") file.close() if test != io_db["teststring"]: io_db["kicked_by_rival"] = True msg = "Server test string in server output file does not match. This" \ " indicates that the current server process has been " \ "superseded by another one." raise SystemExit(msg) def read_command(): """Return next newline-delimited command from server in file. Keep building return string until a newline is encountered. Pause between unsuccessful reads, and after too much waiting, run server_test(). """ wait_on_fail = 0.03333 max_wait = 5 now = time.time() command = "" while True: add = io_db["file_in"].readline() if len(add) > 0: command = command + add if len(command) > 0 and "\n" == command[-1]: command = command[:-1] break else: time.sleep(wait_on_fail) if now + max_wait < time.time(): server_test() now = time.time() return command def try_worldstate_update(): """Write worldstate file if io_db["worldstate_updateable"] is set.""" if io_db["worldstate_updateable"]: def write_map(string, map): for i in range(length): line = map[i * length:(i * length) + length].decode() string = string + line + "\n" return string inventory = "" if [] == world_db["Things"][0]["T_CARRIES"]: inventory = "(none)\n" else: for id in world_db["Things"][0]["T_CARRIES"]: type_id = world_db["Things"][id]["T_TYPE"] name = world_db["ThingTypes"][type_id]["TT_NAME"] inventory = inventory + name + "\n" string = str(world_db["TURN"]) + "\n" + \ str(world_db["Things"][0]["T_LIFEPOINTS"]) + "\n" + \ str(world_db["Things"][0]["T_SATIATION"]) + "\n" + \ inventory + "%\n" + \ str(world_db["Things"][0]["T_POSY"]) + "\n" + \ str(world_db["Things"][0]["T_POSX"]) + "\n" + \ str(world_db["MAP_LENGTH"]) + "\n" length = world_db["MAP_LENGTH"] fov = bytearray(b' ' * (length ** 2)) ord_v = ord("v") for pos in [pos for pos in range(length ** 2) if ord_v == world_db["Things"][0]["fovmap"][pos]]: fov[pos] = world_db["MAP"][pos] length = world_db["MAP_LENGTH"] for id in [id for tid in reversed(sorted(list(world_db["ThingTypes"]))) for id in world_db["Things"] if not world_db["Things"][id]["carried"] if world_db["Things"][id]["T_TYPE"] == tid if world_db["Things"][0]["fovmap"][ world_db["Things"][id]["T_POSY"] * length + world_db["Things"][id]["T_POSX"]] == ord_v]: type = world_db["Things"][id]["T_TYPE"] c = ord(world_db["ThingTypes"][type]["TT_SYMBOL"]) fov[world_db["Things"][id]["T_POSY"] * length + world_db["Things"][id]["T_POSX"]] = c string = write_map(string, fov) mem = world_db["Things"][0]["T_MEMMAP"][:] for mt in [mt for tid in reversed(sorted(list(world_db["ThingTypes"]))) for mt in world_db["Things"][0]["T_MEMTHING"] if mt[0] == tid]: c = world_db["ThingTypes"][mt[0]]["TT_SYMBOL"] mem[(mt[1] * length) + mt[2]] = ord(c) string = write_map(string, mem) atomic_write(io_db["path_worldstate"], string, delete=False) strong_write(io_db["file_out"], "WORLD_UPDATED\n") io_db["worldstate_updateable"] = False def replay_game(): """Replay game from record file. Use opts.replay as breakpoint turn to which to replay automatically before switching to manual input by non-meta commands in server input file triggering further reads of record file. Ensure opts.replay is at least 1. Run try_worldstate_update() before each interactive obey()/read_command(). """ if opts.replay < 1: opts.replay = 1 print("Replay mode. Auto-replaying up to turn " + str(opts.replay) + " (if so late a turn is to be found).") if not os.access(io_db["path_record"], os.F_OK): raise SystemExit("No record file found to replay.") io_db["file_record"] = open(io_db["path_record"], "r") io_db["file_record"].prefix = "record file line " io_db["file_record"].line_n = 1 while world_db["TURN"] < opts.replay: line = io_db["file_record"].readline() if "" == line: break obey(line.rstrip(), io_db["file_record"].prefix + str(io_db["file_record"].line_n)) io_db["file_record"].line_n = io_db["file_record"].line_n + 1 while True: try_worldstate_update() obey(read_command(), "in file", replay=True) def play_game(): """Play game by server input file commands. Before, load save file found. If no save file is found, a new world is generated from the commands in the world config plus a 'MAKE WORLD [current Unix timestamp]'. Record this command and all that follow via the server input file. Run try_worldstate_update() before each interactive obey()/read_command(). """ if os.access(io_db["path_save"], os.F_OK): obey_lines_in_file(io_db["path_save"], "save") else: if not os.access(io_db["path_worldconf"], os.F_OK): msg = "No world config file from which to start a new world." raise SystemExit(msg) obey_lines_in_file(io_db["path_worldconf"], "world config ", do_record=True) obey("MAKE_WORLD " + str(int(time.time())), "in file", do_record=True) while True: try_worldstate_update() obey(read_command(), "in file", do_record=True) def make_map(): """(Re-)make island map. Let "~" represent water, "." land, "X" trees: Build island shape randomly, start with one land cell in the middle, then go into cycle of repeatedly selecting a random sea cell and transforming it into land if it is neighbor to land. The cycle ends when a land cell is due to be created at the map's border. Then put some trees on the map (TODO: more precise algorithm desc). """ def is_neighbor(coordinates, type): y = coordinates[0] x = coordinates[1] length = world_db["MAP_LENGTH"] ind = y % 2 diag_west = x + (ind > 0) diag_east = x + (ind < (length - 1)) pos = (y * length) + x if (y > 0 and diag_east and type == chr(world_db["MAP"][pos - length + ind])) \ or (x < (length - 1) and type == chr(world_db["MAP"][pos + 1])) \ or (y < (length - 1) and diag_east and type == chr(world_db["MAP"][pos + length + ind])) \ or (y > 0 and diag_west and type == chr(world_db["MAP"][pos - length - (not ind)])) \ or (x > 0 and type == chr(world_db["MAP"][pos - 1])) \ or (y < (length - 1) and diag_west and type == chr(world_db["MAP"][pos + length - (not ind)])): return True return False world_db["MAP"] = bytearray(b'~' * (world_db["MAP_LENGTH"] ** 2)) length = world_db["MAP_LENGTH"] add_half_width = (not (length % 2)) * int(length / 2) world_db["MAP"][int((length ** 2) / 2) + add_half_width] = ord(".") while (1): y = rand.next() % length x = rand.next() % length pos = (y * length) + x if "~" == chr(world_db["MAP"][pos]) and is_neighbor((y, x), "."): if y == 0 or y == (length - 1) or x == 0 or x == (length - 1): break world_db["MAP"][pos] = ord(".") n_trees = int((length ** 2) / 16) i_trees = 0 while (i_trees <= n_trees): single_allowed = rand.next() % 32 y = rand.next() % length x = rand.next() % length pos = (y * length) + x if "." == chr(world_db["MAP"][pos]) \ and ((not single_allowed) or is_neighbor((y, x), "X")): world_db["MAP"][pos] = ord("X") i_trees += 1 # This all-too-precise replica of the original C code misses iter_limit(). def eat_vs_hunger_threshold(thingtype): """Return satiation cost of eating for type. Good food for it must be >.""" hunger_unit = hunger_per_turn(thingtype) actiontype = [id for id in world_db["ThingActions"] if world_db["ThingActions"][id]["TA_NAME"] == "use"][0] return world_db["ThingActions"][actiontype]["TA_EFFORT"] * hunger_unit def update_map_memory(t, age_map=True): """Update t's T_MEMMAP with what's in its FOV now,age its T_MEMMEPTHMAP.""" def age_some_memdepthmap_on_nonfov_cells(): # OUTSOURCED FOR PERFORMANCE REASONS TO libplomrogue.so: # ord_v = ord("v") # ord_0 = ord("0") # ord_9 = ord("9") # for pos in [pos for pos in range(world_db["MAP_LENGTH"] ** 2) # if not ord_v == t["fovmap"][pos] # if ord_0 <= t["T_MEMDEPTHMAP"][pos] # if ord_9 > t["T_MEMDEPTHMAP"][pos] # if not rand.next() % (2 ** # (t["T_MEMDEPTHMAP"][pos] - 48))]: # t["T_MEMDEPTHMAP"][pos] += 1 memdepthmap = c_pointer_to_bytearray(t["T_MEMDEPTHMAP"]) fovmap = c_pointer_to_bytearray(t["fovmap"]) libpr.age_some_memdepthmap_on_nonfov_cells(memdepthmap, fovmap) if not t["T_MEMMAP"]: t["T_MEMMAP"] = bytearray(b' ' * (world_db["MAP_LENGTH"] ** 2)) if not t["T_MEMDEPTHMAP"]: t["T_MEMDEPTHMAP"] = bytearray(b' ' * (world_db["MAP_LENGTH"] ** 2)) ord_v = ord("v") ord_0 = ord("0") for pos in [pos for pos in range(world_db["MAP_LENGTH"] ** 2) if ord_v == t["fovmap"][pos]]: t["T_MEMDEPTHMAP"][pos] = ord_0 t["T_MEMMAP"][pos] = world_db["MAP"][pos] if age_map: age_some_memdepthmap_on_nonfov_cells() t["T_MEMTHING"] = [mt for mt in t["T_MEMTHING"] if ord_v != t["fovmap"][(mt[1] * world_db["MAP_LENGTH"]) + mt[2]]] for id in [id for id in world_db["Things"] if not world_db["Things"][id]["carried"]]: type = world_db["Things"][id]["T_TYPE"] if not world_db["ThingTypes"][type]["TT_LIFEPOINTS"]: y = world_db["Things"][id]["T_POSY"] x = world_db["Things"][id]["T_POSX"] if ord_v == t["fovmap"][(y * world_db["MAP_LENGTH"]) + x]: t["T_MEMTHING"].append((type, y, x)) def set_world_inactive(): """Set world_db["WORLD_ACTIVE"] to 0 and remove worldstate file.""" server_test() if os.access(io_db["path_worldstate"], os.F_OK): os.remove(io_db["path_worldstate"]) world_db["WORLD_ACTIVE"] = 0 def integer_test(val_string, min, max=None): """Return val_string if integer >= min & (if max set) <= max, else None.""" try: val = int(val_string) if val < min or (max is not None and val > max): raise ValueError return val except ValueError: msg = "Ignoring: Please use integer >= " + str(min) if max is not None: msg += " and <= " + str(max) msg += "." print(msg) return None def setter(category, key, min, max=None): """Build setter for world_db([category + "s"][id])[key] to >=min/<=max.""" if category is None: def f(val_string): val = integer_test(val_string, min, max) if None != val: world_db[key] = val else: if category == "Thing": id_store = command_tid decorator = test_Thing_id elif category == "ThingType": id_store = command_ttid decorator = test_ThingType_id elif category == "ThingAction": id_store = command_taid decorator = test_ThingAction_id @decorator def f(val_string): val = integer_test(val_string, min, max) if None != val: world_db[category + "s"][id_store.id][key] = val return f def build_fov_map(t): """Build Thing's FOV map.""" t["fovmap"] = bytearray(b'v' * (world_db["MAP_LENGTH"] ** 2)) fovmap = c_pointer_to_bytearray(t["fovmap"]) map = c_pointer_to_bytearray(world_db["MAP"]) if libpr.build_fov_map(t["T_POSY"], t["T_POSX"], fovmap, map): raise RuntimeError("Malloc error in build_fov_Map().") def log_help(): """Send quick usage info to log.""" log("LOG See README file for help.") def decrement_lifepoints(t): """Decrement t's lifepoints by 1, and if to zero, corpse it. If t is the player avatar, only blank its fovmap, so that the client may still display memory data. On non-player things, erase fovmap and memory. Dying actors drop all their things. """ t["T_LIFEPOINTS"] -= 1 if 0 == t["T_LIFEPOINTS"]: for id in t["T_CARRIES"]: t["T_CARRIES"].remove(id) world_db["Things"][id]["T_POSY"] = t["T_POSY"] world_db["Things"][id]["T_POSX"] = t["T_POSX"] world_db["Things"][id]["carried"] = False t["T_TYPE"] = world_db["ThingTypes"][t["T_TYPE"]]["TT_CORPSE_ID"] if world_db["Things"][0] == t: t["fovmap"] = bytearray(b' ' * (world_db["MAP_LENGTH"] ** 2)) log("You die.") log("See README on how to start over.") else: t["fovmap"] = False t["T_MEMMAP"] = False t["T_MEMDEPTHMAP"] = False t["T_MEMTHING"] = [] def mv_yx_in_dir_legal(dir, y, x): """Wrapper around libpr.mv_yx_in_dir_legal to simplify its use.""" dir_c = dir.encode("ascii")[0] test = libpr.mv_yx_in_dir_legal_wrap(dir_c, y, x) if -1 == test: raise RuntimeError("Too much wrapping in mv_yx_in_dir_legal_wrap()!") return (test, libpr.result_y(), libpr.result_x()) def actor_wait(t): """Make t do nothing (but loudly, if player avatar).""" if t == world_db["Things"][0]: log("You wait") def actor_move(t): """If passable, move/collide(=attack) thing into T_ARGUMENT's direction.""" passable = False move_result = mv_yx_in_dir_legal(chr(t["T_ARGUMENT"]), t["T_POSY"], t["T_POSX"]) if 1 == move_result[0]: pos = (move_result[1] * world_db["MAP_LENGTH"]) + move_result[2] hitted = [id for id in world_db["Things"] if world_db["Things"][id] != t if world_db["Things"][id]["T_LIFEPOINTS"] if world_db["Things"][id]["T_POSY"] == move_result[1] if world_db["Things"][id]["T_POSX"] == move_result[2]] if len(hitted): hit_id = hitted[0] if t == world_db["Things"][0]: hitted_type = world_db["Things"][hit_id]["T_TYPE"] hitted_name = world_db["ThingTypes"][hitted_type]["TT_NAME"] log("You WOUND" + hitted_name + ".") elif 0 == hit_id: hitter_name = world_db["ThingTypes"][t["T_TYPE"]]["TT_NAME"] log(hitter_name +" WOUNDS you.") decrement_lifepoints(world_db["Things"][hit_id]) return passable = "." == chr(world_db["MAP"][pos]) dir = [dir for dir in directions_db if directions_db[dir] == chr(t["T_ARGUMENT"])][0] if passable: t["T_POSY"] = move_result[1] t["T_POSX"] = move_result[2] for id in t["T_CARRIES"]: world_db["Things"][id]["T_POSY"] = move_result[1] world_db["Things"][id]["T_POSX"] = move_result[2] build_fov_map(t) if t == world_db["Things"][0]: log("You MOVE " + dir + ".") def actor_pick_up(t): """Make t pick up (topmost?) Thing from ground into inventory. Define topmostness by how low the thing's type ID is. """ ids = [id for id in world_db["Things"] if world_db["Things"][id] != t if not world_db["Things"][id]["carried"] if world_db["Things"][id]["T_POSY"] == t["T_POSY"] if world_db["Things"][id]["T_POSX"] == t["T_POSX"]] if len(ids): lowest_tid = -1 for iid in ids: tid = world_db["Things"][iid]["T_TYPE"] if lowest_tid == -1 or tid < lowest_tid: id = iid lowest_tid = tid world_db["Things"][id]["carried"] = True t["T_CARRIES"].append(id) if t == world_db["Things"][0]: log("You PICK UP an object.") def actor_drop(t): """Make t rop Thing from inventory to ground indexed by T_ARGUMENT.""" # TODO: Handle case where T_ARGUMENT matches nothing. if len(t["T_CARRIES"]): id = t["T_CARRIES"][t["T_ARGUMENT"]] t["T_CARRIES"].remove(id) world_db["Things"][id]["carried"] = False if t == world_db["Things"][0]: log("You DROP an object.") def actor_use(t): """Make t use (for now: consume) T_ARGUMENT-indexed Thing in inventory.""" # TODO: Handle case where T_ARGUMENT matches nothing. if len(t["T_CARRIES"]): id = t["T_CARRIES"][t["T_ARGUMENT"]] type = world_db["Things"][id]["T_TYPE"] if world_db["ThingTypes"][type]["TT_TOOL"] == "food": t["T_CARRIES"].remove(id) del world_db["Things"][id] t["T_SATIATION"] += world_db["ThingTypes"][type]["TT_TOOLPOWER"] if t == world_db["Things"][0]: log("You CONSUME this object.") elif t == world_db["Things"][0]: log("You try to use this object, but FAIL.") def thingproliferation(t, prol_map): """To chance of 1/TT_PROLIFERATE, create t offspring in open neighbor cell. Naturally only works with TT_PROLIFERATE > 0. The neighbor cell must be be marked '.' in prol_map. If there are several map cell candidates, one is selected randomly. """ prolscore = world_db["ThingTypes"][t["T_TYPE"]]["TT_PROLIFERATE"] if prolscore and (1 == prolscore or 1 == (rand.next() % prolscore)): candidates = [] for dir in [directions_db[key] for key in sorted(directions_db.keys())]: mv_result = mv_yx_in_dir_legal(dir, t["T_POSY"], t["T_POSX"]) if mv_result[0] and ord('.') == prol_map[mv_result[1] * world_db["MAP_LENGTH"] + mv_result[2]]: candidates.append((mv_result[1], mv_result[2])) if len(candidates): i = rand.next() % len(candidates) id = id_setter(-1, "Things") newT = new_Thing(t["T_TYPE"], (candidates[i][0], candidates[i][1])) world_db["Things"][id] = newT def try_healing(t): """If t's HP < max, increment them if well-nourished, maybe waiting.""" if t["T_LIFEPOINTS"] < \ world_db["ThingTypes"][t["T_TYPE"]]["TT_LIFEPOINTS"]: wait_id = [id for id in world_db["ThingActions"] if world_db["ThingActions"][id]["TA_NAME"] == "wait"][0] wait_divider = 8 if t["T_COMMAND"] == wait_id else 1 testval = int(abs(t["T_SATIATION"]) / wait_divider) if (testval <= 1 or 1 == (rand.next() % testval)): t["T_LIFEPOINTS"] += 1 if t == world_db["Things"][0]: log("You HEAL.") def hunger_per_turn(type_id): """The amount of satiation score lost per turn for things of given type.""" return int(math.sqrt(world_db["ThingTypes"][type_id]["TT_LIFEPOINTS"])) def hunger(t): """Decrement t's satiation,dependent on it trigger lifepoint dec chance.""" if t["T_SATIATION"] > -32768: t["T_SATIATION"] -= hunger_per_turn(t["T_TYPE"]) if 0 != t["T_SATIATION"] and 0 == int(rand.next() / abs(t["T_SATIATION"])): if t == world_db["Things"][0]: if t["T_SATIATION"] < 0: log("You SUFFER from hunger.") else: log("You SUFFER from over-eating.") decrement_lifepoints(t) def get_dir_to_target(t, filter): """Try to set T_COMMAND/T_ARGUMENT for move to "filter"-determined target. The path-wise nearest target is chosen, via the shortest available path. Target must not be t. On succcess, return positive value, else False. Filters: "a": Thing in FOV is animate, but of ThingType, starts out weaker than t is, and its corpse would be healthy food for t "f": move away from an enemy – any visible actor whose thing type has more TT_LIFEPOINTS than t LIFEPOINTS, and might find t's corpse healthy food – if it is closer than n steps, where n will shrink as t's hunger grows; if enemy is too close, move towards (attack) the enemy instead; if no fleeing is possible, nor attacking useful, wait; don't tread on non-enemies for fleeing "c": Thing in memorized map is consumable of sufficient nutrition for t "s": memory map cell with greatest-reachable degree of unexploredness """ def zero_score_map_where_char_on_memdepthmap(c): # OUTSOURCED FOR PERFORMANCE REASONS TO libplomrogue.so: # for i in [i for i in range(world_db["MAP_LENGTH"] ** 2) # if t["T_MEMDEPTHMAP"][i] == mem_depth_c[0]]: # set_map_score(i, 0) map = c_pointer_to_bytearray(t["T_MEMDEPTHMAP"]) if libpr.zero_score_map_where_char_on_memdepthmap(c, map): raise RuntimeError("No score map allocated for " "zero_score_map_where_char_on_memdepthmap().") def set_map_score_at_thingpos(id, score): pos = world_db["Things"][id]["T_POSY"] * world_db["MAP_LENGTH"] \ + world_db["Things"][id]["T_POSX"] set_map_score(pos, score) def set_map_score(pos, score): test = libpr.set_map_score(pos, score) if test: raise RuntimeError("No score map allocated for set_map_score().") def get_map_score(pos): result = libpr.get_map_score(pos) if result < 0: raise RuntimeError("No score map allocated for get_map_score().") return result def animate_in_fov(Thing): if Thing["carried"] or Thing == t or not Thing["T_LIFEPOINTS"]: return False pos = Thing["T_POSY"] * world_db["MAP_LENGTH"] + Thing["T_POSX"] if ord("v") == t["fovmap"][pos]: return True def good_attack_target(v): eat_cost = eat_vs_hunger_threshold(t["T_TYPE"]) type = world_db["ThingTypes"][v["T_TYPE"]] type_corpse = world_db["ThingTypes"][type["TT_CORPSE_ID"]] if t["T_LIFEPOINTS"] > type["TT_LIFEPOINTS"] \ and type_corpse["TT_TOOL"] == "food" \ and type_corpse["TT_TOOLPOWER"] > eat_cost: return True return False def good_flee_target(m): own_corpse_id = world_db["ThingTypes"][t["T_TYPE"]]["TT_CORPSE_ID"] corpse_type = world_db["ThingTypes"][own_corpse_id] targetness = 0 if corpse_type["TT_TOOL"] != "food" \ else corpse_type["TT_TOOLPOWER"] type = world_db["ThingTypes"][m["T_TYPE"]] if t["T_LIFEPOINTS"] < type["TT_LIFEPOINTS"] \ and targetness > eat_vs_hunger_threshold(m["T_TYPE"]): return True return False def seeing_thing(): if t["fovmap"] and "a" == filter: for id in world_db["Things"]: if animate_in_fov(world_db["Things"][id]): if good_attack_target(world_db["Things"][id]): return True elif t["fovmap"] and "f" == filter: for id in world_db["Things"]: if animate_in_fov(world_db["Things"][id]): if good_flee_target(world_db["Things"][id]): return True elif t["T_MEMMAP"] and "c" == filter: eat_cost = eat_vs_hunger_threshold(t["T_TYPE"]) for mt in t["T_MEMTHING"]: if ' ' != chr(t["T_MEMMAP"][(mt[1] * world_db["MAP_LENGTH"]) + mt[2]]) \ and world_db["ThingTypes"][mt[0]]["TT_TOOL"] == "food" \ and world_db["ThingTypes"][mt[0]]["TT_TOOLPOWER"] \ > eat_cost: return True return False def set_cells_passable_on_memmap_to_65534_on_scoremap(): # OUTSOURCED FOR PERFORMANCE REASONS TO libplomrogue.so: # ord_dot = ord(".") # memmap = t["T_MEMMAP"] # for i in [i for i in range(world_db["MAP_LENGTH"] ** 2) # if ord_dot == memmap[i]]: # set_map_score(i, 65534) # i.e. 65535-1 map = c_pointer_to_bytearray(t["T_MEMMAP"]) if libpr.set_cells_passable_on_memmap_to_65534_on_scoremap(map): raise RuntimeError("No score map allocated for set_cells_passable" "_on_memmap_to_65534_on_scoremap().") def init_score_map(): test = libpr.init_score_map() if test: raise RuntimeError("Malloc error in init_score_map().") ord_v = ord("v") ord_blank = ord(" ") set_cells_passable_on_memmap_to_65534_on_scoremap() if "a" == filter: for id in world_db["Things"]: if animate_in_fov(world_db["Things"][id]) \ and good_attack_target(world_db["Things"][id]): set_map_score_at_thingpos(id, 0) elif "f" == filter: for id in world_db["Things"]: if animate_in_fov(world_db["Things"][id]) \ and good_flee_target(world_db["Things"][id]): set_map_score_at_thingpos(id, 0) elif "c" == filter: eat_cost = eat_vs_hunger_threshold(t["T_TYPE"]) for mt in [mt for mt in t["T_MEMTHING"] if ord_blank != t["T_MEMMAP"][mt[1] * world_db["MAP_LENGTH"] + mt[2]] if world_db["ThingTypes"][mt[0]]["TT_TOOL"] == "food" if world_db["ThingTypes"][mt[0]]["TT_TOOLPOWER"] > eat_cost]: set_map_score(mt[1] * world_db["MAP_LENGTH"] + mt[2], 0) elif "s" == filter: zero_score_map_where_char_on_memdepthmap(mem_depth_c[0]) if "a" != filter: for id in world_db["Things"]: if animate_in_fov(world_db["Things"][id]): if "f" == filter: pos = world_db["Things"][id]["T_POSY"] \ * world_db["MAP_LENGTH"] \ + world_db["Things"][id]["T_POSX"] if 0 == get_map_score(pos): continue set_map_score_at_thingpos(id, 65535) def rand_target_dir(neighbors, cmp, dirs): candidates = [] n_candidates = 0 for i in range(len(dirs)): if cmp == neighbors[i]: candidates.append(dirs[i]) n_candidates += 1 return candidates[rand.next() % n_candidates] if n_candidates else 0 def get_neighbor_scores(dirs, eye_pos): scores = [] if libpr.ready_neighbor_scores(eye_pos): raise RuntimeError("No score map allocated for " + "ready_neighbor_scores.()") for i in range(len(dirs)): scores.append(libpr.get_neighbor_score(i)) return scores def get_dir_from_neighbors(): dir_to_target = False dirs = "edcxsw" eye_pos = t["T_POSY"] * world_db["MAP_LENGTH"] + t["T_POSX"] neighbors = get_neighbor_scores(dirs, eye_pos) minmax_start = 0 if "f" == filter else 65535 - 1 minmax_neighbor = minmax_start for i in range(len(dirs)): if ("f" == filter and get_map_score(eye_pos) < neighbors[i] and minmax_neighbor < neighbors[i] and 65535 != neighbors[i]) \ or ("f" != filter and minmax_neighbor > neighbors[i]): minmax_neighbor = neighbors[i] if minmax_neighbor != minmax_start: dir_to_target = rand_target_dir(neighbors, minmax_neighbor, dirs) if "f" == filter: distance = get_map_score(eye_pos) fear_distance = world_db["MAP_LENGTH"] if t["T_SATIATION"] < 0 and math.sqrt(-t["T_SATIATION"]) > 0: fear_distance = fear_distance / math.sqrt(-t["T_SATIATION"]) attack_distance = 1 if not dir_to_target: if attack_distance >= distance: dir_to_target = rand_target_dir(neighbors, distance - 1, dirs) elif fear_distance >= distance: t["T_COMMAND"] = [id for id in world_db["ThingActions"] if world_db["ThingActions"][id]["TA_NAME"] == "wait"][0] return 1 elif dir_to_target and fear_distance < distance: dir_to_target = 0 return dir_to_target dir_to_target = False mem_depth_c = b' ' run_i = 9 + 1 if "s" == filter else 1 while run_i and not dir_to_target and ("s" == filter or seeing_thing()): run_i -= 1 init_score_map() mem_depth_c = b'9' if b' ' == mem_depth_c \ else bytes([mem_depth_c[0] - 1]) if libpr.dijkstra_map(): raise RuntimeError("No score map allocated for dijkstra_map().") dir_to_target = get_dir_from_neighbors() libpr.free_score_map() if dir_to_target and str == type(dir_to_target): t["T_COMMAND"] = [id for id in world_db["ThingActions"] if world_db["ThingActions"][id]["TA_NAME"] == "move"][0] t["T_ARGUMENT"] = ord(dir_to_target) return dir_to_target def standing_on_food(t): """Return True/False whether t is standing on healthy consumable.""" eat_cost = eat_vs_hunger_threshold(t["T_TYPE"]) for id in [id for id in world_db["Things"] if world_db["Things"][id] != t if not world_db["Things"][id]["carried"] if world_db["Things"][id]["T_POSY"] == t["T_POSY"] if world_db["Things"][id]["T_POSX"] == t["T_POSX"] if world_db["ThingTypes"][world_db["Things"][id]["T_TYPE"]] ["TT_TOOL"] == "food" if world_db["ThingTypes"][world_db["Things"][id]["T_TYPE"]] ["TT_TOOLPOWER"] > eat_cost]: return True return False def get_inventory_slot_to_consume(t): """Return invent. slot of healthiest consumable(if any healthy),else -1.""" cmp_food = -1 selection = -1 i = 0 eat_cost = eat_vs_hunger_threshold(t["T_TYPE"]) for id in t["T_CARRIES"]: type = world_db["Things"][id]["T_TYPE"] if world_db["ThingTypes"][type]["TT_TOOL"] == "food" \ and world_db["ThingTypes"][type]["TT_TOOLPOWER"]: nutvalue = world_db["ThingTypes"][type]["TT_TOOLPOWER"] tmp_cmp = abs(t["T_SATIATION"] + nutvalue - eat_cost) if (cmp_food < 0 and tmp_cmp < abs(t["T_SATIATION"])) \ or tmp_cmp < cmp_food: cmp_food = tmp_cmp selection = i i += 1 return selection def ai(t): """Determine next command/argment for actor t via AI algorithms.""" t["T_COMMAND"] = [id for id in world_db["ThingActions"] if world_db["ThingActions"][id]["TA_NAME"] == "wait"][0] if get_dir_to_target(t, "f"): return sel = get_inventory_slot_to_consume(t) if -1 != sel: t["T_COMMAND"] = [id for id in world_db["ThingActions"] if world_db["ThingActions"][id]["TA_NAME"] == "use"][0] t["T_ARGUMENT"] = sel elif standing_on_food(t): t["T_COMMAND"] = [id for id in world_db["ThingActions"] if world_db["ThingActions"][id]["TA_NAME"] == "pick_up"][0] else: going_to_known_food_spot = get_dir_to_target(t, "c") if not going_to_known_food_spot: aiming_for_walking_food = get_dir_to_target(t, "a") if not aiming_for_walking_food: get_dir_to_target(t, "s") def turn_over(): """Run game world and its inhabitants until new player input expected.""" id = 0 whilebreaker = False while world_db["Things"][0]["T_LIFEPOINTS"]: proliferable_map = world_db["MAP"][:] for id in [id for id in world_db["Things"] if not world_db["Things"][id]["carried"]]: y = world_db["Things"][id]["T_POSY"] x = world_db["Things"][id]["T_POSX"] proliferable_map[y * world_db["MAP_LENGTH"] + x] = ord('X') for id in [id for id in world_db["Things"]]: # Only what's from start! if not id in world_db["Things"] or \ world_db["Things"][id]["carried"]: # May have been consumed or continue # picked up during turn … Thing = world_db["Things"][id] if Thing["T_LIFEPOINTS"]: if not Thing["T_COMMAND"]: update_map_memory(Thing) if 0 == id: whilebreaker = True break ai(Thing) try_healing(Thing) hunger(Thing) if Thing["T_LIFEPOINTS"]: Thing["T_PROGRESS"] += 1 taid = [a for a in world_db["ThingActions"] if a == Thing["T_COMMAND"]][0] ThingAction = world_db["ThingActions"][taid] if Thing["T_PROGRESS"] == ThingAction["TA_EFFORT"]: eval("actor_" + ThingAction["TA_NAME"])(Thing) Thing["T_COMMAND"] = 0 Thing["T_PROGRESS"] = 0 thingproliferation(Thing, proliferable_map) if whilebreaker: break world_db["TURN"] += 1 def new_Thing(type, pos=(0, 0)): """Return Thing of type T_TYPE, with fovmap if alive and world active.""" thing = { "T_LIFEPOINTS": world_db["ThingTypes"][type]["TT_LIFEPOINTS"], "T_ARGUMENT": 0, "T_PROGRESS": 0, "T_SATIATION": 0, "T_COMMAND": 0, "T_TYPE": type, "T_POSY": pos[0], "T_POSX": pos[1], "T_CARRIES": [], "carried": False, "T_MEMTHING": [], "T_MEMMAP": False, "T_MEMDEPTHMAP": False, "fovmap": False } if world_db["WORLD_ACTIVE"] and thing["T_LIFEPOINTS"]: build_fov_map(thing) return thing def id_setter(id, category, id_store=False, start_at_1=False): """Set ID of object of category to manipulate ID unused? Create new one. The ID is stored as id_store.id (if id_store is set). If the integer of the input is valid (if start_at_1, >= 0, else >= -1), but <0 or (if start_at_1) <1, calculate new ID: lowest unused ID >=0 or (if start_at_1) >= 1. None is always returned when no new object is created, else the new object's ID. """ min = 0 if start_at_1 else -1 if str == type(id): id = integer_test(id, min) if None != id: if id in world_db[category]: if id_store: id_store.id = id return None else: if (start_at_1 and 0 == id) \ or ((not start_at_1) and (id < 0)): id = 0 if start_at_1 else -1 while 1: id = id + 1 if id not in world_db[category]: break if id_store: id_store.id = id return id def command_plugin(str_plugin): """Run code in plugins/[str_plugin].""" if (str_plugin.replace("_", "").isalnum() and os.access("plugins/" + str_plugin, os.F_OK)): exec(open("plugins/" + str_plugin).read()) return print("Bad plugin name:", str_plugin) def command_ping(): """Send PONG line to server output file.""" strong_write(io_db["file_out"], "PONG\n") def command_quit(): """Abort server process.""" if None == opts.replay: if world_db["WORLD_ACTIVE"]: save_world() atomic_write(io_db["path_record"], io_db["record_chunk"], do_append=True) raise SystemExit("received QUIT command") def command_thingshere(str_y, str_x): """Write to out file list of Things known to player at coordinate y, x.""" if world_db["WORLD_ACTIVE"]: y = integer_test(str_y, 0, 255) x = integer_test(str_x, 0, 255) length = world_db["MAP_LENGTH"] if None != y and None != x and y < length and x < length: pos = (y * world_db["MAP_LENGTH"]) + x strong_write(io_db["file_out"], "THINGS_HERE START\n") if "v" == chr(world_db["Things"][0]["fovmap"][pos]): for id in [id for tid in sorted(list(world_db["ThingTypes"])) for id in world_db["Things"] if not world_db["Things"][id]["carried"] if world_db["Things"][id]["T_TYPE"] == tid if y == world_db["Things"][id]["T_POSY"] if x == world_db["Things"][id]["T_POSX"]]: type = world_db["Things"][id]["T_TYPE"] name = world_db["ThingTypes"][type]["TT_NAME"] strong_write(io_db["file_out"], name + "\n") else: for mt in [mt for tid in sorted(list(world_db["ThingTypes"])) for mt in world_db["Things"][0]["T_MEMTHING"] if mt[0] == tid if y == mt[1] if x == mt[2]]: name = world_db["ThingTypes"][mt[0]]["TT_NAME"] strong_write(io_db["file_out"], name + "\n") strong_write(io_db["file_out"], "THINGS_HERE END\n") else: print("Ignoring: Invalid map coordinates.") else: print("Ignoring: Command only works on existing worlds.") def set_command(action): """Set player's T_COMMAND, then call turn_over().""" id = [x for x in world_db["ThingActions"] if world_db["ThingActions"][x]["TA_NAME"] == action][0] world_db["Things"][0]["T_COMMAND"] = id turn_over() def play_wait(): """Try "wait" as player's T_COMMAND.""" set_command("wait") def play_pickup(): """Try "pick_up" as player's T_COMMAND".""" t = world_db["Things"][0] ids = [id for id in world_db["Things"] if id if not world_db["Things"][id]["carried"] if world_db["Things"][id]["T_POSY"] == t["T_POSY"] if world_db["Things"][id]["T_POSX"] == t["T_POSX"]] if not len(ids): log("NOTHING to pick up.") else: set_command("pick_up") def play_drop(str_arg): """Try "drop" as player's T_COMMAND, int(str_arg) as T_ARGUMENT / slot.""" t = world_db["Things"][0] if 0 == len(t["T_CARRIES"]): log("You have NOTHING to drop in your inventory.") else: val = integer_test(str_arg, 0, 255) if None != val and val < len(t["T_CARRIES"]): world_db["Things"][0]["T_ARGUMENT"] = val set_command("drop") else: print("Illegal inventory index.") def play_use(str_arg): """Try "use" as player's T_COMMAND, int(str_arg) as T_ARGUMENT / slot.""" t = world_db["Things"][0] if 0 == len(t["T_CARRIES"]): log("You have NOTHING to use in your inventory.") else: val = integer_test(str_arg, 0, 255) if None != val and val < len(t["T_CARRIES"]): id = t["T_CARRIES"][val] type = world_db["Things"][id]["T_TYPE"] if not world_db["ThingTypes"][type]["TT_TOOL"] == "food": log("You CAN'T consume this thing.") return world_db["Things"][0]["T_ARGUMENT"] = val set_command("use") else: print("Illegal inventory index.") def play_move(str_arg): """Try "move" as player's T_COMMAND, str_arg as T_ARGUMENT / direction.""" t = world_db["Things"][0] if not str_arg in directions_db: print("Illegal move direction string.") return dir = ord(directions_db[str_arg]) move_result = mv_yx_in_dir_legal(chr(dir), t["T_POSY"], t["T_POSX"]) if 1 == move_result[0]: pos = (move_result[1] * world_db["MAP_LENGTH"]) + move_result[2] if ord(".") == world_db["MAP"][pos]: world_db["Things"][0]["T_ARGUMENT"] = dir set_command("move") return log("You CAN'T move there.") def command_seedrandomness(seed_string): """Set rand seed to int(seed_string).""" val = integer_test(seed_string, 0, 4294967295) if None != val: rand.seed = val def command_makeworld(seed_string): """(Re-)build game world, i.e. map, things, to a new turn 1 from seed. Seed rand with seed. Do more only with a "wait" ThingAction and world["PLAYER_TYPE"] matching ThingType of TT_START_NUMBER > 0. Then, world_db["Things"] emptied, call make_map() and set world_db["WORLD_ACTIVE"], world_db["TURN"] to 1. Build new Things according to ThingTypes' TT_START_NUMBERS, with Thing of ID 0 to ThingType of ID = world["PLAYER_TYPE"]. Place Things randomly, and actors not on each other. Init player's memory map. Write "NEW_WORLD" line to out file. Call log_help(). """ def free_pos(): i = 0 while 1: err = "Space to put thing on too hard to find. Map too small?" while 1: y = rand.next() % world_db["MAP_LENGTH"] x = rand.next() % world_db["MAP_LENGTH"] if "." == chr(world_db["MAP"][y * world_db["MAP_LENGTH"] + x]): break i += 1 if i == 65535: raise SystemExit(err) # Replica of C code, wrongly ignores animatedness of new Thing. pos_clear = (0 == len([id for id in world_db["Things"] if world_db["Things"][id]["T_LIFEPOINTS"] if world_db["Things"][id]["T_POSY"] == y if world_db["Things"][id]["T_POSX"] == x])) if pos_clear: break return (y, x) val = integer_test(seed_string, 0, 4294967295) if None == val: return rand.seed = val player_will_be_generated = False playertype = world_db["PLAYER_TYPE"] for ThingType in world_db["ThingTypes"]: if playertype == ThingType: if 0 < world_db["ThingTypes"][ThingType]["TT_START_NUMBER"]: player_will_be_generated = True break if not player_will_be_generated: print("Ignoring: No player type with start number >0 defined.") return wait_action = False for ThingAction in world_db["ThingActions"]: if "wait" == world_db["ThingActions"][ThingAction]["TA_NAME"]: wait_action = True if not wait_action: print("Ignoring beyond SEED_MAP: " + "No thing action with name 'wait' defined.") return world_db["Things"] = {} make_map() world_db["WORLD_ACTIVE"] = 1 world_db["TURN"] = 1 for i in range(world_db["ThingTypes"][playertype]["TT_START_NUMBER"]): id = id_setter(-1, "Things") world_db["Things"][id] = new_Thing(playertype, free_pos()) if not world_db["Things"][0]["fovmap"]: empty_fovmap = bytearray(b" " * world_db["MAP_LENGTH"] ** 2) world_db["Things"][0]["fovmap"] = empty_fovmap update_map_memory(world_db["Things"][0]) for type in world_db["ThingTypes"]: for i in range(world_db["ThingTypes"][type]["TT_START_NUMBER"]): if type != playertype: id = id_setter(-1, "Things") world_db["Things"][id] = new_Thing(type, free_pos()) strong_write(io_db["file_out"], "NEW_WORLD\n") log_help() def command_maplength(maplength_string): """Redefine map length. Invalidate map, therefore lose all things on it.""" val = integer_test(maplength_string, 1, 256) if None != val: world_db["MAP_LENGTH"] = val world_db["MAP"] = False set_world_inactive() world_db["Things"] = {} libpr.set_maplength(val) def command_worldactive(worldactive_string): """Toggle world_db["WORLD_ACTIVE"] if possible. An active world can always be set inactive. An inactive world can only be set active with a "wait" ThingAction, and a player Thing (of ID 0), and a map. On activation, rebuild all Things' FOVs, and the player's map memory. Also call log_help(). """ val = integer_test(worldactive_string, 0, 1) if None != val: if 0 != world_db["WORLD_ACTIVE"]: if 0 == val: set_world_inactive() else: print("World already active.") elif 0 == world_db["WORLD_ACTIVE"]: wait_exists = False for ThingAction in world_db["ThingActions"]: if "wait" == world_db["ThingActions"][ThingAction]["TA_NAME"]: wait_exists = True break player_exists = False for Thing in world_db["Things"]: if 0 == Thing: player_exists = True break if wait_exists and player_exists and world_db["MAP"]: for id in world_db["Things"]: if world_db["Things"][id]["T_LIFEPOINTS"]: build_fov_map(world_db["Things"][id]) if 0 == id: update_map_memory(world_db["Things"][id], False) if not world_db["Things"][0]["T_LIFEPOINTS"]: empty_fovmap = bytearray(b" " * world_db["MAP_LENGTH"] ** 2) world_db["Things"][0]["fovmap"] = empty_fovmap world_db["WORLD_ACTIVE"] = 1 log_help() else: print("Ignoring: Not all conditions for world activation met.") def test_for_id_maker(object, category): """Return decorator testing for object having "id" attribute.""" def decorator(f): def helper(*args): if hasattr(object, "id"): f(*args) else: print("Ignoring: No " + category + " defined to manipulate yet.") return helper return decorator def command_tid(id_string): """Set ID of Thing to manipulate. ID unused? Create new one. Default new Thing's type to the first available ThingType, others: zero. """ id = id_setter(id_string, "Things", command_tid) if None != id: if world_db["ThingTypes"] == {}: print("Ignoring: No ThingType to settle new Thing in.") return type = list(world_db["ThingTypes"].keys())[0] world_db["Things"][id] = new_Thing(type) test_Thing_id = test_for_id_maker(command_tid, "Thing") @test_Thing_id def command_tcommand(str_int): """Set T_COMMAND of selected Thing.""" val = integer_test(str_int, 0) if None != val: if 0 == val or val in world_db["ThingActions"]: world_db["Things"][command_tid.id]["T_COMMAND"] = val else: print("Ignoring: ThingAction ID belongs to no known ThingAction.") @test_Thing_id def command_ttype(str_int): """Set T_TYPE of selected Thing.""" val = integer_test(str_int, 0) if None != val: if val in world_db["ThingTypes"]: world_db["Things"][command_tid.id]["T_TYPE"] = val else: print("Ignoring: ThingType ID belongs to no known ThingType.") @test_Thing_id def command_tcarries(str_int): """Append int(str_int) to T_CARRIES of selected Thing. The ID int(str_int) must not be of the selected Thing, and must belong to a Thing with unset "carried" flag. Its "carried" flag will be set on owning. """ val = integer_test(str_int, 0) if None != val: if val == command_tid.id: print("Ignoring: Thing cannot carry itself.") elif val in world_db["Things"] \ and not world_db["Things"][val]["carried"]: world_db["Things"][command_tid.id]["T_CARRIES"].append(val) world_db["Things"][val]["carried"] = True else: print("Ignoring: Thing not available for carrying.") # Note that the whole carrying structure is different from the C version: # Carried-ness is marked by a "carried" flag, not by Things containing # Things internally. @test_Thing_id def command_tmemthing(str_t, str_y, str_x): """Add (int(str_t), int(str_y), int(str_x)) to selected Thing's T_MEMTHING. The type must fit to an existing ThingType, and the position into the map. """ type = integer_test(str_t, 0) posy = integer_test(str_y, 0, 255) posx = integer_test(str_x, 0, 255) if None != type and None != posy and None != posx: if type not in world_db["ThingTypes"] \ or posy >= world_db["MAP_LENGTH"] or posx >= world_db["MAP_LENGTH"]: print("Ignoring: Illegal value for thing type or position.") else: memthing = (type, posy, posx) world_db["Things"][command_tid.id]["T_MEMTHING"].append(memthing) def setter_map(maptype): """Set (world or Thing's) map of maptype's int(str_int)-th line to mapline. If no map of maptype exists yet, initialize it with ' ' bytes first. """ def valid_map_line(str_int, mapline): val = integer_test(str_int, 0, 255) if None != val: if val >= world_db["MAP_LENGTH"]: print("Illegal value for map line number.") elif len(mapline) != world_db["MAP_LENGTH"]: print("Map line length is unequal map width.") else: return val return None def nonThingMap_helper(str_int, mapline): val = valid_map_line(str_int, mapline) if None != val: length = world_db["MAP_LENGTH"] if not world_db["MAP"]: map = bytearray(b' ' * (length ** 2)) else: map = world_db["MAP"] map[val * length:(val * length) + length] = mapline.encode() if not world_db["MAP"]: world_db["MAP"] = map @test_Thing_id def ThingMap_helper(str_int, mapline): val = valid_map_line(str_int, mapline) if None != val: length = world_db["MAP_LENGTH"] if not world_db["Things"][command_tid.id][maptype]: map = bytearray(b' ' * (length ** 2)) else: map = world_db["Things"][command_tid.id][maptype] map[val * length:(val * length) + length] = mapline.encode() if not world_db["Things"][command_tid.id][maptype]: world_db["Things"][command_tid.id][maptype] = map return nonThingMap_helper if maptype == "MAP" else ThingMap_helper def setter_tpos(axis): """Generate setter for T_POSX or T_POSY of selected Thing. If world is active, rebuilds animate things' fovmap, player's memory map. """ @test_Thing_id def helper(str_int): val = integer_test(str_int, 0, 255) if None != val: if val < world_db["MAP_LENGTH"]: world_db["Things"][command_tid.id]["T_POS" + axis] = val if world_db["WORLD_ACTIVE"] \ and world_db["Things"][command_tid.id]["T_LIFEPOINTS"]: build_fov_map(world_db["Things"][command_tid.id]) if 0 == command_tid.id: update_map_memory(world_db["Things"][command_tid.id]) else: print("Ignoring: Position is outside of map.") return helper def command_ttid(id_string): """Set ID of ThingType to manipulate. ID unused? Create new one. Default new ThingType's TT_SYMBOL to "?", TT_CORPSE_ID to self, TT_TOOL to "", others: 0. """ id = id_setter(id_string, "ThingTypes", command_ttid) if None != id: world_db["ThingTypes"][id] = { "TT_NAME": "(none)", "TT_TOOLPOWER": 0, "TT_LIFEPOINTS": 0, "TT_PROLIFERATE": 0, "TT_START_NUMBER": 0, "TT_SYMBOL": "?", "TT_CORPSE_ID": id, "TT_TOOL": "" } test_ThingType_id = test_for_id_maker(command_ttid, "ThingType") @test_ThingType_id def command_ttname(name): """Set TT_NAME of selected ThingType.""" world_db["ThingTypes"][command_ttid.id]["TT_NAME"] = name @test_ThingType_id def command_tttool(name): """Set TT_TOOL of selected ThingType.""" world_db["ThingTypes"][command_ttid.id]["TT_TOOL"] = name @test_ThingType_id def command_ttsymbol(char): """Set TT_SYMBOL of selected ThingType. """ if 1 == len(char): world_db["ThingTypes"][command_ttid.id]["TT_SYMBOL"] = char else: print("Ignoring: Argument must be single character.") @test_ThingType_id def command_ttcorpseid(str_int): """Set TT_CORPSE_ID of selected ThingType.""" val = integer_test(str_int, 0) if None != val: if val in world_db["ThingTypes"]: world_db["ThingTypes"][command_ttid.id]["TT_CORPSE_ID"] = val else: print("Ignoring: Corpse ID belongs to no known ThignType.") def command_taid(id_string): """Set ID of ThingAction to manipulate. ID unused? Create new one. Default new ThingAction's TA_EFFORT to 1, its TA_NAME to "wait". """ id = id_setter(id_string, "ThingActions", command_taid, True) if None != id: world_db["ThingActions"][id] = { "TA_EFFORT": 1, "TA_NAME": "wait" } test_ThingAction_id = test_for_id_maker(command_taid, "ThingAction") @test_ThingAction_id def command_taname(name): """Set TA_NAME of selected ThingAction. The name must match a valid thing action function. If after the name setting no ThingAction with name "wait" remains, call set_world_inactive(). """ if name == "wait" or name == "move" or name == "use" or name == "drop" \ or name == "pick_up": world_db["ThingActions"][command_taid.id]["TA_NAME"] = name if 1 == world_db["WORLD_ACTIVE"]: wait_defined = False for id in world_db["ThingActions"]: if "wait" == world_db["ThingActions"][id]["TA_NAME"]: wait_defined = True break if not wait_defined: set_world_inactive() else: print("Ignoring: Invalid action name.") # In contrast to the original,naming won't map a function to a ThingAction. def command_ai(): """Call ai() on player Thing, then turn_over().""" ai(world_db["Things"][0]) turn_over() """Commands database. Map command start tokens to ([0]) number of expected command arguments, ([1]) the command's meta-ness (i.e. is it to be written to the record file, is it to be ignored in replay mode if read from server input file), and ([2]) a function to be called on it. """ commands_db = { "PLUGIN": (1, True, command_plugin), "QUIT": (0, True, command_quit), "PING": (0, True, command_ping), "THINGS_HERE": (2, True, command_thingshere), "MAKE_WORLD": (1, False, command_makeworld), "SEED_RANDOMNESS": (1, False, command_seedrandomness), "TURN": (1, False, setter(None, "TURN", 0, 65535)), "PLAYER_TYPE": (1, False, setter(None, "PLAYER_TYPE", 0)), "MAP_LENGTH": (1, False, command_maplength), "WORLD_ACTIVE": (1, False, command_worldactive), "MAP": (2, False, setter_map("MAP")), "TA_ID": (1, False, command_taid), "TA_EFFORT": (1, False, setter("ThingAction", "TA_EFFORT", 0, 255)), "TA_NAME": (1, False, command_taname), "TT_ID": (1, False, command_ttid), "TT_NAME": (1, False, command_ttname), "TT_TOOL": (1, False, command_tttool), "TT_SYMBOL": (1, False, command_ttsymbol), "TT_CORPSE_ID": (1, False, command_ttcorpseid), "TT_TOOLPOWER": (1, False, setter("ThingType", "TT_TOOLPOWER", 0, 65535)), "TT_START_NUMBER": (1, False, setter("ThingType", "TT_START_NUMBER", 0, 255)), "TT_PROLIFERATE": (1, False, setter("ThingType", "TT_PROLIFERATE", 0, 65535)), "TT_LIFEPOINTS": (1, False, setter("ThingType", "TT_LIFEPOINTS", 0, 255)), "T_ID": (1, False, command_tid), "T_ARGUMENT": (1, False, setter("Thing", "T_ARGUMENT", 0, 255)), "T_PROGRESS": (1, False, setter("Thing", "T_PROGRESS", 0, 255)), "T_LIFEPOINTS": (1, False, setter("Thing", "T_LIFEPOINTS", 0, 255)), "T_SATIATION": (1, False, setter("Thing", "T_SATIATION", -32768, 32767)), "T_COMMAND": (1, False, command_tcommand), "T_TYPE": (1, False, command_ttype), "T_CARRIES": (1, False, command_tcarries), "T_MEMMAP": (2, False, setter_map("T_MEMMAP")), "T_MEMDEPTHMAP": (2, False, setter_map("T_MEMDEPTHMAP")), "T_MEMTHING": (3, False, command_tmemthing), "T_POSY": (1, False, setter_tpos("Y")), "T_POSX": (1, False, setter_tpos("X")), "wait": (0, False, play_wait), "move": (1, False, play_move), "pick_up": (0, False, play_pickup), "drop": (1, False, play_drop), "use": (1, False, play_use), "ai": (0, False, command_ai) } # TODO: Unhandled cases: (Un-)killing animates (esp. player!) with T_LIFEPOINTS. """World state database. With sane default values. (Randomness is in rand.)""" world_db = { "TURN": 0, "MAP_LENGTH": 64, "PLAYER_TYPE": 0, "WORLD_ACTIVE": 0, "MAP": False, "ThingActions": {}, "ThingTypes": {}, "Things": {} } """Mapping of direction names to internal direction chars.""" directions_db = {"east": "d", "south-east": "c", "south-west": "x", "west": "s", "north-west": "w", "north-east": "e"} """File IO database.""" io_db = { "path_save": "save", "path_record": "record_save", "path_worldconf": "confserver/world", "path_server": "server/", "path_in": "server/in", "path_out": "server/out", "path_worldstate": "server/worldstate", "tmp_suffix": "_tmp", "kicked_by_rival": False, "worldstate_updateable": False } try: libpr = prep_library() rand = RandomnessIO() opts = parse_command_line_arguments() if opts.savefile: io_db["path_save"] = opts.savefile io_db["path_record"] = "record_" + opts.savefile setup_server_io() if opts.verbose: io_db["verbose"] = True if None != opts.replay: replay_game() else: play_game() except SystemExit as exit: print("ABORTING: " + exit.args[0]) except: print("SOMETHING WENT WRONG IN UNEXPECTED WAYS") raise finally: cleanup_server_io()