# see the file NOTICE in the root directory of the PlomRogue source package.
-import argparse
-import errno
-import os
-import shlex
-import shutil
-import time
-import ctypes
-import math
-
-
-from server.config.world_data import world_db, directions_db
-from server.config.io import io_db
-
-
-class RandomnessIO:
- """"Interface to libplomrogue's pseudo-randomness generator."""
-
- def set_seed(self, seed):
- libpr.seed_rrand(1, seed)
-
- def get_seed(self):
- return libpr.seed_rrand(0, 0)
-
- def next(self):
- return libpr.rrand()
-
- seed = property(get_seed, set_seed)
-
-
-def prep_library():
- """Prepare ctypes library at ./libplomrogue.so"""
- libpath = ("./libplomrogue.so")
- if not os.access(libpath, os.F_OK):
- raise SystemExit("No library " + libpath + ", run ./build.sh first?")
- libpr = ctypes.cdll.LoadLibrary(libpath)
- libpr.seed_rrand.restype = ctypes.c_uint32
- return libpr
-
-
-def c_pointer_to_bytearray(ba):
- """Return C char * pointer to ba."""
- type = ctypes.c_char * len(ba)
- return type.from_buffer(ba)
-
-
-def strong_write(file, string):
- """Apply write(string), then flush()."""
- file.write(string)
- file.flush()
-
-
-def setup_server_io():
- """Fill IO files DB with proper file( path)s. Write process IO test string.
-
- Ensure IO files directory at server/. Remove any old input file if found.
- Set up new input file for reading, and new output file for appending. Start
- output file with process hash line of format PID + " " + floated UNIX time
- (io_db["teststring"]). Raise SystemExit if file is found at path of either
- record or save file plus io_db["tmp_suffix"].
- """
- def detect_atomic_leftover(path, tmp_suffix):
- path_tmp = path + tmp_suffix
- msg = "Found file '" + path_tmp + "' that may be a leftover from an " \
- "aborted previous attempt to write '" + path + "'. Aborting " \
- "until matter is resolved by removing it from its current path."
- if os.access(path_tmp, os.F_OK):
- raise SystemExit(msg)
- io_db["teststring"] = str(os.getpid()) + " " + str(time.time())
- io_db["save_wait"] = 0
- io_db["verbose"] = False
- io_db["record_chunk"] = ""
- os.makedirs(io_db["path_server"], exist_ok=True)
- io_db["file_out"] = open(io_db["path_out"], "a")
- strong_write(io_db["file_out"], io_db["teststring"] + "\n")
- if os.access(io_db["path_in"], os.F_OK):
- os.remove(io_db["path_in"])
- io_db["file_in"] = open(io_db["path_in"], "w")
- io_db["file_in"].close()
- io_db["file_in"] = open(io_db["path_in"], "r")
- detect_atomic_leftover(io_db["path_save"], io_db["tmp_suffix"])
- detect_atomic_leftover(io_db["path_record"], io_db["tmp_suffix"])
-
-
-def cleanup_server_io():
- """Close and (if io_db["kicked_by_rival"] false) remove files in io_db."""
- def helper(file_key, path_key):
- if file_key in io_db:
- io_db[file_key].close()
- if not io_db["kicked_by_rival"] \
- and os.access(io_db[path_key], os.F_OK):
- os.remove(io_db[path_key])
- helper("file_in", "path_in")
- helper("file_out", "path_out")
- helper("file_worldstate", "path_worldstate")
- if "file_record" in io_db:
- io_db["file_record"].close()
-
-
-def log(msg):
- """Send "msg" to log."""
- strong_write(io_db["file_out"], "LOG " + msg + "\n")
-
-
-def obey(command, prefix, replay=False, do_record=False):
- """Call function from commands_db mapped to command's first token.
-
- Tokenize command string with shlex.split(comments=True). If replay is set,
- a non-meta command from the commands_db merely triggers obey() on the next
- command from the records file. If not, non-meta commands set
- io_db["worldstate_updateable"] to world_db["WORLD_ACTIVE"], and, if
- do_record is set, are recorded to io_db["record_chunk"], and save_world()
- is called (and io_db["record_chunk"] written) if 15 seconds have passed
- since the last time it was called. The prefix string is inserted into the
- server's input message between its beginning 'input ' and ':'. All activity
- is preceded by a server_test() call. Commands that start with a lowercase
- letter are ignored when world_db["WORLD_ACTIVE"] is False/0.
- """
- server_test()
- if io_db["verbose"]:
- print("input " + prefix + ": " + command)
- try:
- tokens = shlex.split(command, comments=True)
- except ValueError as err:
- print("Can't tokenize command string: " + str(err) + ".")
- return
- if len(tokens) > 0 and tokens[0] in commands_db \
- and len(tokens) == commands_db[tokens[0]][0] + 1:
- if commands_db[tokens[0]][1]:
- commands_db[tokens[0]][2](*tokens[1:])
- elif tokens[0][0].islower() and not world_db["WORLD_ACTIVE"]:
- print("Ignoring lowercase-starting commands when world inactive.")
- elif replay:
- print("Due to replay mode, reading command as 'go on in record'.")
- line = io_db["file_record"].readline()
- if len(line) > 0:
- obey(line.rstrip(), io_db["file_record"].prefix
- + str(io_db["file_record"].line_n))
- io_db["file_record"].line_n = io_db["file_record"].line_n + 1
- else:
- print("Reached end of record file.")
- else:
- commands_db[tokens[0]][2](*tokens[1:])
- if do_record:
- io_db["record_chunk"] += command + "\n"
- if time.time() > io_db["save_wait"] + 15:
- atomic_write(io_db["path_record"], io_db["record_chunk"],
- do_append=True)
- if world_db["WORLD_ACTIVE"]:
- save_world()
- io_db["record_chunk"] = ""
- io_db["save_wait"] = time.time()
- io_db["worldstate_updateable"] = world_db["WORLD_ACTIVE"]
- elif 0 != len(tokens):
- print("Invalid command/argument, or bad number of tokens.")
-
-
-def atomic_write(path, text, do_append=False, delete=True):
- """Atomic write of text to file at path, appended if do_append is set."""
- path_tmp = path + io_db["tmp_suffix"]
- mode = "w"
- if do_append:
- mode = "a"
- if os.access(path, os.F_OK):
- shutil.copyfile(path, path_tmp)
- file = open(path_tmp, mode)
- strong_write(file, text)
- file.close()
- if delete and os.access(path, os.F_OK):
- os.remove(path)
- os.rename(path_tmp, path)
-
-
-def save_world():
- """Save all commands needed to reconstruct current world state."""
-
- def quote(string):
- string = string.replace("\u005C", '\u005C\u005C')
- return '"' + string.replace('"', '\u005C"') + '"'
-
- def mapsetter(key):
- def helper(id=None):
- string = ""
- if key == "MAP" or world_db["Things"][id][key]:
- map = world_db["MAP"] if key == "MAP" \
- else world_db["Things"][id][key]
- length = world_db["MAP_LENGTH"]
- for i in range(length):
- line = map[i * length:(i * length) + length].decode()
- string = string + key + " " + str(i) + " " + quote(line) \
- + "\n"
- return string
- return helper
-
- def memthing(id):
- string = ""
- for memthing in world_db["Things"][id]["T_MEMTHING"]:
- string = string + "T_MEMTHING " + str(memthing[0]) + " " + \
- str(memthing[1]) + " " + str(memthing[2]) + "\n"
- return string
-
- def helper(category, id_string, special_keys={}):
- string = ""
- for id in sorted(world_db[category].keys()):
- string = string + id_string + " " + str(id) + "\n"
- for key in sorted(world_db[category][id].keys()):
- if not key in special_keys:
- x = world_db[category][id][key]
- argument = quote(x) if str == type(x) else str(x)
- string = string + key + " " + argument + "\n"
- elif special_keys[key]:
- string = string + special_keys[key](id)
- return string
-
- string = ""
- for key in sorted(world_db.keys()):
- if (not isinstance(world_db[key], dict)) and key != "MAP" and \
- key != "WORLD_ACTIVE":
- string = string + key + " " + str(world_db[key]) + "\n"
- string = string + mapsetter("MAP")()
- string = string + helper("ThingActions", "TA_ID")
- string = string + helper("ThingTypes", "TT_ID", {"TT_CORPSE_ID": False})
- for id in sorted(world_db["ThingTypes"].keys()):
- string = string + "TT_ID " + str(id) + "\n" + "TT_CORPSE_ID " + \
- str(world_db["ThingTypes"][id]["TT_CORPSE_ID"]) + "\n"
- string = string + helper("Things", "T_ID",
- {"T_CARRIES": False, "carried": False,
- "T_MEMMAP": mapsetter("T_MEMMAP"),
- "T_MEMTHING": memthing, "fovmap": False,
- "T_MEMDEPTHMAP": mapsetter("T_MEMDEPTHMAP")})
- for id in sorted(world_db["Things"].keys()):
- if [] != world_db["Things"][id]["T_CARRIES"]:
- string = string + "T_ID " + str(id) + "\n"
- for carried in sorted(world_db["Things"][id]["T_CARRIES"]):
- string = string + "T_CARRIES " + str(carried) + "\n"
- string = string + "SEED_RANDOMNESS " + str(rand.seed) + "\n" + \
- "WORLD_ACTIVE " + str(world_db["WORLD_ACTIVE"])
- atomic_write(io_db["path_save"], string)
-
-
-def obey_lines_in_file(path, name, do_record=False):
- """Call obey() on each line of path's file, use name in input prefix."""
- file = open(path, "r")
- line_n = 1
- for line in file.readlines():
- obey(line.rstrip(), name + "file line " + str(line_n),
- do_record=do_record)
- line_n = line_n + 1
- file.close()
-
-
-def parse_command_line_arguments():
- """Return settings values read from command line arguments."""
- parser = argparse.ArgumentParser()
- parser.add_argument('-s', nargs='?', type=int, dest='replay', const=1,
- action='store')
- parser.add_argument('-l', nargs="?", const="save", dest='savefile',
- action="store")
- parser.add_argument('-v', dest='verbose', action='store_true')
- opts, unknown = parser.parse_known_args()
- return opts
-
-
-def server_test():
- """Ensure valid server out file belonging to current process.
-
- This is done by comparing io_db["teststring"] to what's found at the start
- of the current file at io_db["path_out"]. On failure, set
- io_db["kicked_by_rival"] and raise SystemExit.
- """
- if not os.access(io_db["path_out"], os.F_OK):
- raise SystemExit("Server output file has disappeared.")
- file = open(io_db["path_out"], "r")
- test = file.readline().rstrip("\n")
- file.close()
- if test != io_db["teststring"]:
- io_db["kicked_by_rival"] = True
- msg = "Server test string in server output file does not match. This" \
- " indicates that the current server process has been " \
- "superseded by another one."
- raise SystemExit(msg)
-
-
-def read_command():
- """Return next newline-delimited command from server in file.
-
- Keep building return string until a newline is encountered. Pause between
- unsuccessful reads, and after too much waiting, run server_test().
- """
- wait_on_fail = 0.03333
- max_wait = 5
- now = time.time()
- command = ""
- while True:
- add = io_db["file_in"].readline()
- if len(add) > 0:
- command = command + add
- if len(command) > 0 and "\n" == command[-1]:
- command = command[:-1]
- break
- else:
- time.sleep(wait_on_fail)
- if now + max_wait < time.time():
- server_test()
- now = time.time()
- return command
-
-
-def try_worldstate_update():
- """Write worldstate file if io_db["worldstate_updateable"] is set."""
- if io_db["worldstate_updateable"]:
-
- def write_map(string, map):
- for i in range(length):
- line = map[i * length:(i * length) + length].decode()
- string = string + line + "\n"
- return string
-
- inventory = ""
- if [] == world_db["Things"][0]["T_CARRIES"]:
- inventory = "(none)\n"
- else:
- for id in world_db["Things"][0]["T_CARRIES"]:
- type_id = world_db["Things"][id]["T_TYPE"]
- name = world_db["ThingTypes"][type_id]["TT_NAME"]
- inventory = inventory + name + "\n"
- string = str(world_db["TURN"]) + "\n" + \
- str(world_db["Things"][0]["T_LIFEPOINTS"]) + "\n" + \
- str(world_db["Things"][0]["T_SATIATION"]) + "\n" + \
- inventory + "%\n" + \
- str(world_db["Things"][0]["T_POSY"]) + "\n" + \
- str(world_db["Things"][0]["T_POSX"]) + "\n" + \
- str(world_db["MAP_LENGTH"]) + "\n"
- length = world_db["MAP_LENGTH"]
- fov = bytearray(b' ' * (length ** 2))
- ord_v = ord("v")
- for pos in [pos for pos in range(length ** 2)
- if ord_v == world_db["Things"][0]["fovmap"][pos]]:
- fov[pos] = world_db["MAP"][pos]
- length = world_db["MAP_LENGTH"]
- for id in [id for tid in reversed(sorted(list(world_db["ThingTypes"])))
- for id in world_db["Things"]
- if not world_db["Things"][id]["carried"]
- if world_db["Things"][id]["T_TYPE"] == tid
- if world_db["Things"][0]["fovmap"][
- world_db["Things"][id]["T_POSY"] * length
- + world_db["Things"][id]["T_POSX"]] == ord_v]:
- type = world_db["Things"][id]["T_TYPE"]
- c = ord(world_db["ThingTypes"][type]["TT_SYMBOL"])
- fov[world_db["Things"][id]["T_POSY"] * length
- + world_db["Things"][id]["T_POSX"]] = c
- string = write_map(string, fov)
- mem = world_db["Things"][0]["T_MEMMAP"][:]
- for mt in [mt for tid in reversed(sorted(list(world_db["ThingTypes"])))
- for mt in world_db["Things"][0]["T_MEMTHING"]
- if mt[0] == tid]:
- c = world_db["ThingTypes"][mt[0]]["TT_SYMBOL"]
- mem[(mt[1] * length) + mt[2]] = ord(c)
- string = write_map(string, mem)
- atomic_write(io_db["path_worldstate"], string, delete=False)
- strong_write(io_db["file_out"], "WORLD_UPDATED\n")
- io_db["worldstate_updateable"] = False
-
-
def replay_game():
"""Replay game from record file.
command and all that follow via the server input file. Run
try_worldstate_update() before each interactive obey()/read_command().
"""
+ import time
+ from server.io import obey_lines_in_file
if os.access(io_db["path_save"], os.F_OK):
obey_lines_in_file(io_db["path_save"], "save")
else:
obey(read_command(), "in file", do_record=True)
-def make_map():
- """(Re-)make island map.
-
- Let "~" represent water, "." land, "X" trees: Build island shape randomly,
- start with one land cell in the middle, then go into cycle of repeatedly
- selecting a random sea cell and transforming it into land if it is neighbor
- to land. The cycle ends when a land cell is due to be created at the map's
- border. Then put some trees on the map (TODO: more precise algorithm desc).
- """
-
- def is_neighbor(coordinates, type):
- y = coordinates[0]
- x = coordinates[1]
- length = world_db["MAP_LENGTH"]
- ind = y % 2
- diag_west = x + (ind > 0)
- diag_east = x + (ind < (length - 1))
- pos = (y * length) + x
- if (y > 0 and diag_east
- and type == chr(world_db["MAP"][pos - length + ind])) \
- or (x < (length - 1)
- and type == chr(world_db["MAP"][pos + 1])) \
- or (y < (length - 1) and diag_east
- and type == chr(world_db["MAP"][pos + length + ind])) \
- or (y > 0 and diag_west
- and type == chr(world_db["MAP"][pos - length - (not ind)])) \
- or (x > 0
- and type == chr(world_db["MAP"][pos - 1])) \
- or (y < (length - 1) and diag_west
- and type == chr(world_db["MAP"][pos + length - (not ind)])):
- return True
- return False
-
- world_db["MAP"] = bytearray(b'~' * (world_db["MAP_LENGTH"] ** 2))
- length = world_db["MAP_LENGTH"]
- add_half_width = (not (length % 2)) * int(length / 2)
- world_db["MAP"][int((length ** 2) / 2) + add_half_width] = ord(".")
- while (1):
- y = rand.next() % length
- x = rand.next() % length
- pos = (y * length) + x
- if "~" == chr(world_db["MAP"][pos]) and is_neighbor((y, x), "."):
- if y == 0 or y == (length - 1) or x == 0 or x == (length - 1):
- break
- world_db["MAP"][pos] = ord(".")
- n_trees = int((length ** 2) / 16)
- i_trees = 0
- while (i_trees <= n_trees):
- single_allowed = rand.next() % 32
- y = rand.next() % length
- x = rand.next() % length
- pos = (y * length) + x
- if "." == chr(world_db["MAP"][pos]) \
- and ((not single_allowed) or is_neighbor((y, x), "X")):
- world_db["MAP"][pos] = ord("X")
- i_trees += 1
- # This all-too-precise replica of the original C code misses iter_limit().
-
-
-def eat_vs_hunger_threshold(thingtype):
- """Return satiation cost of eating for type. Good food for it must be >."""
- hunger_unit = hunger_per_turn(thingtype)
- actiontype = [id for id in world_db["ThingActions"]
- if world_db["ThingActions"][id]["TA_NAME"] == "use"][0]
- return world_db["ThingActions"][actiontype]["TA_EFFORT"] * hunger_unit
-
-
-def update_map_memory(t, age_map=True):
- """Update t's T_MEMMAP with what's in its FOV now,age its T_MEMMEPTHMAP."""
-
- def age_some_memdepthmap_on_nonfov_cells():
- # OUTSOURCED FOR PERFORMANCE REASONS TO libplomrogue.so:
- # ord_v = ord("v")
- # ord_0 = ord("0")
- # ord_9 = ord("9")
- # for pos in [pos for pos in range(world_db["MAP_LENGTH"] ** 2)
- # if not ord_v == t["fovmap"][pos]
- # if ord_0 <= t["T_MEMDEPTHMAP"][pos]
- # if ord_9 > t["T_MEMDEPTHMAP"][pos]
- # if not rand.next() % (2 **
- # (t["T_MEMDEPTHMAP"][pos] - 48))]:
- # t["T_MEMDEPTHMAP"][pos] += 1
- memdepthmap = c_pointer_to_bytearray(t["T_MEMDEPTHMAP"])
- fovmap = c_pointer_to_bytearray(t["fovmap"])
- libpr.age_some_memdepthmap_on_nonfov_cells(memdepthmap, fovmap)
-
- if not t["T_MEMMAP"]:
- t["T_MEMMAP"] = bytearray(b' ' * (world_db["MAP_LENGTH"] ** 2))
- if not t["T_MEMDEPTHMAP"]:
- t["T_MEMDEPTHMAP"] = bytearray(b' ' * (world_db["MAP_LENGTH"] ** 2))
- ord_v = ord("v")
- ord_0 = ord("0")
- for pos in [pos for pos in range(world_db["MAP_LENGTH"] ** 2)
- if ord_v == t["fovmap"][pos]]:
- t["T_MEMDEPTHMAP"][pos] = ord_0
- t["T_MEMMAP"][pos] = world_db["MAP"][pos]
- if age_map:
- age_some_memdepthmap_on_nonfov_cells()
- t["T_MEMTHING"] = [mt for mt in t["T_MEMTHING"]
- if ord_v != t["fovmap"][(mt[1] * world_db["MAP_LENGTH"])
- + mt[2]]]
- for id in [id for id in world_db["Things"]
- if not world_db["Things"][id]["carried"]]:
- type = world_db["Things"][id]["T_TYPE"]
- if not world_db["ThingTypes"][type]["TT_LIFEPOINTS"]:
- y = world_db["Things"][id]["T_POSY"]
- x = world_db["Things"][id]["T_POSX"]
- if ord_v == t["fovmap"][(y * world_db["MAP_LENGTH"]) + x]:
- t["T_MEMTHING"].append((type, y, x))
-
-
-def set_world_inactive():
- """Set world_db["WORLD_ACTIVE"] to 0 and remove worldstate file."""
- server_test()
- if os.access(io_db["path_worldstate"], os.F_OK):
- os.remove(io_db["path_worldstate"])
- world_db["WORLD_ACTIVE"] = 0
-
-
-def integer_test(val_string, min, max=None):
- """Return val_string if integer >= min & (if max set) <= max, else None."""
- try:
- val = int(val_string)
- if val < min or (max is not None and val > max):
- raise ValueError
- return val
- except ValueError:
- msg = "Ignoring: Please use integer >= " + str(min)
- if max is not None:
- msg += " and <= " + str(max)
- msg += "."
- print(msg)
- return None
-
-
-def setter(category, key, min, max=None):
- """Build setter for world_db([category + "s"][id])[key] to >=min/<=max."""
- if category is None:
- def f(val_string):
- val = integer_test(val_string, min, max)
- if None != val:
- world_db[key] = val
- else:
- if category == "Thing":
- id_store = command_tid
- decorator = test_Thing_id
- elif category == "ThingType":
- id_store = command_ttid
- decorator = test_ThingType_id
- elif category == "ThingAction":
- id_store = command_taid
- decorator = test_ThingAction_id
-
- @decorator
- def f(val_string):
- val = integer_test(val_string, min, max)
- if None != val:
- world_db[category + "s"][id_store.id][key] = val
- return f
-
-
-def build_fov_map(t):
- """Build Thing's FOV map."""
- t["fovmap"] = bytearray(b'v' * (world_db["MAP_LENGTH"] ** 2))
- fovmap = c_pointer_to_bytearray(t["fovmap"])
- map = c_pointer_to_bytearray(world_db["MAP"])
- if libpr.build_fov_map(t["T_POSY"], t["T_POSX"], fovmap, map):
- raise RuntimeError("Malloc error in build_fov_Map().")
-
-
-def decrement_lifepoints(t):
- """Decrement t's lifepoints by 1, and if to zero, corpse it.
-
- If t is the player avatar, only blank its fovmap, so that the client may
- still display memory data. On non-player things, erase fovmap and memory.
- Dying actors drop all their things.
- """
- t["T_LIFEPOINTS"] -= 1
- if 0 == t["T_LIFEPOINTS"]:
- for id in t["T_CARRIES"]:
- t["T_CARRIES"].remove(id)
- world_db["Things"][id]["T_POSY"] = t["T_POSY"]
- world_db["Things"][id]["T_POSX"] = t["T_POSX"]
- world_db["Things"][id]["carried"] = False
- t["T_TYPE"] = world_db["ThingTypes"][t["T_TYPE"]]["TT_CORPSE_ID"]
- if world_db["Things"][0] == t:
- t["fovmap"] = bytearray(b' ' * (world_db["MAP_LENGTH"] ** 2))
- log("You die.")
- log("See README on how to start over.")
- else:
- t["fovmap"] = False
- t["T_MEMMAP"] = False
- t["T_MEMDEPTHMAP"] = False
- t["T_MEMTHING"] = []
-
-
-def mv_yx_in_dir_legal(dir, y, x):
- """Wrapper around libpr.mv_yx_in_dir_legal to simplify its use."""
- dir_c = dir.encode("ascii")[0]
- test = libpr.mv_yx_in_dir_legal_wrap(dir_c, y, x)
- if -1 == test:
- raise RuntimeError("Too much wrapping in mv_yx_in_dir_legal_wrap()!")
- return (test, libpr.result_y(), libpr.result_x())
-
-
-def actor_wait(t):
- """Make t do nothing (but loudly, if player avatar)."""
- if t == world_db["Things"][0]:
- log("You wait")
-
-
-def actor_move(t):
- """If passable, move/collide(=attack) thing into T_ARGUMENT's direction."""
- passable = False
- move_result = mv_yx_in_dir_legal(chr(t["T_ARGUMENT"]),
- t["T_POSY"], t["T_POSX"])
- if 1 == move_result[0]:
- pos = (move_result[1] * world_db["MAP_LENGTH"]) + move_result[2]
- hitted = [id for id in world_db["Things"]
- if world_db["Things"][id] != t
- if world_db["Things"][id]["T_LIFEPOINTS"]
- if world_db["Things"][id]["T_POSY"] == move_result[1]
- if world_db["Things"][id]["T_POSX"] == move_result[2]]
- if len(hitted):
- hit_id = hitted[0]
- if t == world_db["Things"][0]:
- hitted_type = world_db["Things"][hit_id]["T_TYPE"]
- hitted_name = world_db["ThingTypes"][hitted_type]["TT_NAME"]
- log("You WOUND" + hitted_name + ".")
- elif 0 == hit_id:
- hitter_name = world_db["ThingTypes"][t["T_TYPE"]]["TT_NAME"]
- log(hitter_name +" WOUNDS you.")
- decrement_lifepoints(world_db["Things"][hit_id])
- return
- passable = "." == chr(world_db["MAP"][pos])
- dir = [dir for dir in directions_db
- if directions_db[dir] == chr(t["T_ARGUMENT"])][0]
- if passable:
- t["T_POSY"] = move_result[1]
- t["T_POSX"] = move_result[2]
- for id in t["T_CARRIES"]:
- world_db["Things"][id]["T_POSY"] = move_result[1]
- world_db["Things"][id]["T_POSX"] = move_result[2]
- build_fov_map(t)
- if t == world_db["Things"][0]:
- log("You MOVE " + dir + ".")
-
-
-def actor_pick_up(t):
- """Make t pick up (topmost?) Thing from ground into inventory.
-
- Define topmostness by how low the thing's type ID is.
- """
- ids = [id for id in world_db["Things"] if world_db["Things"][id] != t
- if not world_db["Things"][id]["carried"]
- if world_db["Things"][id]["T_POSY"] == t["T_POSY"]
- if world_db["Things"][id]["T_POSX"] == t["T_POSX"]]
- if len(ids):
- lowest_tid = -1
- for iid in ids:
- tid = world_db["Things"][iid]["T_TYPE"]
- if lowest_tid == -1 or tid < lowest_tid:
- id = iid
- lowest_tid = tid
- world_db["Things"][id]["carried"] = True
- t["T_CARRIES"].append(id)
- if t == world_db["Things"][0]:
- log("You PICK UP an object.")
-
-
-def actor_drop(t):
- """Make t rop Thing from inventory to ground indexed by T_ARGUMENT."""
- # TODO: Handle case where T_ARGUMENT matches nothing.
- if len(t["T_CARRIES"]):
- id = t["T_CARRIES"][t["T_ARGUMENT"]]
- t["T_CARRIES"].remove(id)
- world_db["Things"][id]["carried"] = False
- if t == world_db["Things"][0]:
- log("You DROP an object.")
-
-
-def actor_use(t):
- """Make t use (for now: consume) T_ARGUMENT-indexed Thing in inventory."""
- # TODO: Handle case where T_ARGUMENT matches nothing.
- if len(t["T_CARRIES"]):
- id = t["T_CARRIES"][t["T_ARGUMENT"]]
- type = world_db["Things"][id]["T_TYPE"]
- if world_db["ThingTypes"][type]["TT_TOOL"] == "food":
- t["T_CARRIES"].remove(id)
- del world_db["Things"][id]
- t["T_SATIATION"] += world_db["ThingTypes"][type]["TT_TOOLPOWER"]
- if t == world_db["Things"][0]:
- log("You CONSUME this object.")
- elif t == world_db["Things"][0]:
- log("You try to use this object, but FAIL.")
-
-
-def thingproliferation(t, prol_map):
- """To chance of 1/TT_PROLIFERATE, create t offspring in open neighbor cell.
-
- Naturally only works with TT_PROLIFERATE > 0. The neighbor cell must be be
- marked '.' in prol_map. If there are several map cell candidates, one is
- selected randomly.
- """
- prolscore = world_db["ThingTypes"][t["T_TYPE"]]["TT_PROLIFERATE"]
- if prolscore and (1 == prolscore or 1 == (rand.next() % prolscore)):
- candidates = []
- for dir in [directions_db[key] for key in sorted(directions_db.keys())]:
- mv_result = mv_yx_in_dir_legal(dir, t["T_POSY"], t["T_POSX"])
- if mv_result[0] and ord('.') == prol_map[mv_result[1]
- * world_db["MAP_LENGTH"]
- + mv_result[2]]:
- candidates.append((mv_result[1], mv_result[2]))
- if len(candidates):
- i = rand.next() % len(candidates)
- id = id_setter(-1, "Things")
- newT = new_Thing(t["T_TYPE"], (candidates[i][0], candidates[i][1]))
- world_db["Things"][id] = newT
-
-
-def try_healing(t):
- """If t's HP < max, increment them if well-nourished, maybe waiting."""
- if t["T_LIFEPOINTS"] < \
- world_db["ThingTypes"][t["T_TYPE"]]["TT_LIFEPOINTS"]:
- wait_id = [id for id in world_db["ThingActions"]
- if world_db["ThingActions"][id]["TA_NAME"] == "wait"][0]
- wait_divider = 8 if t["T_COMMAND"] == wait_id else 1
- testval = int(abs(t["T_SATIATION"]) / wait_divider)
- if (testval <= 1 or 1 == (rand.next() % testval)):
- t["T_LIFEPOINTS"] += 1
- if t == world_db["Things"][0]:
- log("You HEAL.")
-
-
-def hunger_per_turn(type_id):
- """The amount of satiation score lost per turn for things of given type."""
- return int(math.sqrt(world_db["ThingTypes"][type_id]["TT_LIFEPOINTS"]))
-
-
-def hunger(t):
- """Decrement t's satiation,dependent on it trigger lifepoint dec chance."""
- if t["T_SATIATION"] > -32768:
- t["T_SATIATION"] -= hunger_per_turn(t["T_TYPE"])
- if 0 != t["T_SATIATION"] and 0 == int(rand.next() / abs(t["T_SATIATION"])):
- if t == world_db["Things"][0]:
- if t["T_SATIATION"] < 0:
- log("You SUFFER from hunger.")
- else:
- log("You SUFFER from over-eating.")
- decrement_lifepoints(t)
-
-
-def get_dir_to_target(t, filter):
- """Try to set T_COMMAND/T_ARGUMENT for move to "filter"-determined target.
-
- The path-wise nearest target is chosen, via the shortest available path.
- Target must not be t. On succcess, return positive value, else False.
- Filters:
- "a": Thing in FOV is animate, but of ThingType, starts out weaker than t
- is, and its corpse would be healthy food for t
- "f": move away from an enemy – any visible actor whose thing type has more
- TT_LIFEPOINTS than t LIFEPOINTS, and might find t's corpse healthy
- food – if it is closer than n steps, where n will shrink as t's hunger
- grows; if enemy is too close, move towards (attack) the enemy instead;
- if no fleeing is possible, nor attacking useful, wait; don't tread on
- non-enemies for fleeing
- "c": Thing in memorized map is consumable of sufficient nutrition for t
- "s": memory map cell with greatest-reachable degree of unexploredness
- """
-
- def zero_score_map_where_char_on_memdepthmap(c):
- # OUTSOURCED FOR PERFORMANCE REASONS TO libplomrogue.so:
- # for i in [i for i in range(world_db["MAP_LENGTH"] ** 2)
- # if t["T_MEMDEPTHMAP"][i] == mem_depth_c[0]]:
- # set_map_score(i, 0)
- map = c_pointer_to_bytearray(t["T_MEMDEPTHMAP"])
- if libpr.zero_score_map_where_char_on_memdepthmap(c, map):
- raise RuntimeError("No score map allocated for "
- "zero_score_map_where_char_on_memdepthmap().")
-
- def set_map_score_at_thingpos(id, score):
- pos = world_db["Things"][id]["T_POSY"] * world_db["MAP_LENGTH"] \
- + world_db["Things"][id]["T_POSX"]
- set_map_score(pos, score)
-
- def set_map_score(pos, score):
- test = libpr.set_map_score(pos, score)
- if test:
- raise RuntimeError("No score map allocated for set_map_score().")
-
- def get_map_score(pos):
- result = libpr.get_map_score(pos)
- if result < 0:
- raise RuntimeError("No score map allocated for get_map_score().")
- return result
-
- def animate_in_fov(Thing):
- if Thing["carried"] or Thing == t or not Thing["T_LIFEPOINTS"]:
- return False
- pos = Thing["T_POSY"] * world_db["MAP_LENGTH"] + Thing["T_POSX"]
- if ord("v") == t["fovmap"][pos]:
- return True
-
- def good_attack_target(v):
- eat_cost = eat_vs_hunger_threshold(t["T_TYPE"])
- type = world_db["ThingTypes"][v["T_TYPE"]]
- type_corpse = world_db["ThingTypes"][type["TT_CORPSE_ID"]]
- if t["T_LIFEPOINTS"] > type["TT_LIFEPOINTS"] \
- and type_corpse["TT_TOOL"] == "food" \
- and type_corpse["TT_TOOLPOWER"] > eat_cost:
- return True
- return False
-
- def good_flee_target(m):
- own_corpse_id = world_db["ThingTypes"][t["T_TYPE"]]["TT_CORPSE_ID"]
- corpse_type = world_db["ThingTypes"][own_corpse_id]
- targetness = 0 if corpse_type["TT_TOOL"] != "food" \
- else corpse_type["TT_TOOLPOWER"]
- type = world_db["ThingTypes"][m["T_TYPE"]]
- if t["T_LIFEPOINTS"] < type["TT_LIFEPOINTS"] \
- and targetness > eat_vs_hunger_threshold(m["T_TYPE"]):
- return True
- return False
-
- def seeing_thing():
- if t["fovmap"] and "a" == filter:
- for id in world_db["Things"]:
- if animate_in_fov(world_db["Things"][id]):
- if good_attack_target(world_db["Things"][id]):
- return True
- elif t["fovmap"] and "f" == filter:
- for id in world_db["Things"]:
- if animate_in_fov(world_db["Things"][id]):
- if good_flee_target(world_db["Things"][id]):
- return True
- elif t["T_MEMMAP"] and "c" == filter:
- eat_cost = eat_vs_hunger_threshold(t["T_TYPE"])
- for mt in t["T_MEMTHING"]:
- if ' ' != chr(t["T_MEMMAP"][(mt[1] * world_db["MAP_LENGTH"])
- + mt[2]]) \
- and world_db["ThingTypes"][mt[0]]["TT_TOOL"] == "food" \
- and world_db["ThingTypes"][mt[0]]["TT_TOOLPOWER"] \
- > eat_cost:
- return True
- return False
-
- def set_cells_passable_on_memmap_to_65534_on_scoremap():
- # OUTSOURCED FOR PERFORMANCE REASONS TO libplomrogue.so:
- # ord_dot = ord(".")
- # memmap = t["T_MEMMAP"]
- # for i in [i for i in range(world_db["MAP_LENGTH"] ** 2)
- # if ord_dot == memmap[i]]:
- # set_map_score(i, 65534) # i.e. 65535-1
- map = c_pointer_to_bytearray(t["T_MEMMAP"])
- if libpr.set_cells_passable_on_memmap_to_65534_on_scoremap(map):
- raise RuntimeError("No score map allocated for set_cells_passable"
- "_on_memmap_to_65534_on_scoremap().")
-
- def init_score_map():
- test = libpr.init_score_map()
- if test:
- raise RuntimeError("Malloc error in init_score_map().")
- ord_v = ord("v")
- ord_blank = ord(" ")
- set_cells_passable_on_memmap_to_65534_on_scoremap()
- if "a" == filter:
- for id in world_db["Things"]:
- if animate_in_fov(world_db["Things"][id]) \
- and good_attack_target(world_db["Things"][id]):
- set_map_score_at_thingpos(id, 0)
- elif "f" == filter:
- for id in world_db["Things"]:
- if animate_in_fov(world_db["Things"][id]) \
- and good_flee_target(world_db["Things"][id]):
- set_map_score_at_thingpos(id, 0)
- elif "c" == filter:
- eat_cost = eat_vs_hunger_threshold(t["T_TYPE"])
- for mt in [mt for mt in t["T_MEMTHING"]
- if ord_blank != t["T_MEMMAP"][mt[1]
- * world_db["MAP_LENGTH"]
- + mt[2]]
- if world_db["ThingTypes"][mt[0]]["TT_TOOL"] == "food"
- if world_db["ThingTypes"][mt[0]]["TT_TOOLPOWER"]
- > eat_cost]:
- set_map_score(mt[1] * world_db["MAP_LENGTH"] + mt[2], 0)
- elif "s" == filter:
- zero_score_map_where_char_on_memdepthmap(mem_depth_c[0])
- if "a" != filter:
- for id in world_db["Things"]:
- if animate_in_fov(world_db["Things"][id]):
- if "f" == filter:
- pos = world_db["Things"][id]["T_POSY"] \
- * world_db["MAP_LENGTH"] \
- + world_db["Things"][id]["T_POSX"]
- if 0 == get_map_score(pos):
- continue
- set_map_score_at_thingpos(id, 65535)
-
- def rand_target_dir(neighbors, cmp, dirs):
- candidates = []
- n_candidates = 0
- for i in range(len(dirs)):
- if cmp == neighbors[i]:
- candidates.append(dirs[i])
- n_candidates += 1
- return candidates[rand.next() % n_candidates] if n_candidates else 0
-
- def get_neighbor_scores(dirs, eye_pos):
- scores = []
- if libpr.ready_neighbor_scores(eye_pos):
- raise RuntimeError("No score map allocated for " +
- "ready_neighbor_scores.()")
- for i in range(len(dirs)):
- scores.append(libpr.get_neighbor_score(i))
- return scores
-
- def get_dir_from_neighbors():
- dir_to_target = False
- dirs = "edcxsw"
- eye_pos = t["T_POSY"] * world_db["MAP_LENGTH"] + t["T_POSX"]
- neighbors = get_neighbor_scores(dirs, eye_pos)
- minmax_start = 0 if "f" == filter else 65535 - 1
- minmax_neighbor = minmax_start
- for i in range(len(dirs)):
- if ("f" == filter and get_map_score(eye_pos) < neighbors[i] and
- minmax_neighbor < neighbors[i] and 65535 != neighbors[i]) \
- or ("f" != filter and minmax_neighbor > neighbors[i]):
- minmax_neighbor = neighbors[i]
- if minmax_neighbor != minmax_start:
- dir_to_target = rand_target_dir(neighbors, minmax_neighbor, dirs)
- if "f" == filter:
- distance = get_map_score(eye_pos)
- fear_distance = world_db["MAP_LENGTH"]
- if t["T_SATIATION"] < 0 and math.sqrt(-t["T_SATIATION"]) > 0:
- fear_distance = fear_distance / math.sqrt(-t["T_SATIATION"])
- attack_distance = 1
- if not dir_to_target:
- if attack_distance >= distance:
- dir_to_target = rand_target_dir(neighbors,
- distance - 1, dirs)
- elif fear_distance >= distance:
- t["T_COMMAND"] = [id for id in world_db["ThingActions"]
- if
- world_db["ThingActions"][id]["TA_NAME"]
- == "wait"][0]
- return 1
- elif dir_to_target and fear_distance < distance:
- dir_to_target = 0
- return dir_to_target
-
- dir_to_target = False
- mem_depth_c = b' '
- run_i = 9 + 1 if "s" == filter else 1
- while run_i and not dir_to_target and ("s" == filter or seeing_thing()):
- run_i -= 1
- init_score_map()
- mem_depth_c = b'9' if b' ' == mem_depth_c \
- else bytes([mem_depth_c[0] - 1])
- if libpr.dijkstra_map():
- raise RuntimeError("No score map allocated for dijkstra_map().")
- dir_to_target = get_dir_from_neighbors()
- libpr.free_score_map()
- if dir_to_target and str == type(dir_to_target):
- t["T_COMMAND"] = [id for id in world_db["ThingActions"]
- if world_db["ThingActions"][id]["TA_NAME"]
- == "move"][0]
- t["T_ARGUMENT"] = ord(dir_to_target)
- return dir_to_target
-
-
-def standing_on_food(t):
- """Return True/False whether t is standing on healthy consumable."""
- eat_cost = eat_vs_hunger_threshold(t["T_TYPE"])
- for id in [id for id in world_db["Things"] if world_db["Things"][id] != t
- if not world_db["Things"][id]["carried"]
- if world_db["Things"][id]["T_POSY"] == t["T_POSY"]
- if world_db["Things"][id]["T_POSX"] == t["T_POSX"]
- if world_db["ThingTypes"][world_db["Things"][id]["T_TYPE"]]
- ["TT_TOOL"] == "food"
- if world_db["ThingTypes"][world_db["Things"][id]["T_TYPE"]]
- ["TT_TOOLPOWER"] > eat_cost]:
- return True
- return False
-
-
-def get_inventory_slot_to_consume(t):
- """Return invent. slot of healthiest consumable(if any healthy),else -1."""
- cmp_food = -1
- selection = -1
- i = 0
- eat_cost = eat_vs_hunger_threshold(t["T_TYPE"])
- for id in t["T_CARRIES"]:
- type = world_db["Things"][id]["T_TYPE"]
- if world_db["ThingTypes"][type]["TT_TOOL"] == "food" \
- and world_db["ThingTypes"][type]["TT_TOOLPOWER"]:
- nutvalue = world_db["ThingTypes"][type]["TT_TOOLPOWER"]
- tmp_cmp = abs(t["T_SATIATION"] + nutvalue - eat_cost)
- if (cmp_food < 0 and tmp_cmp < abs(t["T_SATIATION"])) \
- or tmp_cmp < cmp_food:
- cmp_food = tmp_cmp
- selection = i
- i += 1
- return selection
-
-
-def ai(t):
- """Determine next command/argment for actor t via AI algorithms."""
- t["T_COMMAND"] = [id for id in world_db["ThingActions"]
- if world_db["ThingActions"][id]["TA_NAME"] == "wait"][0]
- if get_dir_to_target(t, "f"):
- return
- sel = get_inventory_slot_to_consume(t)
- if -1 != sel:
- t["T_COMMAND"] = [id for id in world_db["ThingActions"]
- if world_db["ThingActions"][id]["TA_NAME"]
- == "use"][0]
- t["T_ARGUMENT"] = sel
- elif standing_on_food(t):
- t["T_COMMAND"] = [id for id in world_db["ThingActions"]
- if world_db["ThingActions"][id]["TA_NAME"]
- == "pick_up"][0]
- else:
- going_to_known_food_spot = get_dir_to_target(t, "c")
- if not going_to_known_food_spot:
- aiming_for_walking_food = get_dir_to_target(t, "a")
- if not aiming_for_walking_food:
- get_dir_to_target(t, "s")
-
-
-def turn_over():
- """Run game world and its inhabitants until new player input expected."""
- id = 0
- whilebreaker = False
- while world_db["Things"][0]["T_LIFEPOINTS"]:
- proliferable_map = world_db["MAP"][:]
- for id in [id for id in world_db["Things"]
- if not world_db["Things"][id]["carried"]]:
- y = world_db["Things"][id]["T_POSY"]
- x = world_db["Things"][id]["T_POSX"]
- proliferable_map[y * world_db["MAP_LENGTH"] + x] = ord('X')
- for id in [id for id in world_db["Things"]]: # Only what's from start!
- if not id in world_db["Things"] or \
- world_db["Things"][id]["carried"]: # May have been consumed or
- continue # picked up during turn …
- Thing = world_db["Things"][id]
- if Thing["T_LIFEPOINTS"]:
- if not Thing["T_COMMAND"]:
- update_map_memory(Thing)
- if 0 == id:
- whilebreaker = True
- break
- ai(Thing)
- try_healing(Thing)
- hunger(Thing)
- if Thing["T_LIFEPOINTS"]:
- Thing["T_PROGRESS"] += 1
- taid = [a for a in world_db["ThingActions"]
- if a == Thing["T_COMMAND"]][0]
- ThingAction = world_db["ThingActions"][taid]
- if Thing["T_PROGRESS"] == ThingAction["TA_EFFORT"]:
- eval("actor_" + ThingAction["TA_NAME"])(Thing)
- Thing["T_COMMAND"] = 0
- Thing["T_PROGRESS"] = 0
- thingproliferation(Thing, proliferable_map)
- if whilebreaker:
- break
- world_db["TURN"] += 1
-
-
-def new_Thing(type, pos=(0, 0)):
- """Return Thing of type T_TYPE, with fovmap if alive and world active."""
- thing = {
- "T_LIFEPOINTS": world_db["ThingTypes"][type]["TT_LIFEPOINTS"],
- "T_ARGUMENT": 0,
- "T_PROGRESS": 0,
- "T_SATIATION": 0,
- "T_COMMAND": 0,
- "T_TYPE": type,
- "T_POSY": pos[0],
- "T_POSX": pos[1],
- "T_CARRIES": [],
- "carried": False,
- "T_MEMTHING": [],
- "T_MEMMAP": False,
- "T_MEMDEPTHMAP": False,
- "fovmap": False
- }
- if world_db["WORLD_ACTIVE"] and thing["T_LIFEPOINTS"]:
- build_fov_map(thing)
- return thing
-
-
-def id_setter(id, category, id_store=False, start_at_1=False):
- """Set ID of object of category to manipulate ID unused? Create new one.
- The ID is stored as id_store.id (if id_store is set). If the integer of the
- input is valid (if start_at_1, >= 0, else >= -1), but <0 or (if start_at_1)
- <1, calculate new ID: lowest unused ID >=0 or (if start_at_1) >= 1. None is
- always returned when no new object is created, else the new object's ID.
- """
- min = 0 if start_at_1 else -1
- if str == type(id):
- id = integer_test(id, min)
- if None != id:
- if id in world_db[category]:
- if id_store:
- id_store.id = id
- return None
- else:
- if (start_at_1 and 0 == id) \
- or ((not start_at_1) and (id < 0)):
- id = 0 if start_at_1 else -1
- while 1:
- id = id + 1
- if id not in world_db[category]:
- break
- if id_store:
- id_store.id = id
- return id
-
-
-def command_plugin(str_plugin):
- """Run code in plugins/[str_plugin]."""
- if (str_plugin.replace("_", "").isalnum()
- and os.access("plugins/" + str_plugin, os.F_OK)):
- exec(open("plugins/" + str_plugin).read())
- return
- print("Bad plugin name:", str_plugin)
-
-
-def command_ping():
- """Send PONG line to server output file."""
- strong_write(io_db["file_out"], "PONG\n")
-
-
-def command_quit():
- """Abort server process."""
- if None == opts.replay:
- if world_db["WORLD_ACTIVE"]:
- save_world()
- atomic_write(io_db["path_record"], io_db["record_chunk"], do_append=True)
- raise SystemExit("received QUIT command")
-
-
-def command_thingshere(str_y, str_x):
- """Write to out file list of Things known to player at coordinate y, x."""
- if world_db["WORLD_ACTIVE"]:
- y = integer_test(str_y, 0, 255)
- x = integer_test(str_x, 0, 255)
- length = world_db["MAP_LENGTH"]
- if None != y and None != x and y < length and x < length:
- pos = (y * world_db["MAP_LENGTH"]) + x
- strong_write(io_db["file_out"], "THINGS_HERE START\n")
- if "v" == chr(world_db["Things"][0]["fovmap"][pos]):
- for id in [id for tid in sorted(list(world_db["ThingTypes"]))
- for id in world_db["Things"]
- if not world_db["Things"][id]["carried"]
- if world_db["Things"][id]["T_TYPE"] == tid
- if y == world_db["Things"][id]["T_POSY"]
- if x == world_db["Things"][id]["T_POSX"]]:
- type = world_db["Things"][id]["T_TYPE"]
- name = world_db["ThingTypes"][type]["TT_NAME"]
- strong_write(io_db["file_out"], name + "\n")
- else:
- for mt in [mt for tid in sorted(list(world_db["ThingTypes"]))
- for mt in world_db["Things"][0]["T_MEMTHING"]
- if mt[0] == tid if y == mt[1] if x == mt[2]]:
- name = world_db["ThingTypes"][mt[0]]["TT_NAME"]
- strong_write(io_db["file_out"], name + "\n")
- strong_write(io_db["file_out"], "THINGS_HERE END\n")
- else:
- print("Ignoring: Invalid map coordinates.")
- else:
- print("Ignoring: Command only works on existing worlds.")
-
-
-def set_command(action):
- """Set player's T_COMMAND, then call turn_over()."""
- id = [x for x in world_db["ThingActions"]
- if world_db["ThingActions"][x]["TA_NAME"] == action][0]
- world_db["Things"][0]["T_COMMAND"] = id
- turn_over()
-
-
-def play_wait():
- """Try "wait" as player's T_COMMAND."""
- set_command("wait")
-
-
-def play_pickup():
- """Try "pick_up" as player's T_COMMAND"."""
- t = world_db["Things"][0]
- ids = [id for id in world_db["Things"] if id
- if not world_db["Things"][id]["carried"]
- if world_db["Things"][id]["T_POSY"] == t["T_POSY"]
- if world_db["Things"][id]["T_POSX"] == t["T_POSX"]]
- if not len(ids):
- log("NOTHING to pick up.")
- else:
- set_command("pick_up")
-
-
-def play_drop(str_arg):
- """Try "drop" as player's T_COMMAND, int(str_arg) as T_ARGUMENT / slot."""
- t = world_db["Things"][0]
- if 0 == len(t["T_CARRIES"]):
- log("You have NOTHING to drop in your inventory.")
- else:
- val = integer_test(str_arg, 0, 255)
- if None != val and val < len(t["T_CARRIES"]):
- world_db["Things"][0]["T_ARGUMENT"] = val
- set_command("drop")
- else:
- print("Illegal inventory index.")
-
-
-def play_use(str_arg):
- """Try "use" as player's T_COMMAND, int(str_arg) as T_ARGUMENT / slot."""
- t = world_db["Things"][0]
- if 0 == len(t["T_CARRIES"]):
- log("You have NOTHING to use in your inventory.")
- else:
- val = integer_test(str_arg, 0, 255)
- if None != val and val < len(t["T_CARRIES"]):
- id = t["T_CARRIES"][val]
- type = world_db["Things"][id]["T_TYPE"]
- if not world_db["ThingTypes"][type]["TT_TOOL"] == "food":
- log("You CAN'T consume this thing.")
- return
- world_db["Things"][0]["T_ARGUMENT"] = val
- set_command("use")
- else:
- print("Illegal inventory index.")
-
-
-def play_move(str_arg):
- """Try "move" as player's T_COMMAND, str_arg as T_ARGUMENT / direction."""
- t = world_db["Things"][0]
- if not str_arg in directions_db:
- print("Illegal move direction string.")
- return
- dir = ord(directions_db[str_arg])
- move_result = mv_yx_in_dir_legal(chr(dir), t["T_POSY"], t["T_POSX"])
- if 1 == move_result[0]:
- pos = (move_result[1] * world_db["MAP_LENGTH"]) + move_result[2]
- if ord(".") == world_db["MAP"][pos]:
- world_db["Things"][0]["T_ARGUMENT"] = dir
- set_command("move")
- return
- log("You CAN'T move there.")
-
-
-def command_seedrandomness(seed_string):
- """Set rand seed to int(seed_string)."""
- val = integer_test(seed_string, 0, 4294967295)
- if None != val:
- rand.seed = val
-
-
-def command_makeworld(seed_string):
- """(Re-)build game world, i.e. map, things, to a new turn 1 from seed.
-
- Seed rand with seed. Do more only with a "wait" ThingAction and
- world["PLAYER_TYPE"] matching ThingType of TT_START_NUMBER > 0. Then,
- world_db["Things"] emptied, call make_map() and set
- world_db["WORLD_ACTIVE"], world_db["TURN"] to 1. Build new Things
- according to ThingTypes' TT_START_NUMBERS, with Thing of ID 0 to ThingType
- of ID = world["PLAYER_TYPE"]. Place Things randomly, and actors not on each
- other. Init player's memory map. Write "NEW_WORLD" line to out file.
- """
-
- def free_pos():
- i = 0
- while 1:
- err = "Space to put thing on too hard to find. Map too small?"
- while 1:
- y = rand.next() % world_db["MAP_LENGTH"]
- x = rand.next() % world_db["MAP_LENGTH"]
- if "." == chr(world_db["MAP"][y * world_db["MAP_LENGTH"] + x]):
- break
- i += 1
- if i == 65535:
- raise SystemExit(err)
- # Replica of C code, wrongly ignores animatedness of new Thing.
- pos_clear = (0 == len([id for id in world_db["Things"]
- if world_db["Things"][id]["T_LIFEPOINTS"]
- if world_db["Things"][id]["T_POSY"] == y
- if world_db["Things"][id]["T_POSX"] == x]))
- if pos_clear:
- break
- return (y, x)
-
- val = integer_test(seed_string, 0, 4294967295)
- if None == val:
- return
- rand.seed = val
- player_will_be_generated = False
- playertype = world_db["PLAYER_TYPE"]
- for ThingType in world_db["ThingTypes"]:
- if playertype == ThingType:
- if 0 < world_db["ThingTypes"][ThingType]["TT_START_NUMBER"]:
- player_will_be_generated = True
- break
- if not player_will_be_generated:
- print("Ignoring: No player type with start number >0 defined.")
- return
- wait_action = False
- for ThingAction in world_db["ThingActions"]:
- if "wait" == world_db["ThingActions"][ThingAction]["TA_NAME"]:
- wait_action = True
- if not wait_action:
- print("Ignoring beyond SEED_MAP: " +
- "No thing action with name 'wait' defined.")
- return
- world_db["Things"] = {}
- make_map()
- world_db["WORLD_ACTIVE"] = 1
- world_db["TURN"] = 1
- for i in range(world_db["ThingTypes"][playertype]["TT_START_NUMBER"]):
- id = id_setter(-1, "Things")
- world_db["Things"][id] = new_Thing(playertype, free_pos())
- if not world_db["Things"][0]["fovmap"]:
- empty_fovmap = bytearray(b" " * world_db["MAP_LENGTH"] ** 2)
- world_db["Things"][0]["fovmap"] = empty_fovmap
- update_map_memory(world_db["Things"][0])
- for type in world_db["ThingTypes"]:
- for i in range(world_db["ThingTypes"][type]["TT_START_NUMBER"]):
- if type != playertype:
- id = id_setter(-1, "Things")
- world_db["Things"][id] = new_Thing(type, free_pos())
- strong_write(io_db["file_out"], "NEW_WORLD\n")
-
-
-def command_maplength(maplength_string):
- """Redefine map length. Invalidate map, therefore lose all things on it."""
- val = integer_test(maplength_string, 1, 256)
- if None != val:
- world_db["MAP_LENGTH"] = val
- world_db["MAP"] = False
- set_world_inactive()
- world_db["Things"] = {}
- libpr.set_maplength(val)
-
-
-def command_worldactive(worldactive_string):
- """Toggle world_db["WORLD_ACTIVE"] if possible.
-
- An active world can always be set inactive. An inactive world can only be
- set active with a "wait" ThingAction, and a player Thing (of ID 0), and a
- map. On activation, rebuild all Things' FOVs, and the player's map memory.
- """
- val = integer_test(worldactive_string, 0, 1)
- if None != val:
- if 0 != world_db["WORLD_ACTIVE"]:
- if 0 == val:
- set_world_inactive()
- else:
- print("World already active.")
- elif 0 == world_db["WORLD_ACTIVE"]:
- wait_exists = False
- for ThingAction in world_db["ThingActions"]:
- if "wait" == world_db["ThingActions"][ThingAction]["TA_NAME"]:
- wait_exists = True
- break
- player_exists = False
- for Thing in world_db["Things"]:
- if 0 == Thing:
- player_exists = True
- break
- if wait_exists and player_exists and world_db["MAP"]:
- for id in world_db["Things"]:
- if world_db["Things"][id]["T_LIFEPOINTS"]:
- build_fov_map(world_db["Things"][id])
- if 0 == id:
- update_map_memory(world_db["Things"][id], False)
- if not world_db["Things"][0]["T_LIFEPOINTS"]:
- empty_fovmap = bytearray(b" " * world_db["MAP_LENGTH"] ** 2)
- world_db["Things"][0]["fovmap"] = empty_fovmap
- world_db["WORLD_ACTIVE"] = 1
- else:
- print("Ignoring: Not all conditions for world activation met.")
-
-
-def test_for_id_maker(object, category):
- """Return decorator testing for object having "id" attribute."""
- def decorator(f):
- def helper(*args):
- if hasattr(object, "id"):
- f(*args)
- else:
- print("Ignoring: No " + category +
- " defined to manipulate yet.")
- return helper
- return decorator
-
-
-def command_tid(id_string):
- """Set ID of Thing to manipulate. ID unused? Create new one.
-
- Default new Thing's type to the first available ThingType, others: zero.
- """
- id = id_setter(id_string, "Things", command_tid)
- if None != id:
- if world_db["ThingTypes"] == {}:
- print("Ignoring: No ThingType to settle new Thing in.")
- return
- type = list(world_db["ThingTypes"].keys())[0]
- world_db["Things"][id] = new_Thing(type)
-
-
-test_Thing_id = test_for_id_maker(command_tid, "Thing")
-
-
-@test_Thing_id
-def command_tcommand(str_int):
- """Set T_COMMAND of selected Thing."""
- val = integer_test(str_int, 0)
- if None != val:
- if 0 == val or val in world_db["ThingActions"]:
- world_db["Things"][command_tid.id]["T_COMMAND"] = val
- else:
- print("Ignoring: ThingAction ID belongs to no known ThingAction.")
-
-
-@test_Thing_id
-def command_ttype(str_int):
- """Set T_TYPE of selected Thing."""
- val = integer_test(str_int, 0)
- if None != val:
- if val in world_db["ThingTypes"]:
- world_db["Things"][command_tid.id]["T_TYPE"] = val
- else:
- print("Ignoring: ThingType ID belongs to no known ThingType.")
-
-
-@test_Thing_id
-def command_tcarries(str_int):
- """Append int(str_int) to T_CARRIES of selected Thing.
-
- The ID int(str_int) must not be of the selected Thing, and must belong to a
- Thing with unset "carried" flag. Its "carried" flag will be set on owning.
- """
- val = integer_test(str_int, 0)
- if None != val:
- if val == command_tid.id:
- print("Ignoring: Thing cannot carry itself.")
- elif val in world_db["Things"] \
- and not world_db["Things"][val]["carried"]:
- world_db["Things"][command_tid.id]["T_CARRIES"].append(val)
- world_db["Things"][val]["carried"] = True
- else:
- print("Ignoring: Thing not available for carrying.")
- # Note that the whole carrying structure is different from the C version:
- # Carried-ness is marked by a "carried" flag, not by Things containing
- # Things internally.
-
-
-@test_Thing_id
-def command_tmemthing(str_t, str_y, str_x):
- """Add (int(str_t), int(str_y), int(str_x)) to selected Thing's T_MEMTHING.
-
- The type must fit to an existing ThingType, and the position into the map.
- """
- type = integer_test(str_t, 0)
- posy = integer_test(str_y, 0, 255)
- posx = integer_test(str_x, 0, 255)
- if None != type and None != posy and None != posx:
- if type not in world_db["ThingTypes"] \
- or posy >= world_db["MAP_LENGTH"] or posx >= world_db["MAP_LENGTH"]:
- print("Ignoring: Illegal value for thing type or position.")
- else:
- memthing = (type, posy, posx)
- world_db["Things"][command_tid.id]["T_MEMTHING"].append(memthing)
-
-
-def setter_map(maptype):
- """Set (world or Thing's) map of maptype's int(str_int)-th line to mapline.
-
- If no map of maptype exists yet, initialize it with ' ' bytes first.
- """
-
- def valid_map_line(str_int, mapline):
- val = integer_test(str_int, 0, 255)
- if None != val:
- if val >= world_db["MAP_LENGTH"]:
- print("Illegal value for map line number.")
- elif len(mapline) != world_db["MAP_LENGTH"]:
- print("Map line length is unequal map width.")
- else:
- return val
- return None
-
- def nonThingMap_helper(str_int, mapline):
- val = valid_map_line(str_int, mapline)
- if None != val:
- length = world_db["MAP_LENGTH"]
- if not world_db["MAP"]:
- map = bytearray(b' ' * (length ** 2))
- else:
- map = world_db["MAP"]
- map[val * length:(val * length) + length] = mapline.encode()
- if not world_db["MAP"]:
- world_db["MAP"] = map
-
- @test_Thing_id
- def ThingMap_helper(str_int, mapline):
- val = valid_map_line(str_int, mapline)
- if None != val:
- length = world_db["MAP_LENGTH"]
- if not world_db["Things"][command_tid.id][maptype]:
- map = bytearray(b' ' * (length ** 2))
- else:
- map = world_db["Things"][command_tid.id][maptype]
- map[val * length:(val * length) + length] = mapline.encode()
- if not world_db["Things"][command_tid.id][maptype]:
- world_db["Things"][command_tid.id][maptype] = map
-
- return nonThingMap_helper if maptype == "MAP" else ThingMap_helper
-
-
-
-def setter_tpos(axis):
- """Generate setter for T_POSX or T_POSY of selected Thing.
-
- If world is active, rebuilds animate things' fovmap, player's memory map.
- """
- @test_Thing_id
- def helper(str_int):
- val = integer_test(str_int, 0, 255)
- if None != val:
- if val < world_db["MAP_LENGTH"]:
- world_db["Things"][command_tid.id]["T_POS" + axis] = val
- if world_db["WORLD_ACTIVE"] \
- and world_db["Things"][command_tid.id]["T_LIFEPOINTS"]:
- build_fov_map(world_db["Things"][command_tid.id])
- if 0 == command_tid.id:
- update_map_memory(world_db["Things"][command_tid.id])
- else:
- print("Ignoring: Position is outside of map.")
- return helper
-
-
-def command_ttid(id_string):
- """Set ID of ThingType to manipulate. ID unused? Create new one.
-
- Default new ThingType's TT_SYMBOL to "?", TT_CORPSE_ID to self, TT_TOOL to
- "", others: 0.
- """
- id = id_setter(id_string, "ThingTypes", command_ttid)
- if None != id:
- world_db["ThingTypes"][id] = {
- "TT_NAME": "(none)",
- "TT_TOOLPOWER": 0,
- "TT_LIFEPOINTS": 0,
- "TT_PROLIFERATE": 0,
- "TT_START_NUMBER": 0,
- "TT_SYMBOL": "?",
- "TT_CORPSE_ID": id,
- "TT_TOOL": ""
- }
-
-
-test_ThingType_id = test_for_id_maker(command_ttid, "ThingType")
-
-
-@test_ThingType_id
-def command_ttname(name):
- """Set TT_NAME of selected ThingType."""
- world_db["ThingTypes"][command_ttid.id]["TT_NAME"] = name
-
-
-@test_ThingType_id
-def command_tttool(name):
- """Set TT_TOOL of selected ThingType."""
- world_db["ThingTypes"][command_ttid.id]["TT_TOOL"] = name
-
-
-@test_ThingType_id
-def command_ttsymbol(char):
- """Set TT_SYMBOL of selected ThingType. """
- if 1 == len(char):
- world_db["ThingTypes"][command_ttid.id]["TT_SYMBOL"] = char
- else:
- print("Ignoring: Argument must be single character.")
-
-
-@test_ThingType_id
-def command_ttcorpseid(str_int):
- """Set TT_CORPSE_ID of selected ThingType."""
- val = integer_test(str_int, 0)
- if None != val:
- if val in world_db["ThingTypes"]:
- world_db["ThingTypes"][command_ttid.id]["TT_CORPSE_ID"] = val
- else:
- print("Ignoring: Corpse ID belongs to no known ThignType.")
-
-
-def command_taid(id_string):
- """Set ID of ThingAction to manipulate. ID unused? Create new one.
-
- Default new ThingAction's TA_EFFORT to 1, its TA_NAME to "wait".
- """
- id = id_setter(id_string, "ThingActions", command_taid, True)
- if None != id:
- world_db["ThingActions"][id] = {
- "TA_EFFORT": 1,
- "TA_NAME": "wait"
- }
-
-
-test_ThingAction_id = test_for_id_maker(command_taid, "ThingAction")
-
-
-@test_ThingAction_id
-def command_taname(name):
- """Set TA_NAME of selected ThingAction.
-
- The name must match a valid thing action function. If after the name
- setting no ThingAction with name "wait" remains, call set_world_inactive().
- """
- if name == "wait" or name == "move" or name == "use" or name == "drop" \
- or name == "pick_up":
- world_db["ThingActions"][command_taid.id]["TA_NAME"] = name
- if 1 == world_db["WORLD_ACTIVE"]:
- wait_defined = False
- for id in world_db["ThingActions"]:
- if "wait" == world_db["ThingActions"][id]["TA_NAME"]:
- wait_defined = True
- break
- if not wait_defined:
- set_world_inactive()
- else:
- print("Ignoring: Invalid action name.")
- # In contrast to the original,naming won't map a function to a ThingAction.
-
-
-def command_ai():
- """Call ai() on player Thing, then turn_over()."""
- ai(world_db["Things"][0])
- turn_over()
-
-
-"""Commands database.
-
-Map command start tokens to ([0]) number of expected command arguments, ([1])
-the command's meta-ness (i.e. is it to be written to the record file, is it to
-be ignored in replay mode if read from server input file), and ([2]) a function
-to be called on it.
-"""
-commands_db = {
- "PLUGIN": (1, True, command_plugin),
- "QUIT": (0, True, command_quit),
- "PING": (0, True, command_ping),
- "THINGS_HERE": (2, True, command_thingshere),
- "MAKE_WORLD": (1, False, command_makeworld),
- "SEED_RANDOMNESS": (1, False, command_seedrandomness),
- "TURN": (1, False, setter(None, "TURN", 0, 65535)),
- "PLAYER_TYPE": (1, False, setter(None, "PLAYER_TYPE", 0)),
- "MAP_LENGTH": (1, False, command_maplength),
- "WORLD_ACTIVE": (1, False, command_worldactive),
- "MAP": (2, False, setter_map("MAP")),
- "TA_ID": (1, False, command_taid),
- "TA_EFFORT": (1, False, setter("ThingAction", "TA_EFFORT", 0, 255)),
- "TA_NAME": (1, False, command_taname),
- "TT_ID": (1, False, command_ttid),
- "TT_NAME": (1, False, command_ttname),
- "TT_TOOL": (1, False, command_tttool),
- "TT_SYMBOL": (1, False, command_ttsymbol),
- "TT_CORPSE_ID": (1, False, command_ttcorpseid),
- "TT_TOOLPOWER": (1, False, setter("ThingType", "TT_TOOLPOWER", 0, 65535)),
- "TT_START_NUMBER": (1, False, setter("ThingType", "TT_START_NUMBER",
- 0, 255)),
- "TT_PROLIFERATE": (1, False, setter("ThingType", "TT_PROLIFERATE",
- 0, 65535)),
- "TT_LIFEPOINTS": (1, False, setter("ThingType", "TT_LIFEPOINTS", 0, 255)),
- "T_ID": (1, False, command_tid),
- "T_ARGUMENT": (1, False, setter("Thing", "T_ARGUMENT", 0, 255)),
- "T_PROGRESS": (1, False, setter("Thing", "T_PROGRESS", 0, 255)),
- "T_LIFEPOINTS": (1, False, setter("Thing", "T_LIFEPOINTS", 0, 255)),
- "T_SATIATION": (1, False, setter("Thing", "T_SATIATION", -32768, 32767)),
- "T_COMMAND": (1, False, command_tcommand),
- "T_TYPE": (1, False, command_ttype),
- "T_CARRIES": (1, False, command_tcarries),
- "T_MEMMAP": (2, False, setter_map("T_MEMMAP")),
- "T_MEMDEPTHMAP": (2, False, setter_map("T_MEMDEPTHMAP")),
- "T_MEMTHING": (3, False, command_tmemthing),
- "T_POSY": (1, False, setter_tpos("Y")),
- "T_POSX": (1, False, setter_tpos("X")),
- "wait": (0, False, play_wait),
- "move": (1, False, play_move),
- "pick_up": (0, False, play_pickup),
- "drop": (1, False, play_drop),
- "use": (1, False, play_use),
- "ai": (0, False, command_ai)
-}
-# TODO: Unhandled cases: (Un-)killing animates (esp. player!) with T_LIFEPOINTS.
-
-
+from server.io import cleanup_server_io
try:
- libpr = prep_library()
- rand = RandomnessIO()
- opts = parse_command_line_arguments()
+ from server.utils import opts
+ from server.config.io import io_db
if opts.savefile:
io_db["path_save"] = opts.savefile
io_db["path_record"] = "record_" + opts.savefile
+ from server.io import setup_server_io
setup_server_io()
if opts.verbose:
io_db["verbose"] = True
+ import os
+ from server.config.world_data import world_db
+ from server.io import read_command, try_worldstate_update, obey
if None != opts.replay:
replay_game()
else:
--- /dev/null
+from server.config.world_data import world_db
+from server.io import log
+
+
+def actor_wait(t):
+ """Make t do nothing (but loudly, if player avatar)."""
+ if t == world_db["Things"][0]:
+ log("You wait")
+
+
+def actor_move(t):
+ """If passable, move/collide(=attack) thing into T_ARGUMENT's direction."""
+ from server.world import build_fov_map, decrement_lifepoints
+ from server.utils import mv_yx_in_dir_legal
+ from server.config.world_data import directions_db
+ passable = False
+ move_result = mv_yx_in_dir_legal(chr(t["T_ARGUMENT"]),
+ t["T_POSY"], t["T_POSX"])
+ if 1 == move_result[0]:
+ pos = (move_result[1] * world_db["MAP_LENGTH"]) + move_result[2]
+ hitted = [id for id in world_db["Things"]
+ if world_db["Things"][id] != t
+ if world_db["Things"][id]["T_LIFEPOINTS"]
+ if world_db["Things"][id]["T_POSY"] == move_result[1]
+ if world_db["Things"][id]["T_POSX"] == move_result[2]]
+ if len(hitted):
+ hit_id = hitted[0]
+ if t == world_db["Things"][0]:
+ hitted_type = world_db["Things"][hit_id]["T_TYPE"]
+ hitted_name = world_db["ThingTypes"][hitted_type]["TT_NAME"]
+ log("You WOUND" + hitted_name + ".")
+ elif 0 == hit_id:
+ hitter_name = world_db["ThingTypes"][t["T_TYPE"]]["TT_NAME"]
+ log(hitter_name +" WOUNDS you.")
+ decrement_lifepoints(world_db["Things"][hit_id])
+ return
+ passable = "." == chr(world_db["MAP"][pos])
+ dir = [dir for dir in directions_db
+ if directions_db[dir] == chr(t["T_ARGUMENT"])][0]
+ if passable:
+ t["T_POSY"] = move_result[1]
+ t["T_POSX"] = move_result[2]
+ for id in t["T_CARRIES"]:
+ world_db["Things"][id]["T_POSY"] = move_result[1]
+ world_db["Things"][id]["T_POSX"] = move_result[2]
+ build_fov_map(t)
+ if t == world_db["Things"][0]:
+ log("You MOVE " + dir + ".")
+
+
+def actor_pick_up(t):
+ """Make t pick up (topmost?) Thing from ground into inventory.
+
+ Define topmostness by how low the thing's type ID is.
+ """
+ ids = [id for id in world_db["Things"] if world_db["Things"][id] != t
+ if not world_db["Things"][id]["carried"]
+ if world_db["Things"][id]["T_POSY"] == t["T_POSY"]
+ if world_db["Things"][id]["T_POSX"] == t["T_POSX"]]
+ if len(ids):
+ lowest_tid = -1
+ for iid in ids:
+ tid = world_db["Things"][iid]["T_TYPE"]
+ if lowest_tid == -1 or tid < lowest_tid:
+ id = iid
+ lowest_tid = tid
+ world_db["Things"][id]["carried"] = True
+ t["T_CARRIES"].append(id)
+ if t == world_db["Things"][0]:
+ log("You PICK UP an object.")
+
+
+def actor_drop(t):
+ """Make t rop Thing from inventory to ground indexed by T_ARGUMENT."""
+ # TODO: Handle case where T_ARGUMENT matches nothing.
+ if len(t["T_CARRIES"]):
+ id = t["T_CARRIES"][t["T_ARGUMENT"]]
+ t["T_CARRIES"].remove(id)
+ world_db["Things"][id]["carried"] = False
+ if t == world_db["Things"][0]:
+ log("You DROP an object.")
+
+
+def actor_use(t):
+ """Make t use (for now: consume) T_ARGUMENT-indexed Thing in inventory."""
+ # TODO: Handle case where T_ARGUMENT matches nothing.
+ if len(t["T_CARRIES"]):
+ id = t["T_CARRIES"][t["T_ARGUMENT"]]
+ type = world_db["Things"][id]["T_TYPE"]
+ if world_db["ThingTypes"][type]["TT_TOOL"] == "food":
+ t["T_CARRIES"].remove(id)
+ del world_db["Things"][id]
+ t["T_SATIATION"] += world_db["ThingTypes"][type]["TT_TOOLPOWER"]
+ if t == world_db["Things"][0]:
+ log("You CONSUME this object.")
+ elif t == world_db["Things"][0]:
+ log("You try to use this object, but FAIL.")
--- /dev/null
+from server.config.world_data import world_db
+
+
+def eat_vs_hunger_threshold(thingtype):
+ """Return satiation cost of eating for type. Good food for it must be >."""
+ from server.world import hunger_per_turn
+ hunger_unit = hunger_per_turn(thingtype)
+ actiontype = [id for id in world_db["ThingActions"]
+ if world_db["ThingActions"][id]["TA_NAME"] == "use"][0]
+ return world_db["ThingActions"][actiontype]["TA_EFFORT"] * hunger_unit
+
+
+def get_dir_to_target(t, filter):
+ """Try to set T_COMMAND/T_ARGUMENT for move to "filter"-determined target.
+
+ The path-wise nearest target is chosen, via the shortest available path.
+ Target must not be t. On succcess, return positive value, else False.
+ Filters:
+ "a": Thing in FOV is animate, but of ThingType, starts out weaker than t
+ is, and its corpse would be healthy food for t
+ "f": move away from an enemy – any visible actor whose thing type has more
+ TT_LIFEPOINTS than t LIFEPOINTS, and might find t's corpse healthy
+ food – if it is closer than n steps, where n will shrink as t's hunger
+ grows; if enemy is too close, move towards (attack) the enemy instead;
+ if no fleeing is possible, nor attacking useful, wait; don't tread on
+ non-enemies for fleeing
+ "c": Thing in memorized map is consumable of sufficient nutrition for t
+ "s": memory map cell with greatest-reachable degree of unexploredness
+ """
+ from server.utils import rand, libpr, c_pointer_to_bytearray
+
+ def zero_score_map_where_char_on_memdepthmap(c):
+ # OUTSOURCED FOR PERFORMANCE REASONS TO libplomrogue.so:
+ # for i in [i for i in range(world_db["MAP_LENGTH"] ** 2)
+ # if t["T_MEMDEPTHMAP"][i] == mem_depth_c[0]]:
+ # set_map_score(i, 0)
+ map = c_pointer_to_bytearray(t["T_MEMDEPTHMAP"])
+ if libpr.zero_score_map_where_char_on_memdepthmap(c, map):
+ raise RuntimeError("No score map allocated for "
+ "zero_score_map_where_char_on_memdepthmap().")
+
+ def set_map_score_at_thingpos(id, score):
+ pos = world_db["Things"][id]["T_POSY"] * world_db["MAP_LENGTH"] \
+ + world_db["Things"][id]["T_POSX"]
+ set_map_score(pos, score)
+
+ def set_map_score(pos, score):
+ test = libpr.set_map_score(pos, score)
+ if test:
+ raise RuntimeError("No score map allocated for set_map_score().")
+
+ def get_map_score(pos):
+ result = libpr.get_map_score(pos)
+ if result < 0:
+ raise RuntimeError("No score map allocated for get_map_score().")
+ return result
+
+ def animate_in_fov(Thing):
+ if Thing["carried"] or Thing == t or not Thing["T_LIFEPOINTS"]:
+ return False
+ pos = Thing["T_POSY"] * world_db["MAP_LENGTH"] + Thing["T_POSX"]
+ if ord("v") == t["fovmap"][pos]:
+ return True
+
+ def good_attack_target(v):
+ eat_cost = eat_vs_hunger_threshold(t["T_TYPE"])
+ type = world_db["ThingTypes"][v["T_TYPE"]]
+ type_corpse = world_db["ThingTypes"][type["TT_CORPSE_ID"]]
+ if t["T_LIFEPOINTS"] > type["TT_LIFEPOINTS"] \
+ and type_corpse["TT_TOOL"] == "food" \
+ and type_corpse["TT_TOOLPOWER"] > eat_cost:
+ return True
+ return False
+
+ def good_flee_target(m):
+ own_corpse_id = world_db["ThingTypes"][t["T_TYPE"]]["TT_CORPSE_ID"]
+ corpse_type = world_db["ThingTypes"][own_corpse_id]
+ targetness = 0 if corpse_type["TT_TOOL"] != "food" \
+ else corpse_type["TT_TOOLPOWER"]
+ type = world_db["ThingTypes"][m["T_TYPE"]]
+ if t["T_LIFEPOINTS"] < type["TT_LIFEPOINTS"] \
+ and targetness > eat_vs_hunger_threshold(m["T_TYPE"]):
+ return True
+ return False
+
+ def seeing_thing():
+ if t["fovmap"] and "a" == filter:
+ for id in world_db["Things"]:
+ if animate_in_fov(world_db["Things"][id]):
+ if good_attack_target(world_db["Things"][id]):
+ return True
+ elif t["fovmap"] and "f" == filter:
+ for id in world_db["Things"]:
+ if animate_in_fov(world_db["Things"][id]):
+ if good_flee_target(world_db["Things"][id]):
+ return True
+ elif t["T_MEMMAP"] and "c" == filter:
+ eat_cost = eat_vs_hunger_threshold(t["T_TYPE"])
+ for mt in t["T_MEMTHING"]:
+ if ' ' != chr(t["T_MEMMAP"][(mt[1] * world_db["MAP_LENGTH"])
+ + mt[2]]) \
+ and world_db["ThingTypes"][mt[0]]["TT_TOOL"] == "food" \
+ and world_db["ThingTypes"][mt[0]]["TT_TOOLPOWER"] \
+ > eat_cost:
+ return True
+ return False
+
+ def set_cells_passable_on_memmap_to_65534_on_scoremap():
+ # OUTSOURCED FOR PERFORMANCE REASONS TO libplomrogue.so:
+ # ord_dot = ord(".")
+ # memmap = t["T_MEMMAP"]
+ # for i in [i for i in range(world_db["MAP_LENGTH"] ** 2)
+ # if ord_dot == memmap[i]]:
+ # set_map_score(i, 65534) # i.e. 65535-1
+ map = c_pointer_to_bytearray(t["T_MEMMAP"])
+ if libpr.set_cells_passable_on_memmap_to_65534_on_scoremap(map):
+ raise RuntimeError("No score map allocated for set_cells_passable"
+ "_on_memmap_to_65534_on_scoremap().")
+
+ def init_score_map():
+ test = libpr.init_score_map()
+ if test:
+ raise RuntimeError("Malloc error in init_score_map().")
+ ord_v = ord("v")
+ ord_blank = ord(" ")
+ set_cells_passable_on_memmap_to_65534_on_scoremap()
+ if "a" == filter:
+ for id in world_db["Things"]:
+ if animate_in_fov(world_db["Things"][id]) \
+ and good_attack_target(world_db["Things"][id]):
+ set_map_score_at_thingpos(id, 0)
+ elif "f" == filter:
+ for id in world_db["Things"]:
+ if animate_in_fov(world_db["Things"][id]) \
+ and good_flee_target(world_db["Things"][id]):
+ set_map_score_at_thingpos(id, 0)
+ elif "c" == filter:
+ eat_cost = eat_vs_hunger_threshold(t["T_TYPE"])
+ for mt in [mt for mt in t["T_MEMTHING"]
+ if ord_blank != t["T_MEMMAP"][mt[1]
+ * world_db["MAP_LENGTH"]
+ + mt[2]]
+ if world_db["ThingTypes"][mt[0]]["TT_TOOL"] == "food"
+ if world_db["ThingTypes"][mt[0]]["TT_TOOLPOWER"]
+ > eat_cost]:
+ set_map_score(mt[1] * world_db["MAP_LENGTH"] + mt[2], 0)
+ elif "s" == filter:
+ zero_score_map_where_char_on_memdepthmap(mem_depth_c[0])
+ if "a" != filter:
+ for id in world_db["Things"]:
+ if animate_in_fov(world_db["Things"][id]):
+ if "f" == filter:
+ pos = world_db["Things"][id]["T_POSY"] \
+ * world_db["MAP_LENGTH"] \
+ + world_db["Things"][id]["T_POSX"]
+ if 0 == get_map_score(pos):
+ continue
+ set_map_score_at_thingpos(id, 65535)
+
+ def rand_target_dir(neighbors, cmp, dirs):
+ candidates = []
+ n_candidates = 0
+ for i in range(len(dirs)):
+ if cmp == neighbors[i]:
+ candidates.append(dirs[i])
+ n_candidates += 1
+ return candidates[rand.next() % n_candidates] if n_candidates else 0
+
+ def get_neighbor_scores(dirs, eye_pos):
+ scores = []
+ if libpr.ready_neighbor_scores(eye_pos):
+ raise RuntimeError("No score map allocated for " +
+ "ready_neighbor_scores.()")
+ for i in range(len(dirs)):
+ scores.append(libpr.get_neighbor_score(i))
+ return scores
+
+ def get_dir_from_neighbors():
+ import math
+ dir_to_target = False
+ dirs = "edcxsw"
+ eye_pos = t["T_POSY"] * world_db["MAP_LENGTH"] + t["T_POSX"]
+ neighbors = get_neighbor_scores(dirs, eye_pos)
+ minmax_start = 0 if "f" == filter else 65535 - 1
+ minmax_neighbor = minmax_start
+ for i in range(len(dirs)):
+ if ("f" == filter and get_map_score(eye_pos) < neighbors[i] and
+ minmax_neighbor < neighbors[i] and 65535 != neighbors[i]) \
+ or ("f" != filter and minmax_neighbor > neighbors[i]):
+ minmax_neighbor = neighbors[i]
+ if minmax_neighbor != minmax_start:
+ dir_to_target = rand_target_dir(neighbors, minmax_neighbor, dirs)
+ if "f" == filter:
+ distance = get_map_score(eye_pos)
+ fear_distance = world_db["MAP_LENGTH"]
+ if t["T_SATIATION"] < 0 and math.sqrt(-t["T_SATIATION"]) > 0:
+ fear_distance = fear_distance / math.sqrt(-t["T_SATIATION"])
+ attack_distance = 1
+ if not dir_to_target:
+ if attack_distance >= distance:
+ dir_to_target = rand_target_dir(neighbors,
+ distance - 1, dirs)
+ elif fear_distance >= distance:
+ t["T_COMMAND"] = [id for id in world_db["ThingActions"]
+ if
+ world_db["ThingActions"][id]["TA_NAME"]
+ == "wait"][0]
+ return 1
+ elif dir_to_target and fear_distance < distance:
+ dir_to_target = 0
+ return dir_to_target
+
+ dir_to_target = False
+ mem_depth_c = b' '
+ run_i = 9 + 1 if "s" == filter else 1
+ while run_i and not dir_to_target and ("s" == filter or seeing_thing()):
+ run_i -= 1
+ init_score_map()
+ mem_depth_c = b'9' if b' ' == mem_depth_c \
+ else bytes([mem_depth_c[0] - 1])
+ if libpr.dijkstra_map():
+ raise RuntimeError("No score map allocated for dijkstra_map().")
+ dir_to_target = get_dir_from_neighbors()
+ libpr.free_score_map()
+ if dir_to_target and str == type(dir_to_target):
+ t["T_COMMAND"] = [id for id in world_db["ThingActions"]
+ if world_db["ThingActions"][id]["TA_NAME"]
+ == "move"][0]
+ t["T_ARGUMENT"] = ord(dir_to_target)
+ return dir_to_target
+
+
+def standing_on_food(t):
+ """Return True/False whether t is standing on healthy consumable."""
+ eat_cost = eat_vs_hunger_threshold(t["T_TYPE"])
+ for id in [id for id in world_db["Things"] if world_db["Things"][id] != t
+ if not world_db["Things"][id]["carried"]
+ if world_db["Things"][id]["T_POSY"] == t["T_POSY"]
+ if world_db["Things"][id]["T_POSX"] == t["T_POSX"]
+ if world_db["ThingTypes"][world_db["Things"][id]["T_TYPE"]]
+ ["TT_TOOL"] == "food"
+ if world_db["ThingTypes"][world_db["Things"][id]["T_TYPE"]]
+ ["TT_TOOLPOWER"] > eat_cost]:
+ return True
+ return False
+
+
+def get_inventory_slot_to_consume(t):
+ """Return invent. slot of healthiest consumable(if any healthy),else -1."""
+ cmp_food = -1
+ selection = -1
+ i = 0
+ eat_cost = eat_vs_hunger_threshold(t["T_TYPE"])
+ for id in t["T_CARRIES"]:
+ type = world_db["Things"][id]["T_TYPE"]
+ if world_db["ThingTypes"][type]["TT_TOOL"] == "food" \
+ and world_db["ThingTypes"][type]["TT_TOOLPOWER"]:
+ nutvalue = world_db["ThingTypes"][type]["TT_TOOLPOWER"]
+ tmp_cmp = abs(t["T_SATIATION"] + nutvalue - eat_cost)
+ if (cmp_food < 0 and tmp_cmp < abs(t["T_SATIATION"])) \
+ or tmp_cmp < cmp_food:
+ cmp_food = tmp_cmp
+ selection = i
+ i += 1
+ return selection
+
+
+def ai(t):
+ """Determine next command/argment for actor t via AI algorithms."""
+ t["T_COMMAND"] = [id for id in world_db["ThingActions"]
+ if world_db["ThingActions"][id]["TA_NAME"] == "wait"][0]
+ if get_dir_to_target(t, "f"):
+ return
+ sel = get_inventory_slot_to_consume(t)
+ if -1 != sel:
+ t["T_COMMAND"] = [id for id in world_db["ThingActions"]
+ if world_db["ThingActions"][id]["TA_NAME"]
+ == "use"][0]
+ t["T_ARGUMENT"] = sel
+ elif standing_on_food(t):
+ t["T_COMMAND"] = [id for id in world_db["ThingActions"]
+ if world_db["ThingActions"][id]["TA_NAME"]
+ == "pick_up"][0]
+ else:
+ going_to_known_food_spot = get_dir_to_target(t, "c")
+ if not going_to_known_food_spot:
+ aiming_for_walking_food = get_dir_to_target(t, "a")
+ if not aiming_for_walking_food:
+ get_dir_to_target(t, "s")
--- /dev/null
+from server.config.world_data import world_db
+from server.config.io import io_db
+from server.io import log, strong_write
+from server.utils import integer_test, id_setter
+from server.world import build_fov_map, update_map_memory, set_world_inactive,\
+ turn_over
+
+
+def command_plugin(str_plugin):
+ """Run code in plugins/[str_plugin]."""
+ import os
+ if (str_plugin.replace("_", "").isalnum()
+ and os.access("plugins/" + str_plugin, os.F_OK)):
+ exec(open("plugins/" + str_plugin).read())
+ return
+ print("Bad plugin name:", str_plugin)
+
+
+def command_ping():
+ """Send PONG line to server output file."""
+ strong_write(io_db["file_out"], "PONG\n")
+
+
+def command_quit():
+ """Abort server process."""
+ from server.io import save_world, atomic_write
+ from server.utils import opts
+ if None == opts.replay:
+ if world_db["WORLD_ACTIVE"]:
+ save_world()
+ atomic_write(io_db["path_record"], io_db["record_chunk"],
+ do_append=True)
+ raise SystemExit("received QUIT command")
+
+
+def command_thingshere(str_y, str_x):
+ """Write to out file list of Things known to player at coordinate y, x."""
+ if world_db["WORLD_ACTIVE"]:
+ y = integer_test(str_y, 0, 255)
+ x = integer_test(str_x, 0, 255)
+ length = world_db["MAP_LENGTH"]
+ if None != y and None != x and y < length and x < length:
+ pos = (y * world_db["MAP_LENGTH"]) + x
+ strong_write(io_db["file_out"], "THINGS_HERE START\n")
+ if "v" == chr(world_db["Things"][0]["fovmap"][pos]):
+ for id in [id for tid in sorted(list(world_db["ThingTypes"]))
+ for id in world_db["Things"]
+ if not world_db["Things"][id]["carried"]
+ if world_db["Things"][id]["T_TYPE"] == tid
+ if y == world_db["Things"][id]["T_POSY"]
+ if x == world_db["Things"][id]["T_POSX"]]:
+ type = world_db["Things"][id]["T_TYPE"]
+ name = world_db["ThingTypes"][type]["TT_NAME"]
+ strong_write(io_db["file_out"], name + "\n")
+ else:
+ for mt in [mt for tid in sorted(list(world_db["ThingTypes"]))
+ for mt in world_db["Things"][0]["T_MEMTHING"]
+ if mt[0] == tid if y == mt[1] if x == mt[2]]:
+ name = world_db["ThingTypes"][mt[0]]["TT_NAME"]
+ strong_write(io_db["file_out"], name + "\n")
+ strong_write(io_db["file_out"], "THINGS_HERE END\n")
+ else:
+ print("Ignoring: Invalid map coordinates.")
+ else:
+ print("Ignoring: Command only works on existing worlds.")
+
+
+def command_seedrandomness(seed_string):
+ """Set rand seed to int(seed_string)."""
+ from server.utils import rand
+ val = integer_test(seed_string, 0, 4294967295)
+ if None != val:
+ rand.seed = val
+
+
+def command_makeworld(seed_string):
+ """Call make_world()."""
+ val = integer_test(seed_string, 0, 4294967295)
+ if None != val:
+ from server.world import make_world
+ make_world(val)
+
+
+def command_maplength(maplength_string):
+ """Redefine map length. Invalidate map, therefore lose all things on it."""
+ val = integer_test(maplength_string, 1, 256)
+ if None != val:
+ from server.utils import libpr
+ world_db["MAP_LENGTH"] = val
+ world_db["MAP"] = False
+ set_world_inactive()
+ world_db["Things"] = {}
+ libpr.set_maplength(val)
+
+
+def command_worldactive(worldactive_string):
+ """Toggle world_db["WORLD_ACTIVE"] if possible.
+
+ An active world can always be set inactive. An inactive world can only be
+ set active with a "wait" ThingAction, and a player Thing (of ID 0), and a
+ map. On activation, rebuild all Things' FOVs, and the player's map memory.
+ """
+ val = integer_test(worldactive_string, 0, 1)
+ if None != val:
+ if 0 != world_db["WORLD_ACTIVE"]:
+ if 0 == val:
+ set_world_inactive()
+ else:
+ print("World already active.")
+ elif 0 == world_db["WORLD_ACTIVE"]:
+ wait_exists = False
+ for ThingAction in world_db["ThingActions"]:
+ if "wait" == world_db["ThingActions"][ThingAction]["TA_NAME"]:
+ wait_exists = True
+ break
+ player_exists = False
+ for Thing in world_db["Things"]:
+ if 0 == Thing:
+ player_exists = True
+ break
+ if wait_exists and player_exists and world_db["MAP"]:
+ for id in world_db["Things"]:
+ if world_db["Things"][id]["T_LIFEPOINTS"]:
+ build_fov_map(world_db["Things"][id])
+ if 0 == id:
+ update_map_memory(world_db["Things"][id], False)
+ if not world_db["Things"][0]["T_LIFEPOINTS"]:
+ empty_fovmap = bytearray(b" " * world_db["MAP_LENGTH"] ** 2)
+ world_db["Things"][0]["fovmap"] = empty_fovmap
+ world_db["WORLD_ACTIVE"] = 1
+ else:
+ print("Ignoring: Not all conditions for world activation met.")
+
+
+def command_tid(id_string):
+ """Set ID of Thing to manipulate. ID unused? Create new one.
+
+ Default new Thing's type to the first available ThingType, others: zero.
+ """
+ id = id_setter(id_string, "Things", command_tid)
+ if None != id:
+ if world_db["ThingTypes"] == {}:
+ print("Ignoring: No ThingType to settle new Thing in.")
+ return
+ type = list(world_db["ThingTypes"].keys())[0]
+ from server.world import new_Thing
+ world_db["Things"][id] = new_Thing(type)
+
+
+def command_ttid(id_string):
+ """Set ID of ThingType to manipulate. ID unused? Create new one.
+
+ Default new ThingType's TT_SYMBOL to "?", TT_CORPSE_ID to self, TT_TOOL to
+ "", others: 0.
+ """
+ id = id_setter(id_string, "ThingTypes", command_ttid)
+ if None != id:
+ world_db["ThingTypes"][id] = {
+ "TT_NAME": "(none)",
+ "TT_TOOLPOWER": 0,
+ "TT_LIFEPOINTS": 0,
+ "TT_PROLIFERATE": 0,
+ "TT_START_NUMBER": 0,
+ "TT_SYMBOL": "?",
+ "TT_CORPSE_ID": id,
+ "TT_TOOL": ""
+ }
+
+
+def command_taid(id_string):
+ """Set ID of ThingAction to manipulate. ID unused? Create new one.
+
+ Default new ThingAction's TA_EFFORT to 1, its TA_NAME to "wait".
+ """
+ id = id_setter(id_string, "ThingActions", command_taid, True)
+ if None != id:
+ world_db["ThingActions"][id] = {
+ "TA_EFFORT": 1,
+ "TA_NAME": "wait"
+ }
+
+
+def test_for_id_maker(object, category):
+ """Return decorator testing for object having "id" attribute."""
+ def decorator(f):
+ def helper(*args):
+ if hasattr(object, "id"):
+ f(*args)
+ else:
+ print("Ignoring: No " + category +
+ " defined to manipulate yet.")
+ return helper
+ return decorator
+
+
+test_Thing_id = test_for_id_maker(command_tid, "Thing")
+test_ThingType_id = test_for_id_maker(command_ttid, "ThingType")
+test_ThingAction_id = test_for_id_maker(command_taid, "ThingAction")
+
+
+@test_Thing_id
+def command_tcommand(str_int):
+ """Set T_COMMAND of selected Thing."""
+ val = integer_test(str_int, 0)
+ if None != val:
+ if 0 == val or val in world_db["ThingActions"]:
+ world_db["Things"][command_tid.id]["T_COMMAND"] = val
+ else:
+ print("Ignoring: ThingAction ID belongs to no known ThingAction.")
+
+
+@test_Thing_id
+def command_ttype(str_int):
+ """Set T_TYPE of selected Thing."""
+ val = integer_test(str_int, 0)
+ if None != val:
+ if val in world_db["ThingTypes"]:
+ world_db["Things"][command_tid.id]["T_TYPE"] = val
+ else:
+ print("Ignoring: ThingType ID belongs to no known ThingType.")
+
+
+@test_Thing_id
+def command_tcarries(str_int):
+ """Append int(str_int) to T_CARRIES of selected Thing.
+
+ The ID int(str_int) must not be of the selected Thing, and must belong to a
+ Thing with unset "carried" flag. Its "carried" flag will be set on owning.
+ """
+ val = integer_test(str_int, 0)
+ if None != val:
+ if val == command_tid.id:
+ print("Ignoring: Thing cannot carry itself.")
+ elif val in world_db["Things"] \
+ and not world_db["Things"][val]["carried"]:
+ world_db["Things"][command_tid.id]["T_CARRIES"].append(val)
+ world_db["Things"][val]["carried"] = True
+ else:
+ print("Ignoring: Thing not available for carrying.")
+ # Note that the whole carrying structure is different from the C version:
+ # Carried-ness is marked by a "carried" flag, not by Things containing
+ # Things internally.
+
+
+@test_Thing_id
+def command_tmemthing(str_t, str_y, str_x):
+ """Add (int(str_t), int(str_y), int(str_x)) to selected Thing's T_MEMTHING.
+
+ The type must fit to an existing ThingType, and the position into the map.
+ """
+ type = integer_test(str_t, 0)
+ posy = integer_test(str_y, 0, 255)
+ posx = integer_test(str_x, 0, 255)
+ if None != type and None != posy and None != posx:
+ if type not in world_db["ThingTypes"] \
+ or posy >= world_db["MAP_LENGTH"] or posx >= world_db["MAP_LENGTH"]:
+ print("Ignoring: Illegal value for thing type or position.")
+ else:
+ memthing = (type, posy, posx)
+ world_db["Things"][command_tid.id]["T_MEMTHING"].append(memthing)
+
+
+@test_ThingType_id
+def command_ttname(name):
+ """Set TT_NAME of selected ThingType."""
+ world_db["ThingTypes"][command_ttid.id]["TT_NAME"] = name
+
+
+@test_ThingType_id
+def command_tttool(name):
+ """Set TT_TOOL of selected ThingType."""
+ world_db["ThingTypes"][command_ttid.id]["TT_TOOL"] = name
+
+
+@test_ThingType_id
+def command_ttsymbol(char):
+ """Set TT_SYMBOL of selected ThingType. """
+ if 1 == len(char):
+ world_db["ThingTypes"][command_ttid.id]["TT_SYMBOL"] = char
+ else:
+ print("Ignoring: Argument must be single character.")
+
+
+@test_ThingType_id
+def command_ttcorpseid(str_int):
+ """Set TT_CORPSE_ID of selected ThingType."""
+ val = integer_test(str_int, 0)
+ if None != val:
+ if val in world_db["ThingTypes"]:
+ world_db["ThingTypes"][command_ttid.id]["TT_CORPSE_ID"] = val
+ else:
+ print("Ignoring: Corpse ID belongs to no known ThignType.")
+
+
+@test_ThingAction_id
+def command_taname(name):
+ """Set TA_NAME of selected ThingAction.
+
+ The name must match a valid thing action function. If after the name
+ setting no ThingAction with name "wait" remains, call set_world_inactive().
+ """
+ if name == "wait" or name == "move" or name == "use" or name == "drop" \
+ or name == "pick_up":
+ world_db["ThingActions"][command_taid.id]["TA_NAME"] = name
+ if 1 == world_db["WORLD_ACTIVE"]:
+ wait_defined = False
+ for id in world_db["ThingActions"]:
+ if "wait" == world_db["ThingActions"][id]["TA_NAME"]:
+ wait_defined = True
+ break
+ if not wait_defined:
+ set_world_inactive()
+ else:
+ print("Ignoring: Invalid action name.")
+ # In contrast to the original,naming won't map a function to a ThingAction.
+
+
+def setter(category, key, min, max=None):
+ """Build setter for world_db([category + "s"][id])[key] to >=min/<=max."""
+ if category is None:
+ def f(val_string):
+ val = integer_test(val_string, min, max)
+ if None != val:
+ world_db[key] = val
+ else:
+ if category == "Thing":
+ id_store = command_tid
+ decorator = test_Thing_id
+ elif category == "ThingType":
+ id_store = command_ttid
+ decorator = test_ThingType_id
+ elif category == "ThingAction":
+ id_store = command_taid
+ decorator = test_ThingAction_id
+
+ @decorator
+ def f(val_string):
+ val = integer_test(val_string, min, max)
+ if None != val:
+ world_db[category + "s"][id_store.id][key] = val
+ return f
+
+
+def setter_map(maptype):
+ """Set (world or Thing's) map of maptype's int(str_int)-th line to mapline.
+
+ If no map of maptype exists yet, initialize it with ' ' bytes first.
+ """
+
+ def valid_map_line(str_int, mapline):
+ val = integer_test(str_int, 0, 255)
+ if None != val:
+ if val >= world_db["MAP_LENGTH"]:
+ print("Illegal value for map line number.")
+ elif len(mapline) != world_db["MAP_LENGTH"]:
+ print("Map line length is unequal map width.")
+ else:
+ return val
+ return None
+
+ def nonThingMap_helper(str_int, mapline):
+ val = valid_map_line(str_int, mapline)
+ if None != val:
+ length = world_db["MAP_LENGTH"]
+ if not world_db["MAP"]:
+ map = bytearray(b' ' * (length ** 2))
+ else:
+ map = world_db["MAP"]
+ map[val * length:(val * length) + length] = mapline.encode()
+ if not world_db["MAP"]:
+ world_db["MAP"] = map
+
+ @test_Thing_id
+ def ThingMap_helper(str_int, mapline):
+ val = valid_map_line(str_int, mapline)
+ if None != val:
+ length = world_db["MAP_LENGTH"]
+ if not world_db["Things"][command_tid.id][maptype]:
+ map = bytearray(b' ' * (length ** 2))
+ else:
+ map = world_db["Things"][command_tid.id][maptype]
+ map[val * length:(val * length) + length] = mapline.encode()
+ if not world_db["Things"][command_tid.id][maptype]:
+ world_db["Things"][command_tid.id][maptype] = map
+
+ return nonThingMap_helper if maptype == "MAP" else ThingMap_helper
+
+
+
+def setter_tpos(axis):
+ """Generate setter for T_POSX or T_POSY of selected Thing.
+
+ If world is active, rebuilds animate things' fovmap, player's memory map.
+ """
+ @test_Thing_id
+ def helper(str_int):
+ val = integer_test(str_int, 0, 255)
+ if None != val:
+ if val < world_db["MAP_LENGTH"]:
+ world_db["Things"][command_tid.id]["T_POS" + axis] = val
+ if world_db["WORLD_ACTIVE"] \
+ and world_db["Things"][command_tid.id]["T_LIFEPOINTS"]:
+ build_fov_map(world_db["Things"][command_tid.id])
+ if 0 == command_tid.id:
+ update_map_memory(world_db["Things"][command_tid.id])
+ else:
+ print("Ignoring: Position is outside of map.")
+ return helper
+
+
+def set_command(action):
+ """Set player's T_COMMAND, then call turn_over()."""
+ id = [x for x in world_db["ThingActions"]
+ if world_db["ThingActions"][x]["TA_NAME"] == action][0]
+ world_db["Things"][0]["T_COMMAND"] = id
+ turn_over()
+
+
+def play_wait():
+ """Try "wait" as player's T_COMMAND."""
+ set_command("wait")
+
+
+def play_pickup():
+ """Try "pick_up" as player's T_COMMAND"."""
+ t = world_db["Things"][0]
+ ids = [id for id in world_db["Things"] if id
+ if not world_db["Things"][id]["carried"]
+ if world_db["Things"][id]["T_POSY"] == t["T_POSY"]
+ if world_db["Things"][id]["T_POSX"] == t["T_POSX"]]
+ if not len(ids):
+ log("NOTHING to pick up.")
+ else:
+ set_command("pick_up")
+
+
+def play_drop(str_arg):
+ """Try "drop" as player's T_COMMAND, int(str_arg) as T_ARGUMENT / slot."""
+ t = world_db["Things"][0]
+ if 0 == len(t["T_CARRIES"]):
+ log("You have NOTHING to drop in your inventory.")
+ else:
+ val = integer_test(str_arg, 0, 255)
+ if None != val and val < len(t["T_CARRIES"]):
+ world_db["Things"][0]["T_ARGUMENT"] = val
+ set_command("drop")
+ else:
+ print("Illegal inventory index.")
+
+
+def play_use(str_arg):
+ """Try "use" as player's T_COMMAND, int(str_arg) as T_ARGUMENT / slot."""
+ t = world_db["Things"][0]
+ if 0 == len(t["T_CARRIES"]):
+ log("You have NOTHING to use in your inventory.")
+ else:
+ val = integer_test(str_arg, 0, 255)
+ if None != val and val < len(t["T_CARRIES"]):
+ id = t["T_CARRIES"][val]
+ type = world_db["Things"][id]["T_TYPE"]
+ if not world_db["ThingTypes"][type]["TT_TOOL"] == "food":
+ log("You CAN'T consume this thing.")
+ return
+ world_db["Things"][0]["T_ARGUMENT"] = val
+ set_command("use")
+ else:
+ print("Illegal inventory index.")
+
+
+def play_move(str_arg):
+ """Try "move" as player's T_COMMAND, str_arg as T_ARGUMENT / direction."""
+ from server.config.world_data import directions_db
+ t = world_db["Things"][0]
+ if not str_arg in directions_db:
+ print("Illegal move direction string.")
+ return
+ dir = ord(directions_db[str_arg])
+ from server.utils import mv_yx_in_dir_legal
+ move_result = mv_yx_in_dir_legal(chr(dir), t["T_POSY"], t["T_POSX"])
+ if 1 == move_result[0]:
+ pos = (move_result[1] * world_db["MAP_LENGTH"]) + move_result[2]
+ if ord(".") == world_db["MAP"][pos]:
+ world_db["Things"][0]["T_ARGUMENT"] = dir
+ set_command("move")
+ return
+ log("You CAN'T move there.")
+
+
+def command_ai():
+ """Call ai() on player Thing, then turn_over()."""
+ from server.ai import ai
+ ai(world_db["Things"][0])
+ turn_over()
--- /dev/null
+from server.actions import actor_wait, actor_move, actor_pick_up, actor_drop, \
+ actor_use
+
+action_db = {
+ "actor_wait": actor_wait,
+ "actor_move": actor_move,
+ "actor_pick_up": actor_pick_up,
+ "actor_drop": actor_drop,
+ "actor_use": actor_use
+}
--- /dev/null
+from server.commands import command_plugin, command_quit, command_ping, \
+ command_thingshere, command_makeworld, command_seedrandomness, setter, \
+ command_maplength, command_worldactive, setter_map, command_taid, \
+ command_taname, command_ttid, command_ttname, command_tttool, \
+ command_ttsymbol, command_ttcorpseid, command_tid, command_tcommand, \
+ command_ttype, command_tcarries, command_tmemthing, setter_tpos, \
+ play_wait, play_move, play_pickup, play_drop, play_use, command_ai
+
+
+"""Commands database.
+
+Map command start tokens to ([0]) number of expected command arguments, ([1])
+the command's meta-ness (i.e. is it to be written to the record file, is it to
+be ignored in replay mode if read from server input file), and ([2]) a function
+to be called on it.
+"""
+commands_db = {
+ "PLUGIN": (1, True, command_plugin),
+ "QUIT": (0, True, command_quit),
+ "PING": (0, True, command_ping),
+ "THINGS_HERE": (2, True, command_thingshere),
+ "MAKE_WORLD": (1, False, command_makeworld),
+ "SEED_RANDOMNESS": (1, False, command_seedrandomness),
+ "TURN": (1, False, setter(None, "TURN", 0, 65535)),
+ "PLAYER_TYPE": (1, False, setter(None, "PLAYER_TYPE", 0)),
+ "MAP_LENGTH": (1, False, command_maplength),
+ "WORLD_ACTIVE": (1, False, command_worldactive),
+ "MAP": (2, False, setter_map("MAP")),
+ "TA_ID": (1, False, command_taid),
+ "TA_EFFORT": (1, False, setter("ThingAction", "TA_EFFORT", 0, 255)),
+ "TA_NAME": (1, False, command_taname),
+ "TT_ID": (1, False, command_ttid),
+ "TT_NAME": (1, False, command_ttname),
+ "TT_TOOL": (1, False, command_tttool),
+ "TT_SYMBOL": (1, False, command_ttsymbol),
+ "TT_CORPSE_ID": (1, False, command_ttcorpseid),
+ "TT_TOOLPOWER": (1, False, setter("ThingType", "TT_TOOLPOWER", 0, 65535)),
+ "TT_START_NUMBER": (1, False, setter("ThingType", "TT_START_NUMBER",
+ 0, 255)),
+ "TT_PROLIFERATE": (1, False, setter("ThingType", "TT_PROLIFERATE",
+ 0, 65535)),
+ "TT_LIFEPOINTS": (1, False, setter("ThingType", "TT_LIFEPOINTS", 0, 255)),
+ "T_ID": (1, False, command_tid),
+ "T_ARGUMENT": (1, False, setter("Thing", "T_ARGUMENT", 0, 255)),
+ "T_PROGRESS": (1, False, setter("Thing", "T_PROGRESS", 0, 255)),
+ "T_LIFEPOINTS": (1, False, setter("Thing", "T_LIFEPOINTS", 0, 255)),
+ "T_SATIATION": (1, False, setter("Thing", "T_SATIATION", -32768, 32767)),
+ "T_COMMAND": (1, False, command_tcommand),
+ "T_TYPE": (1, False, command_ttype),
+ "T_CARRIES": (1, False, command_tcarries),
+ "T_MEMMAP": (2, False, setter_map("T_MEMMAP")),
+ "T_MEMDEPTHMAP": (2, False, setter_map("T_MEMDEPTHMAP")),
+ "T_MEMTHING": (3, False, command_tmemthing),
+ "T_POSY": (1, False, setter_tpos("Y")),
+ "T_POSX": (1, False, setter_tpos("X")),
+ "wait": (0, False, play_wait),
+ "move": (1, False, play_move),
+ "pick_up": (0, False, play_pickup),
+ "drop": (1, False, play_drop),
+ "use": (1, False, play_use),
+ "ai": (0, False, command_ai)
+}
+# TODO: Unhandled cases: (Un-)killing animates (esp. player!) with T_LIFEPOINTS.
"path_worldstate": "server_run/worldstate",
"tmp_suffix": "_tmp",
"kicked_by_rival": False,
- "worldstate_updateable": False
+ "worldstate_updateable": False,
+ "wait_on_read_fail": 0.03333,
+ "max_wait_on_read_fail": 5,
+ "save_wait": 15
}
-
--- /dev/null
+import os
+import time
+
+
+from server.config.world_data import world_db
+from server.config.io import io_db
+
+
+def server_test():
+ """Ensure valid server out file belonging to current process.
+
+ This is done by comparing io_db["teststring"] to what's found at the start
+ of the current file at io_db["path_out"]. On failure, set
+ io_db["kicked_by_rival"] and raise SystemExit.
+ """
+ if not os.access(io_db["path_out"], os.F_OK):
+ raise SystemExit("Server output file has disappeared.")
+ file = open(io_db["path_out"], "r")
+ test = file.readline().rstrip("\n")
+ file.close()
+ if test != io_db["teststring"]:
+ io_db["kicked_by_rival"] = True
+ msg = "Server test string in server output file does not match. This" \
+ " indicates that the current server process has been " \
+ "superseded by another one."
+ raise SystemExit(msg)
+
+
+def safely_remove_worldstate_file():
+ from server.io import server_test
+ if os.access(io_db["path_worldstate"], os.F_OK):
+ server_test()
+ os.remove(io_db["path_worldstate"])
+
+
+def atomic_write(path, text, do_append=False, delete=True):
+ """Atomic write of text to file at path, appended if do_append is set."""
+ path_tmp = path + io_db["tmp_suffix"]
+ mode = "w"
+ if do_append:
+ mode = "a"
+ if os.access(path, os.F_OK):
+ from shutil import copyfile
+ copyfile(path, path_tmp)
+ file = open(path_tmp, mode)
+ strong_write(file, text)
+ file.close()
+ if delete and os.access(path, os.F_OK):
+ os.remove(path)
+ os.rename(path_tmp, path)
+
+
+def strong_write(file, string):
+ """Apply write(string), then flush()."""
+ file.write(string)
+ file.flush()
+
+
+def setup_server_io():
+ """Fill IO files DB with proper file( path)s. Write process IO test string.
+
+ Ensure IO files directory at server/. Remove any old input file if found.
+ Set up new input file for reading, and new output file for appending. Start
+ output file with process hash line of format PID + " " + floated UNIX time
+ (io_db["teststring"]). Raise SystemExit if file is found at path of either
+ record or save file plus io_db["tmp_suffix"].
+ """
+ def detect_atomic_leftover(path, tmp_suffix):
+ path_tmp = path + tmp_suffix
+ msg = "Found file '" + path_tmp + "' that may be a leftover from an " \
+ "aborted previous attempt to write '" + path + "'. Aborting " \
+ "until matter is resolved by removing it from its current path."
+ if os.access(path_tmp, os.F_OK):
+ raise SystemExit(msg)
+ io_db["teststring"] = str(os.getpid()) + " " + str(time.time())
+ io_db["save_wait_start"] = 0
+ io_db["verbose"] = False
+ io_db["record_chunk"] = ""
+ os.makedirs(io_db["path_server"], exist_ok=True)
+ io_db["file_out"] = open(io_db["path_out"], "a")
+ strong_write(io_db["file_out"], io_db["teststring"] + "\n")
+ if os.access(io_db["path_in"], os.F_OK):
+ os.remove(io_db["path_in"])
+ io_db["file_in"] = open(io_db["path_in"], "w")
+ io_db["file_in"].close()
+ io_db["file_in"] = open(io_db["path_in"], "r")
+ detect_atomic_leftover(io_db["path_save"], io_db["tmp_suffix"])
+ detect_atomic_leftover(io_db["path_record"], io_db["tmp_suffix"])
+
+
+def cleanup_server_io():
+ """Close and (if io_db["kicked_by_rival"] false) remove files in io_db."""
+ def helper(file_key, path_key):
+ if file_key in io_db:
+ io_db[file_key].close()
+ if not io_db["kicked_by_rival"] \
+ and os.access(io_db[path_key], os.F_OK):
+ os.remove(io_db[path_key])
+ helper("file_in", "path_in")
+ helper("file_out", "path_out")
+ helper("file_worldstate", "path_worldstate")
+ if "file_record" in io_db:
+ io_db["file_record"].close()
+
+def read_command():
+ """Return next newline-delimited command from server in file.
+
+ Keep building return string until a newline is encountered. Pause between
+ unsuccessful reads, and after too much waiting, run server_test().
+ """
+ wait_on_fail = io_db["wait_on_read_fail"]
+ max_wait = io_db["max_wait_on_read_fail"]
+ now = time.time()
+ command = ""
+ while True:
+ add = io_db["file_in"].readline()
+ if len(add) > 0:
+ command = command + add
+ if len(command) > 0 and "\n" == command[-1]:
+ command = command[:-1]
+ break
+ else:
+ time.sleep(wait_on_fail)
+ if now + max_wait < time.time():
+ server_test()
+ now = time.time()
+ return command
+
+
+def log(msg):
+ """Send "msg" to log."""
+ strong_write(io_db["file_out"], "LOG " + msg + "\n")
+
+
+def save_world():
+ """Save all commands needed to reconstruct current world state."""
+ from server.utils import rand
+
+ def quote_escape(string):
+ string = string.replace("\u005C", '\u005C\u005C')
+ return '"' + string.replace('"', '\u005C"') + '"'
+
+ def mapsetter(key):
+ def helper(id=None):
+ string = ""
+ if key == "MAP" or world_db["Things"][id][key]:
+ map = world_db["MAP"] if key == "MAP" \
+ else world_db["Things"][id][key]
+ length = world_db["MAP_LENGTH"]
+ for i in range(length):
+ line = map[i * length:(i * length) + length].decode()
+ string = string + key + " " + str(i) + " " + \
+ quote_escape(line) + "\n"
+ return string
+ return helper
+
+ def memthing(id):
+ string = ""
+ for memthing in world_db["Things"][id]["T_MEMTHING"]:
+ string = string + "T_MEMTHING " + str(memthing[0]) + " " + \
+ str(memthing[1]) + " " + str(memthing[2]) + "\n"
+ return string
+
+ def helper(category, id_string, special_keys={}):
+ string = ""
+ for id in sorted(world_db[category].keys()):
+ string = string + id_string + " " + str(id) + "\n"
+ for key in sorted(world_db[category][id].keys()):
+ if not key in special_keys:
+ x = world_db[category][id][key]
+ argument = quote_escape(x) if str == type(x) else str(x)
+ string = string + key + " " + argument + "\n"
+ elif special_keys[key]:
+ string = string + special_keys[key](id)
+ return string
+
+ string = ""
+ for key in sorted(world_db.keys()):
+ if (not isinstance(world_db[key], dict)) and key != "MAP" and \
+ key != "WORLD_ACTIVE":
+ string = string + key + " " + str(world_db[key]) + "\n"
+ string = string + mapsetter("MAP")()
+ string = string + helper("ThingActions", "TA_ID")
+ string = string + helper("ThingTypes", "TT_ID", {"TT_CORPSE_ID": False})
+ for id in sorted(world_db["ThingTypes"].keys()):
+ string = string + "TT_ID " + str(id) + "\n" + "TT_CORPSE_ID " + \
+ str(world_db["ThingTypes"][id]["TT_CORPSE_ID"]) + "\n"
+ string = string + helper("Things", "T_ID",
+ {"T_CARRIES": False, "carried": False,
+ "T_MEMMAP": mapsetter("T_MEMMAP"),
+ "T_MEMTHING": memthing, "fovmap": False,
+ "T_MEMDEPTHMAP": mapsetter("T_MEMDEPTHMAP")})
+ for id in sorted(world_db["Things"].keys()):
+ if [] != world_db["Things"][id]["T_CARRIES"]:
+ string = string + "T_ID " + str(id) + "\n"
+ for carried in sorted(world_db["Things"][id]["T_CARRIES"]):
+ string = string + "T_CARRIES " + str(carried) + "\n"
+ string = string + "SEED_RANDOMNESS " + str(rand.seed) + "\n" + \
+ "WORLD_ACTIVE " + str(world_db["WORLD_ACTIVE"])
+ atomic_write(io_db["path_save"], string)
+
+
+def obey(command, prefix, replay=False, do_record=False):
+ """Call function from commands_db mapped to command's first token.
+
+ Tokenize command string with shlex.split(comments=True). If replay is set,
+ a non-meta command from the commands_db merely triggers obey() on the next
+ command from the records file. If not, non-meta commands set
+ io_db["worldstate_updateable"] to world_db["WORLD_ACTIVE"], and, if
+ do_record is set, are recorded to io_db["record_chunk"], and save_world()
+ is called (and io_db["record_chunk"] written) if io_db["save_wait"] seconds
+ have passed since the last time it was called. The prefix string is
+ inserted into the server's input message between its beginning 'input ' and
+ ':'. All activity is preceded by a server_test() call. Commands that start
+ with a lowercase letter are ignored when world_db["WORLD_ACTIVE"] is
+ False/0.
+ """
+ import shlex
+ from server.config.commands import commands_db
+ server_test()
+ if io_db["verbose"]:
+ print("input " + prefix + ": " + command)
+ try:
+ tokens = shlex.split(command, comments=True)
+ except ValueError as err:
+ print("Can't tokenize command string: " + str(err) + ".")
+ return
+ if len(tokens) > 0 and tokens[0] in commands_db \
+ and len(tokens) == commands_db[tokens[0]][0] + 1:
+ if commands_db[tokens[0]][1]:
+ commands_db[tokens[0]][2](*tokens[1:])
+ elif tokens[0][0].islower() and not world_db["WORLD_ACTIVE"]:
+ print("Ignoring lowercase-starting commands when world inactive.")
+ elif replay:
+ print("Due to replay mode, reading command as 'go on in record'.")
+ line = io_db["file_record"].readline()
+ if len(line) > 0:
+ obey(line.rstrip(), io_db["file_record"].prefix
+ + str(io_db["file_record"].line_n))
+ io_db["file_record"].line_n = io_db["file_record"].line_n + 1
+ else:
+ print("Reached end of record file.")
+ else:
+ commands_db[tokens[0]][2](*tokens[1:])
+ if do_record:
+ io_db["record_chunk"] += command + "\n"
+ if time.time() > io_db["save_wait_start"] + io_db["save_wait"]:
+ atomic_write(io_db["path_record"], io_db["record_chunk"],
+ do_append=True)
+ if world_db["WORLD_ACTIVE"]:
+ save_world()
+ io_db["record_chunk"] = ""
+ io_db["save_wait"] = time.time()
+ io_db["worldstate_updateable"] = world_db["WORLD_ACTIVE"]
+ elif 0 != len(tokens):
+ print("Invalid command/argument, or bad number of tokens.")
+
+
+def obey_lines_in_file(path, name, do_record=False):
+ """Call obey() on each line of path's file, use name in input prefix."""
+ file = open(path, "r")
+ line_n = 1
+ for line in file.readlines():
+ obey(line.rstrip(), name + "file line " + str(line_n),
+ do_record=do_record)
+ line_n = line_n + 1
+ file.close()
+
+
+def try_worldstate_update():
+ """Write worldstate file if io_db["worldstate_updateable"] is set."""
+ from server.config.commands import commands_db
+ if io_db["worldstate_updateable"]:
+
+ def write_map(string, map):
+ for i in range(length):
+ line = map[i * length:(i * length) + length].decode()
+ string = string + line + "\n"
+ return string
+
+ inventory = ""
+ if [] == world_db["Things"][0]["T_CARRIES"]:
+ inventory = "(none)\n"
+ else:
+ for id in world_db["Things"][0]["T_CARRIES"]:
+ type_id = world_db["Things"][id]["T_TYPE"]
+ name = world_db["ThingTypes"][type_id]["TT_NAME"]
+ inventory = inventory + name + "\n"
+ string = str(world_db["TURN"]) + "\n" + \
+ str(world_db["Things"][0]["T_LIFEPOINTS"]) + "\n" + \
+ str(world_db["Things"][0]["T_SATIATION"]) + "\n" + \
+ inventory + "%\n" + \
+ str(world_db["Things"][0]["T_POSY"]) + "\n" + \
+ str(world_db["Things"][0]["T_POSX"]) + "\n" + \
+ str(world_db["MAP_LENGTH"]) + "\n"
+ length = world_db["MAP_LENGTH"]
+ fov = bytearray(b' ' * (length ** 2))
+ ord_v = ord("v")
+ for pos in [pos for pos in range(length ** 2)
+ if ord_v == world_db["Things"][0]["fovmap"][pos]]:
+ fov[pos] = world_db["MAP"][pos]
+ length = world_db["MAP_LENGTH"]
+ for id in [id for tid in reversed(sorted(list(world_db["ThingTypes"])))
+ for id in world_db["Things"]
+ if not world_db["Things"][id]["carried"]
+ if world_db["Things"][id]["T_TYPE"] == tid
+ if world_db["Things"][0]["fovmap"][
+ world_db["Things"][id]["T_POSY"] * length
+ + world_db["Things"][id]["T_POSX"]] == ord_v]:
+ type = world_db["Things"][id]["T_TYPE"]
+ c = ord(world_db["ThingTypes"][type]["TT_SYMBOL"])
+ fov[world_db["Things"][id]["T_POSY"] * length
+ + world_db["Things"][id]["T_POSX"]] = c
+ string = write_map(string, fov)
+ mem = world_db["Things"][0]["T_MEMMAP"][:]
+ for mt in [mt for tid in reversed(sorted(list(world_db["ThingTypes"])))
+ for mt in world_db["Things"][0]["T_MEMTHING"]
+ if mt[0] == tid]:
+ c = world_db["ThingTypes"][mt[0]]["TT_SYMBOL"]
+ mem[(mt[1] * length) + mt[2]] = ord(c)
+ string = write_map(string, mem)
+ atomic_write(io_db["path_worldstate"], string, delete=False)
+ strong_write(io_db["file_out"], "WORLD_UPDATED\n")
+ io_db["worldstate_updateable"] = False
--- /dev/null
+import ctypes
+
+
+def mv_yx_in_dir_legal(dir, y, x):
+ """Wrapper around libpr.mv_yx_in_dir_legal to simplify its use."""
+ dir_c = dir.encode("ascii")[0]
+ test = libpr.mv_yx_in_dir_legal_wrap(dir_c, y, x)
+ if -1 == test:
+ raise RuntimeError("Too much wrapping in mv_yx_in_dir_legal_wrap()!")
+ return (test, libpr.result_y(), libpr.result_x())
+
+
+class RandomnessIO:
+ """"Interface to libplomrogue's pseudo-randomness generator."""
+
+ def set_seed(self, seed):
+ libpr.seed_rrand(1, seed)
+
+ def get_seed(self):
+ return libpr.seed_rrand(0, 0)
+
+ def next(self):
+ return libpr.rrand()
+
+ seed = property(get_seed, set_seed)
+
+
+def integer_test(val_string, min, max=None):
+ """Return val_string if integer >= min & (if max set) <= max, else None."""
+ try:
+ val = int(val_string)
+ if val < min or (max is not None and val > max):
+ raise ValueError
+ return val
+ except ValueError:
+ msg = "Ignoring: Please use integer >= " + str(min)
+ if max is not None:
+ msg += " and <= " + str(max)
+ msg += "."
+ print(msg)
+ return None
+
+
+def id_setter(id, category, id_store=False, start_at_1=False):
+ """Set ID of object of category to manipulate ID. Unused? Create new one.
+ The ID is stored as id_store.id (if id_store is set). If the integer of the
+ input is valid (if start_at_1, >= 0, else >= -1), but <0 or (if start_at_1)
+ <1, calculate new ID: lowest unused ID >=0 or (if start_at_1) >= 1. None is
+ always returned when no new object is created, else the new object's ID.
+ """
+ from server.config.world_data import world_db
+ min = 0 if start_at_1 else -1
+ if str == type(id):
+ id = integer_test(id, min)
+ if None != id:
+ if id in world_db[category]:
+ if id_store:
+ id_store.id = id
+ return None
+ else:
+ if (start_at_1 and 0 == id) \
+ or ((not start_at_1) and (id < 0)):
+ id = 0 if start_at_1 else -1
+ while 1:
+ id = id + 1
+ if id not in world_db[category]:
+ break
+ if id_store:
+ id_store.id = id
+ return id
+
+
+def prep_library():
+ """Prepare ctypes library at ./libplomrogue.so"""
+ import os
+ libpath = ("./libplomrogue.so")
+ if not os.access(libpath, os.F_OK):
+ raise SystemExit("No library " + libpath + ", run ./build.sh first?")
+ libpr = ctypes.cdll.LoadLibrary(libpath)
+ libpr.seed_rrand.restype = ctypes.c_uint32
+ return libpr
+
+
+def c_pointer_to_bytearray(ba):
+ """Return C char * pointer to ba."""
+ type = ctypes.c_char * len(ba)
+ return type.from_buffer(ba)
+
+
+def parse_command_line_arguments():
+ """Return settings values read from command line arguments."""
+ import argparse
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-s', nargs='?', type=int, dest='replay', const=1,
+ action='store')
+ parser.add_argument('-l', nargs="?", const="save", dest='savefile',
+ action="store")
+ parser.add_argument('-v', dest='verbose', action='store_true')
+ opts, unknown = parser.parse_known_args()
+ return opts
+
+
+libpr = prep_library()
+rand = RandomnessIO()
+opts = parse_command_line_arguments()
--- /dev/null
+from server.config.world_data import world_db
+from server.io import log
+from server.utils import rand, libpr, c_pointer_to_bytearray
+from server.utils import id_setter
+
+
+def thingproliferation(t, prol_map):
+ """To chance of 1/TT_PROLIFERATE, create t offspring in open neighbor cell.
+
+ Naturally only works with TT_PROLIFERATE > 0. The neighbor cell must be be
+ marked '.' in prol_map. If there are several map cell candidates, one is
+ selected randomly.
+ """
+ from server.config.world_data import directions_db
+ from server.utils import mv_yx_in_dir_legal
+ prolscore = world_db["ThingTypes"][t["T_TYPE"]]["TT_PROLIFERATE"]
+ if prolscore and (1 == prolscore or 1 == (rand.next() % prolscore)):
+ candidates = []
+ for dir in [directions_db[key] for key in sorted(directions_db.keys())]:
+ mv_result = mv_yx_in_dir_legal(dir, t["T_POSY"], t["T_POSX"])
+ if mv_result[0] and ord('.') == prol_map[mv_result[1]
+ * world_db["MAP_LENGTH"]
+ + mv_result[2]]:
+ candidates.append((mv_result[1], mv_result[2]))
+ if len(candidates):
+ i = rand.next() % len(candidates)
+ id = id_setter(-1, "Things")
+ newT = new_Thing(t["T_TYPE"], (candidates[i][0], candidates[i][1]))
+ world_db["Things"][id] = newT
+
+
+def update_map_memory(t, age_map=True):
+ """Update t's T_MEMMAP with what's in its FOV now,age its T_MEMMEPTHMAP."""
+
+ def age_some_memdepthmap_on_nonfov_cells():
+ # OUTSOURCED FOR PERFORMANCE REASONS TO libplomrogue.so:
+ # ord_v = ord("v")
+ # ord_0 = ord("0")
+ # ord_9 = ord("9")
+ # for pos in [pos for pos in range(world_db["MAP_LENGTH"] ** 2)
+ # if not ord_v == t["fovmap"][pos]
+ # if ord_0 <= t["T_MEMDEPTHMAP"][pos]
+ # if ord_9 > t["T_MEMDEPTHMAP"][pos]
+ # if not rand.next() % (2 **
+ # (t["T_MEMDEPTHMAP"][pos] - 48))]:
+ # t["T_MEMDEPTHMAP"][pos] += 1
+ memdepthmap = c_pointer_to_bytearray(t["T_MEMDEPTHMAP"])
+ fovmap = c_pointer_to_bytearray(t["fovmap"])
+ libpr.age_some_memdepthmap_on_nonfov_cells(memdepthmap, fovmap)
+
+ if not t["T_MEMMAP"]:
+ t["T_MEMMAP"] = bytearray(b' ' * (world_db["MAP_LENGTH"] ** 2))
+ if not t["T_MEMDEPTHMAP"]:
+ t["T_MEMDEPTHMAP"] = bytearray(b' ' * (world_db["MAP_LENGTH"] ** 2))
+ ord_v = ord("v")
+ ord_0 = ord("0")
+ for pos in [pos for pos in range(world_db["MAP_LENGTH"] ** 2)
+ if ord_v == t["fovmap"][pos]]:
+ t["T_MEMDEPTHMAP"][pos] = ord_0
+ t["T_MEMMAP"][pos] = world_db["MAP"][pos]
+ if age_map:
+ age_some_memdepthmap_on_nonfov_cells()
+ t["T_MEMTHING"] = [mt for mt in t["T_MEMTHING"]
+ if ord_v != t["fovmap"][(mt[1] * world_db["MAP_LENGTH"])
+ + mt[2]]]
+ for id in [id for id in world_db["Things"]
+ if not world_db["Things"][id]["carried"]]:
+ type = world_db["Things"][id]["T_TYPE"]
+ if not world_db["ThingTypes"][type]["TT_LIFEPOINTS"]:
+ y = world_db["Things"][id]["T_POSY"]
+ x = world_db["Things"][id]["T_POSX"]
+ if ord_v == t["fovmap"][(y * world_db["MAP_LENGTH"]) + x]:
+ t["T_MEMTHING"].append((type, y, x))
+
+
+def build_fov_map(t):
+ """Build Thing's FOV map."""
+ t["fovmap"] = bytearray(b'v' * (world_db["MAP_LENGTH"] ** 2))
+ fovmap = c_pointer_to_bytearray(t["fovmap"])
+ map = c_pointer_to_bytearray(world_db["MAP"])
+ if libpr.build_fov_map(t["T_POSY"], t["T_POSX"], fovmap, map):
+ raise RuntimeError("Malloc error in build_fov_Map().")
+
+
+def new_Thing(type, pos=(0, 0)):
+ """Return Thing of type T_TYPE, with fovmap if alive and world active."""
+ thing = {
+ "T_LIFEPOINTS": world_db["ThingTypes"][type]["TT_LIFEPOINTS"],
+ "T_ARGUMENT": 0,
+ "T_PROGRESS": 0,
+ "T_SATIATION": 0,
+ "T_COMMAND": 0,
+ "T_TYPE": type,
+ "T_POSY": pos[0],
+ "T_POSX": pos[1],
+ "T_CARRIES": [],
+ "carried": False,
+ "T_MEMTHING": [],
+ "T_MEMMAP": False,
+ "T_MEMDEPTHMAP": False,
+ "fovmap": False
+ }
+ if world_db["WORLD_ACTIVE"] and thing["T_LIFEPOINTS"]:
+ build_fov_map(thing)
+ return thing
+
+
+def decrement_lifepoints(t):
+ """Decrement t's lifepoints by 1, and if to zero, corpse it.
+
+ If t is the player avatar, only blank its fovmap, so that the client may
+ still display memory data. On non-player things, erase fovmap and memory.
+ Dying actors drop all their things.
+ """
+ t["T_LIFEPOINTS"] -= 1
+ if 0 == t["T_LIFEPOINTS"]:
+ for id in t["T_CARRIES"]:
+ t["T_CARRIES"].remove(id)
+ world_db["Things"][id]["T_POSY"] = t["T_POSY"]
+ world_db["Things"][id]["T_POSX"] = t["T_POSX"]
+ world_db["Things"][id]["carried"] = False
+ t["T_TYPE"] = world_db["ThingTypes"][t["T_TYPE"]]["TT_CORPSE_ID"]
+ if world_db["Things"][0] == t:
+ t["fovmap"] = bytearray(b' ' * (world_db["MAP_LENGTH"] ** 2))
+ log("You die.")
+ log("See README on how to start over.")
+ else:
+ t["fovmap"] = False
+ t["T_MEMMAP"] = False
+ t["T_MEMDEPTHMAP"] = False
+ t["T_MEMTHING"] = []
+
+
+def try_healing(t):
+ """If t's HP < max, increment them if well-nourished, maybe waiting."""
+ if t["T_LIFEPOINTS"] < \
+ world_db["ThingTypes"][t["T_TYPE"]]["TT_LIFEPOINTS"]:
+ wait_id = [id for id in world_db["ThingActions"]
+ if world_db["ThingActions"][id]["TA_NAME"] == "wait"][0]
+ wait_divider = 8 if t["T_COMMAND"] == wait_id else 1
+ testval = int(abs(t["T_SATIATION"]) / wait_divider)
+ if (testval <= 1 or 1 == (rand.next() % testval)):
+ t["T_LIFEPOINTS"] += 1
+ if t == world_db["Things"][0]:
+ log("You HEAL.")
+
+
+def hunger_per_turn(type_id):
+ """The amount of satiation score lost per turn for things of given type."""
+ import math
+ return int(math.sqrt(world_db["ThingTypes"][type_id]["TT_LIFEPOINTS"]))
+
+
+def hunger(t):
+ """Decrement t's satiation,dependent on it trigger lifepoint dec chance."""
+ if t["T_SATIATION"] > -32768:
+ t["T_SATIATION"] -= hunger_per_turn(t["T_TYPE"])
+ if 0 != t["T_SATIATION"] and 0 == int(rand.next() / abs(t["T_SATIATION"])):
+ if t == world_db["Things"][0]:
+ if t["T_SATIATION"] < 0:
+ log("You SUFFER from hunger.")
+ else:
+ log("You SUFFER from over-eating.")
+ decrement_lifepoints(t)
+
+
+def set_world_inactive():
+ """Set world_db["WORLD_ACTIVE"] to 0 and remove worldstate file."""
+ from server.io import safely_remove_worldstate_file
+ safely_remove_worldstate_file()
+ world_db["WORLD_ACTIVE"] = 0
+
+
+def make_map():
+ """(Re-)make island map.
+
+ Let "~" represent water, "." land, "X" trees: Build island shape randomly,
+ start with one land cell in the middle, then go into cycle of repeatedly
+ selecting a random sea cell and transforming it into land if it is neighbor
+ to land. The cycle ends when a land cell is due to be created at the map's
+ border. Then put some trees on the map (TODO: more precise algorithm desc).
+ """
+
+ def is_neighbor(coordinates, type):
+ y = coordinates[0]
+ x = coordinates[1]
+ length = world_db["MAP_LENGTH"]
+ ind = y % 2
+ diag_west = x + (ind > 0)
+ diag_east = x + (ind < (length - 1))
+ pos = (y * length) + x
+ if (y > 0 and diag_east
+ and type == chr(world_db["MAP"][pos - length + ind])) \
+ or (x < (length - 1)
+ and type == chr(world_db["MAP"][pos + 1])) \
+ or (y < (length - 1) and diag_east
+ and type == chr(world_db["MAP"][pos + length + ind])) \
+ or (y > 0 and diag_west
+ and type == chr(world_db["MAP"][pos - length - (not ind)])) \
+ or (x > 0
+ and type == chr(world_db["MAP"][pos - 1])) \
+ or (y < (length - 1) and diag_west
+ and type == chr(world_db["MAP"][pos + length - (not ind)])):
+ return True
+ return False
+
+ world_db["MAP"] = bytearray(b'~' * (world_db["MAP_LENGTH"] ** 2))
+ length = world_db["MAP_LENGTH"]
+ add_half_width = (not (length % 2)) * int(length / 2)
+ world_db["MAP"][int((length ** 2) / 2) + add_half_width] = ord(".")
+ while (1):
+ y = rand.next() % length
+ x = rand.next() % length
+ pos = (y * length) + x
+ if "~" == chr(world_db["MAP"][pos]) and is_neighbor((y, x), "."):
+ if y == 0 or y == (length - 1) or x == 0 or x == (length - 1):
+ break
+ world_db["MAP"][pos] = ord(".")
+ n_trees = int((length ** 2) / 16)
+ i_trees = 0
+ while (i_trees <= n_trees):
+ single_allowed = rand.next() % 32
+ y = rand.next() % length
+ x = rand.next() % length
+ pos = (y * length) + x
+ if "." == chr(world_db["MAP"][pos]) \
+ and ((not single_allowed) or is_neighbor((y, x), "X")):
+ world_db["MAP"][pos] = ord("X")
+ i_trees += 1
+ # This all-too-precise replica of the original C code misses iter_limit().
+
+
+def make_world(seed):
+ """(Re-)build game world, i.e. map, things, to a new turn 1 from seed.
+
+ Seed rand with seed. Do more only with a "wait" ThingAction and
+ world["PLAYER_TYPE"] matching ThingType of TT_START_NUMBER > 0. Then,
+ world_db["Things"] emptied, call make_map() and set
+ world_db["WORLD_ACTIVE"], world_db["TURN"] to 1. Build new Things
+ according to ThingTypes' TT_START_NUMBERS, with Thing of ID 0 to ThingType
+ of ID = world["PLAYER_TYPE"]. Place Things randomly, and actors not on each
+ other. Init player's memory map. Write "NEW_WORLD" line to out file.
+ """
+
+ def free_pos():
+ i = 0
+ while 1:
+ err = "Space to put thing on too hard to find. Map too small?"
+ while 1:
+ y = rand.next() % world_db["MAP_LENGTH"]
+ x = rand.next() % world_db["MAP_LENGTH"]
+ if "." == chr(world_db["MAP"][y * world_db["MAP_LENGTH"] + x]):
+ break
+ i += 1
+ if i == 65535:
+ raise SystemExit(err)
+ # Replica of C code, wrongly ignores animatedness of new Thing.
+ pos_clear = (0 == len([id for id in world_db["Things"]
+ if world_db["Things"][id]["T_LIFEPOINTS"]
+ if world_db["Things"][id]["T_POSY"] == y
+ if world_db["Things"][id]["T_POSX"] == x]))
+ if pos_clear:
+ break
+ return (y, x)
+
+ rand.seed = seed
+ player_will_be_generated = False
+ playertype = world_db["PLAYER_TYPE"]
+ for ThingType in world_db["ThingTypes"]:
+ if playertype == ThingType:
+ if 0 < world_db["ThingTypes"][ThingType]["TT_START_NUMBER"]:
+ player_will_be_generated = True
+ break
+ if not player_will_be_generated:
+ print("Ignoring: No player type with start number >0 defined.")
+ return
+ wait_action = False
+ for ThingAction in world_db["ThingActions"]:
+ if "wait" == world_db["ThingActions"][ThingAction]["TA_NAME"]:
+ wait_action = True
+ if not wait_action:
+ print("Ignoring beyond SEED_MAP: " +
+ "No thing action with name 'wait' defined.")
+ return
+ world_db["Things"] = {}
+ make_map()
+ world_db["WORLD_ACTIVE"] = 1
+ world_db["TURN"] = 1
+ for i in range(world_db["ThingTypes"][playertype]["TT_START_NUMBER"]):
+ id = id_setter(-1, "Things")
+ world_db["Things"][id] = new_Thing(playertype, free_pos())
+ if not world_db["Things"][0]["fovmap"]:
+ empty_fovmap = bytearray(b" " * world_db["MAP_LENGTH"] ** 2)
+ world_db["Things"][0]["fovmap"] = empty_fovmap
+ update_map_memory(world_db["Things"][0])
+ for type in world_db["ThingTypes"]:
+ for i in range(world_db["ThingTypes"][type]["TT_START_NUMBER"]):
+ if type != playertype:
+ id = id_setter(-1, "Things")
+ world_db["Things"][id] = new_Thing(type, free_pos())
+ from server.config.io import io_db
+ from server.io import strong_write
+ strong_write(io_db["file_out"], "NEW_WORLD\n")
+
+
+def turn_over():
+ """Run game world and its inhabitants until new player input expected."""
+ from server.config.actions import action_db
+ from server.ai import ai
+ id = 0
+ whilebreaker = False
+ while world_db["Things"][0]["T_LIFEPOINTS"]:
+ proliferable_map = world_db["MAP"][:]
+ for id in [id for id in world_db["Things"]
+ if not world_db["Things"][id]["carried"]]:
+ y = world_db["Things"][id]["T_POSY"]
+ x = world_db["Things"][id]["T_POSX"]
+ proliferable_map[y * world_db["MAP_LENGTH"] + x] = ord('X')
+ for id in [id for id in world_db["Things"]]: # Only what's from start!
+ if not id in world_db["Things"] or \
+ world_db["Things"][id]["carried"]: # May have been consumed or
+ continue # picked up during turn …
+ Thing = world_db["Things"][id]
+ if Thing["T_LIFEPOINTS"]:
+ if not Thing["T_COMMAND"]:
+ update_map_memory(Thing)
+ if 0 == id:
+ whilebreaker = True
+ break
+ ai(Thing)
+ try_healing(Thing)
+ hunger(Thing)
+ if Thing["T_LIFEPOINTS"]:
+ Thing["T_PROGRESS"] += 1
+ taid = [a for a in world_db["ThingActions"]
+ if a == Thing["T_COMMAND"]][0]
+ ThingAction = world_db["ThingActions"][taid]
+ if Thing["T_PROGRESS"] == ThingAction["TA_EFFORT"]:
+ action = action_db["actor_" + ThingAction["TA_NAME"]]
+ action(Thing)
+ #eval("actor_" + ThingAction["TA_NAME"])(Thing)
+ Thing["T_COMMAND"] = 0
+ Thing["T_PROGRESS"] = 0
+ thingproliferation(Thing, proliferable_map)
+ if whilebreaker:
+ break
+ world_db["TURN"] += 1