file if found. Set up new in file (io_db["file_in"]) for reading at
io_db["path_in"], and new out file (io_db["file_out"]) for writing at
io_db["path_out"]. Start out file with process hash line of format PID +
file if found. Set up new in file (io_db["file_in"]) for reading at
io_db["path_in"], and new out file (io_db["file_out"]) for writing at
io_db["path_out"]. Start out file with process hash line of format PID +
io_db["file_in"] = open(io_db["path_in"], "w")
io_db["file_in"].close()
io_db["file_in"] = open(io_db["path_in"], "r")
io_db["file_in"] = open(io_db["path_in"], "w")
io_db["file_in"].close()
io_db["file_in"] = open(io_db["path_in"], "r")
def record(cmd, io_db):
"""Append cmd string plus newline to file at path_recordfile. (Atomic.)"""
# This misses some optimizations from the original record(), namely only
def record(cmd, io_db):
"""Append cmd string plus newline to file at path_recordfile. (Atomic.)"""
# This misses some optimizations from the original record(), namely only
- # finishing the atomic write with flush() and fsync() every 15 seconds
- # unless explicitely forced. Implement as needed.
+ # finishing the atomic write with expensive flush() and fsync() every 15
+ # seconds unless explicitely forced. Implement as needed.
path_tmp = io_db["path_record"] + io_db["tmp_suffix"]
if os.access(io_db["path_record"], os.F_OK):
shutil.copyfile(io_db["path_record"], path_tmp)
path_tmp = io_db["path_record"] + io_db["tmp_suffix"]
if os.access(io_db["path_record"], os.F_OK):
shutil.copyfile(io_db["path_record"], path_tmp)
-def obey_lines_in_file(path, name):
- """Call obey() on each line of path's file, use name in input prefix."""
- file = open(io_db["path_worldconf"], "r")
+def obey_lines_in_file(path, name, break_test = None):
+ """Call obey() on each line of path's file, use name in input prefix.
+
+ If break_test function is set, only read the file until it returns True.
+ """
+ file = open(path, "r")
try:
parser = argparse.ArgumentParser()
parser.add_argument('-s', nargs='?', type=int, dest='replay', const=1,
action='store')
setup_server_io(io_db)
# print("DUMMY: Run game.")
try:
parser = argparse.ArgumentParser()
parser.add_argument('-s', nargs='?', type=int, dest='replay', const=1,
action='store')
setup_server_io(io_db)
# print("DUMMY: Run game.")
" (if so late a turn is to be found).")
if not os.access(io_db["path_record"], os.F_OK):
raise SystemExit("No record file found to replay.")
" (if so late a turn is to be found).")
if not os.access(io_db["path_record"], os.F_OK):
raise SystemExit("No record file found to replay.")
else:
if os.access(io_db["path_save"], os.F_OK):
obey_lines_in_file(io_db["path_save"], "save")
else:
if os.access(io_db["path_save"], os.F_OK):
obey_lines_in_file(io_db["path_save"], "save")