Как стать автором
Обновить

Эволюция игрового серверного фреймворка на Python. Часть 1 из 2. Слои инфраструктуры

Python *Программирование *Анализ и проектирование систем *Проектирование и рефакторинг *Разработка игр *

Допустим, у нас большие планы, и мы хотим реализовать серверную часть для всех основных игровых жанров. Однако, прежде, чем приступить к этому, нужно хорошенько подготовиться. Нужно создать такую основу, которая бы подходила для каждой игры, чтобы потом не нужно было ничего переделывать на пол-пути. В том числе и все сделанные на тот момент игры.

Главная наша задача — совместить несовместимое, выработать такое решение, которое бы позволяло создавать игровые приложения любой сложности качественно и максимально быстро! Сочетание этих несовместимых, казалось бы, условий обеспечит нам фреймворк, эволюцию которого мы и намерены здесь проследить. В первой статье будет описано создание инфраструктурного фреймворка, а во второй — разработка логики на его основе. Всего — две статьи на описание методологии разработки всей серверной части.

В качестве языка программирования выберем Python за его простоту и элегантность. Мы начнем в сокетов (asyncio), а закончим HTTP-сервером. Наша задача состоит в том, чтобы код логики не зависел от типа сервера и задействованных сетевых протоколов.

  1. Исходная точка

  2. Разделение логики и инфраструктуры

  3. Разделение форматирования и передачи сообщений

  4. Разделение логики на контроллеры

  5. Состояние

  6. Репозиторий

  7. HTTP-сервер

  8. Выводы

Исходная точка

При разборе клиента мы проследили, как из отображения выделяется логика — сначала модель, а потом и контроллер. Потом мы все игровые действия вместо вызовов методов сделали простыми объектами-командами. Так как отображение и логика взаимодействуют исключительно такими командами, то отображению становится все равно, где находится логика — в том же приложении или в другом, запущенном на иной машине, расположенной за тысячи километров. На клиенте контроллер без логики, таким образом, превращается в простой шлюз между отображением и логикой, между клиентом и сервером.

На сервере, прежде чем команды дойдут до логики, мы должны установить соединение с клиентом и принять от него сообщения. После этого мы декодируем их в объекты команд и передаем их соответствующей функции бизнес-логики на обработку. Таким образом, вокруг логики выстраивается целый массив инфраструктуры, которую нам предстоит упорядочить. Чтобы разобраться в ней самым внимательным образом, начнем с самого простого примера сокет-сервера.

Про то, как реализованы сокеты в Python, мы писали в другом месте. Тут мы продолжим с того, на чем закончили там — с TCP-сервера на asyncio. Добавим в него кодирование и обработку сообщений, а также возможность отправлять их другим клиентам. В результате чего наша первая версия программы будет выглядеть так:

last_index = 0
writers = []

async def handle_connection(reader, writer):
    global last_index
    global writers
    writers.append(writer)
    last_index += 1
    index = last_index
    print("+Connected")
    unparsed_bytes = b""
    while True:
        # Receive
        try:
            request_bytes = await reader.read(1024)
        except ConnectionError:
            break
        if reader.at_eof():
            break  # Disconnected by client
        request_bytes = unparsed_bytes + request_bytes
        request_bytes_list = request_bytes.split(b"\x00")
        unparsed_bytes = request_bytes_list.pop()

        # Process
        for request_bytes in request_bytes_list:
            if not request_bytes:
                continue
            request = request_bytes.decode("utf8")
            print(" >> Received: {repr(request)}")
            try:
                command = json.loads(request)
                to_self_command, to_all_command = await handle_command(index, command)
            except Exception as e:
                print(f"[SERVER#{index}] Error while parsing or processing: {e}")
                to_self_command, to_all_command = {"error": str(e)}, None
            self_response = json.dumps(to_self_command) if to_self_command else None
            all_response = json.dumps(to_all_command) if to_all_command else None
            print(f" << Send: {repr(self_response)} as self_response and commands: "
                  f"{repr(all_response)} to all {len(writers)} connections")
            if self_response:
                to_self_bytes = self_response.encode("utf8") + b"\x00"
                try:
                    writer.write(to_self_bytes)
                    await writer.drain()
                except ConnectionError:
                    pass  # Yet must send to others
            if all_response:
                to_all_bytes = all_response.encode("utf8") + b"\x00"
                for w in writers:
                    try:
                        w.write(to_all_bytes)
                    except ConnectionError:
                        continue
                await asyncio.gather(w.drain() for w in self.writers)
    writers.remove(writer)
    writer.close()
    print(f"-Disconnected")

async def main(host, port):
    print(f"Start server: {host}:{port}")
    server = await asyncio.start_server(handle_connection, host, port)
    async with server:
        await server.serve_forever()

HOST, PORT = "", 5554
if __name__ == "__main__":
    asyncio.run(main(HOST, PORT))

Это был код, отвечающий за пересылку и первичную обработку сообщений. Бизнес-логика реализована в handle_command(). В ней реализовано всего три команды: взять, поместить и изменить (get, set, update). Так как в логике могут присутствовать запросы к базам данным или внешним сервисам, то, чтобы не блокировать выполнение программы на это время, данная функция также сделана асинхронной (async):

storage = {}

async def handle_command(index, command):
    global storage
    key = command.get("key")
    code = command.get("code")
    if code == "get":
        state = storage.get(key)
        return {"success": True, **command, "state": state}, None
    elif code == "set":
        state = command.get("state")
        storage[key] = state
        return {"success": True, **command}, None
    elif code == "update":
        index = command.get("index")
        value = command.get("value")
        if not isinstance(index, int) or not isinstance(value, int):
            return {"success": False, **command}, None
        state = storage.get(key)
        if state is None:
            storage[key] = state = []
        if index >= len(state):
            state += [0] * (index - len(state) + 1)
        state[index] = value
        return None, {"success": True, **command}
    return None, None

Пока что все соединения сохраняются в простой массив writers, а в обработчике команд (handle_command()) мы различаем только два типа адресатов для ответных команд: назад к себе (to_self_command), и ко всем (to_all_command). Это самый примитивный подход и в одном из следующих примеров мы заменим его на перечисление индивидуальных адресатов по их индексам. Обработчик команд будет возвращать массив кортежей, каждый из которых в первом элементе будет содержать список индексов соединений, а во втором список команд, которые им предназначаются: result = [(indexes, commands), (indexes2, commands2)]. Соответственно, все соединения должны будут сохраняться не в списке writers, как раньше, а в словаре writer_by_index. Но пока вернемся к архитектуре приложения.

Разделение логики и инфраструктуры

Сейчас наше приложение цельное и неразделимое, а потому максимально "нереюзабельное". Другими словами, мы не можем использовать отдельные его части повторно, так как все они жестко связаны друг с другом. Хоть логика и вынесена в отдельную функцию handle_command(), но в handle_connection() вместо нее нельзя подставить другую. Если мы захотим написать новое приложение с другой логикой, то нам придется копировать и handle_connection(). Конечно, нас это не устраивает.

В качестве быстрого решения можно передавать ссылку на функцию с логикой (handle_command()) в параметрах handle_connection(). Или можно вынести обе функции в класс, а в подклассах их переопределять. Но лучше всего логику (обработку команд) и инфраструктуру (обмен командами по сети) вообще реализовать в двух разных классах.

Преимущество классов перед отдельными функциями в том, что в них можно хранить собственный контекст из переменных-членов (атрибутов), а также группировать и переопределять в подклассах методы.

Разделение приложения на инфраструктуру (Server) и логику (Logic)
Разделение приложения на инфраструктуру (Server) и логику (Logic)

Разделение на два класса задает два магистральных пути развития в разработке сервера. С одной стороны мы можем подготовить разные типы и версии функционала по передаче объектов (Server), с другой — наделать кучу разных игр (Logic). И все это независимо друг от друга. Единственное условие, которое должно соблюдаться, это чтобы класс логики содержал метод с такой сигнатурой: handle_command(self, index, command):

class SocketServer:
    def __init__(self, logic, host, port):
        self.logic = logic
        self.host = host
        self.port = port
        self.writers = []

    def run(self):
        asyncio.run(self.main())

    async def main(self):
        print(f"Start server: {self.host}:{self.port}")
        server = await asyncio.start_server(handle_connection, host, port)
        async with server:
            await server.serve_forever()

    async def handle_connection(self, reader, writer):
        ...
        # Call:
        # to_self_command, to_all_command = await self.logic.handle_command(index, command)
        # instead of:
        # to_self_command, to_all_command = await handle_command(index, command)

class MyLogic:
    # global storage -> self.storage
    def __init__(self):
      	self.storage = {}

    async def handle_command(self, index, command):
    		...

Код запуска при этом немного изменится:

HOST, PORT = "", 5000
if __name__ == "__main__":
    server = SocketServer(MyLogic(), HOST, PORT)
    server.run()

Таким образом, сервер получает определенный класс логики и обращается к нему согласно интерфейсу. Связь эта односторонняя, так как логика ничего не знает об инфраструктуре.

Это был только первый шаг. Следующим будет — отделение функционала по транспортировке сообщений (сокеты) от способа их кодирования (JSON, YML, XML).

Разделение форматирования и передачи сообщений

Отделить логику приложения от инфраструктуры было хорошей идеей. В результате логика просто преобразуют одни команды в другие, попутно меняя свое состояние. При этом она не задумывается, откуда они берутся и куда деваются. За все это отвечает класс сервера (SocketServer). Вот сервером сейчас и займемся. Посмотрим, на какие части, в свою очередь, распадается он сам.

Заглянув внутрь него, мы увидим, что определенный способ обработки сообщений (JSON) там жестко завязан на определенный способ их передачи (TCP-сокеты). И если мы захотим использовать другой формат передачи данных, то нам заодно придется скопировать кучу кода, к формату не относящегося.

Первое решение, которое приходит на ум — реализовать данные функции в разных методах (выделить методы parse() и serialize()):

class SocketServer:
    def __init__(self, logic, host, port) -> None:
        self.logic = logic
        self.host = host
        self.port = port
        self.last_index = 0
        self.writer_by_index = {}

    async def handle_connection(self, reader, writer):
        self.last_index += 1
        index = self.last_index
        self.writer_by_index[index] = writer
        print(f"[SERVER#{index}] +Connected")
        unparsed_bytes = b""
        while True:
            # Receive
            try:
                request_bytes = await reader.read(1024)
            except ConnectionError:
                break
            if reader.at_eof():
                print(f"[SERVER#{index}] EOF. Connection closed")
                break
            request_bytes = unparsed_bytes + request_bytes
            # Handle
            result, unparsed_bytes = await self.handle_bytes(index, request_bytes)
            # Send response
            await self.send(result)
        print(f"[SERVER#{index}] -Disconnected")
        del self.writer_by_index[index]
        writer.close()

	async def handle_bytes(self, index, request_bytes):
      # Decode request
      request, unparsed_bytes = self.parse(request_bytes)
      # Make response
      try:
          # Parse request
          command = json.loads(request)
          # Process request
          result = self.logic.handle_command(index, command)
      except Exception as e:
          print(f"[SERVER#{index}] Error while parsing or processing: {request} {traceback.format_exc()}")
          result = [([index], [{"error": str(e)}])]
      return result, unparsed_bytes

    async def send(self, result):
        if not result:
            return
        # Serialize
        result = [(indexes, self.serialize(commands))
                  for indexes, commands in result]
        # Send
        wait_writers = []
        for indexes, response_bytes in result:
            for i in indexes:
                writer = self.writer_by_index.get(i)
                if writer:
                    try:
                        writer.write(response_bytes)
                        wait_writers.append(writer)
                    except ConnectionError:
                        continue
        await asyncio.gather(writer.drain() for writer in wait_writers)

    def parse(self, data_bytes):
        request_bytes, unparsed_bytes = request_bytes.split(b"\x00")
        request = request_bytes.decode("utf8")
        return request, unparsed_bytes

    def serialize(self, data):
        return json.dumps(commands).encode("utf8") + b"\x00"

Теперь, например, если мы захотим изменить JSON на YML, то нам нужно всего лишь наследоваться от SocketServer и переопределить parse() и serialize(). Но при таком подходе остается все тот же недостаток при комбинировании разных функционалов. Для каждой комбинации придется создавать отдельный класс: JSONTCPSocketServer, YMLUDPSocketServer, JSONHTTPServer и так далее. Рассмотрим данный вопрос подробнее.

Код по доставке (сокеты) и код по форматированию (JSON) остается в одном классе. Это значит, что если нужно создать классы для двух типов сокетов (TCP и UDP) и трех видов форматов (JSON, YML, XML), то в итоге мы получим 2 * 3 = 6 классов для всех возможных комбинаций. Хотя должно быть по идее 2 + 3 = 5. Пусть 6 и 5 отличаются не сильно, но иметь в качестве закона возрастания кода умножение вместо сложения дает уже на следующем этапе избыточность в 33 % (3 * 3 = 9, 3 + 3 = 6). И то, что код при этом не дублируется — заслуга Python'а (множественное наследование), а не наша.

Выделение парсера как отдельного инфраструктурного слоя
Выделение парсера как отдельного инфраструктурного слоя

Поэтому лучше поступить по-нормальному и разнести доставку и первичную обработку сообщений по разным классам. В данном случае вынести из класса сервера парсер:

class SocketServer:
    def __init__(self, parser, logic, host, port) -> None:
        self.parser = parser
        self.logic = logic
        self.host = host
        self.port = port
        self.last_index = 0
        self.writer_by_index = {}
		# ...
    async def handle_bytes(self, index, request_bytes):
        try:
            # Parse
            commands, unparsed_bytes = self.parser.parse(request_bytes)
            # Handle
            result = await self.logic.handle_commands(index, commands)
        except Exception as e:
            result = [([index], [{"error": str(e)}])]
						unparsed_bytes = b""
        return result, unparsed_bytes

    async def send(self, result):
        if not result:
            return
        # Serialize
        result = [(indexes, self.parser.serialize(commands))
                  for indexes, commands in result]
        # Send
        wait_writers = []
        for indexes, response_bytes in result:
            for i in indexes:
                writer = self.writer_by_index.get(i)
                if writer:
                    try:
                        writer.write(response_bytes)
                        wait_writers.append(writer)
                    except ConnectionError:
                        continue
        await asyncio.gather(writer.drain() for writer in wait_writers)

class Parser:
    def parse(self, data_bytes):
        return data_bytes, b""

    def serialize(self, data):
        return data

class JSONParser(Parser):
    def parse(self, data_bytes):
        # Get unparsed_bytes
        data_bytes, unparsed_bytes = data_bytes.rsplit(b"\x00", 1)
        # bytes -> list of str
        data_str = data_bytes.decode("utf8")
        message_list = data_str.split("\x00")
        # Parse JSON commands (suppose, a command cannot be a list)
        result = []
        for message in message_list:
            if not message:
                continue
            commands = json.loads(message)
            if not commands:
                continue
            if isinstance(commands, list):
                result.extend(commands)
            else:
                result.append(commands)
        return result, unparsed_bytes

    def serialize(self, data):
        if not data:
            return b""
        data_str = json.dumps(data)
        data_bytes = data_str.encode("utf8") + b"\x00"
        return data_bytes

class MyLogic:
    async def handle_commands(self, index, commands):
        # Custom logic
        result = []
        for command in commands:
            key = command.get("key")
            code = command.get("code")
            ...
		return result

HOST, PORT = "", 5000
if __name__ == "__main__":
    server = SocketServer(JSONParser(), MyLogic(), HOST, PORT)
    server.run()

Остановимся на кое-каких технических деталях.

Сервер может за раз принять одно сообщение, а может принять и несколько — в зависимости от того, сколько в потоке байтов присутствует разделителей. Сколько было в буфере, столько и возвращает. Да и в каждом сообщении, в принципе, можно отправлять сразу несколько команд вместо одной. Поэтому логично в парсере возвращать сразу список команд и условиться, что всегда возвращаться будет только список. Соответственно, и handle_command() будет принимать и возвращать команды только списками. Потому он и переименован теперь в handle_commands().

То, что мы передаем в парсер байты, а не декодированные в UTF-8 строки, позволяет нам реализовывать в них собственные кастомные бинарные протоколы. Разграничение сообщений нулевым символом (b"\x00") помещено в парсер по этой же причине. Если удалось распарсить команду, она возвращается. Если нет — возвращаются байты, чтобы позже к ним добавить новые и повторить попытку.

Как вы могли заметить, мы перешли от массовой адресации сообщений к точечной, о которой уже говорили выше. То есть вместо отсылки данных всем соединениям в списке writers, мы можем теперь отправлять команды по конкретным индексам соединений из словаря writer_by_index. Для этого handle_commands() к каждому списку команд добавляет еще список индексов соединений, которым они предназначены.

Чтобы по-прежнему можно было делать массовую отправку, нужно сохранять все индексы в хранилище при подключении, а при отключении — удалять их из списка. Поэтому в класс логики добавляется еще пара методов: on_connect() и on_disconnect():

class SocketServer:
		# ...
    async def handle_connection(self, reader, writer):
        ...
        result = []
        await self.logic.on_connect(index, result)
        await self.send(result)
        unparsed_bytes = b""
        while True:
            ...
        result = []
        await self.logic.on_disconnect(index, result)
        await self.send(result)
        del self.writer_by_index[index]
        writer.close()

class MyLogic:
    def __init__(self):
      	self.storage = {}

    async def on_connect(self, index, result):
        indexes = self.storage.get("indexes")
        if indexes is None:
          	self.storage["indexes"] = [index]
        else:
          	indexes.append(index)

    async def on_disconnect(self, index, result):
        indexes = self.storage.get("indexes")
        if indexes and index in indexes:
          	indexes.remove(index)

    async def handle_commands(self, index, commands):
        # Custom logic
        result = []
        all_indexes = self.storage.get("indexes")
        for command in commands:
            key = command.get("key")
            code = command.get("code")
            if code == "get":
                state = storage.get(key)
                result.append((all_indexes, [{"success": True, **command, "state": state}]))
            elif code == "set":
                state = command.get("state")
                storage[key] = state
                result.append(([index], [{"success": True, **command}]))
            elif code == "update":
                index = command.get("index")
                value = command.get("value")
                if not isinstance(index, int) or not isinstance(value, int):
                    result.append(([index], [{"success": False, **command}]))
                    continue
                state = storage.get(key)
                if state is None:
                  	storage[key] = state = []
                if index >= len(state):
                  	state += [0] * (index - len(state) + 1)
                state[index] = value
                result.append((all_indexes, [{"success": True, **command}]))
		return result

Разделение логики на контроллеры

Допустим далее, что нам нужно сделать серверы по игре в шахматы, шашки, крестики-нолики... Для каждого создается отдельный класс логики, где переопределяется лишь один метод handle_commands() — и никакого дублирования кода. Передаем в конструктор SocketServer первым аргументом объект логики, и сервер готов к использованию:

class ChessLogic(MyLogic):
    async def handle_commands(self, index, commands):
    		...

class CheckersLogic(MyLogic):
    async def handle_commands(self, index, commands):
    		...

HOST, PORT = "", 5000
if __name__ == "__main__":
    server = SocketServer(JSONParser(), CheckersLogic(), HOST, PORT)
    server.run()

Но возможно, у вас уже возник закономерный вопрос. А что, если мы захотим создать сервер, где можно было бы по выбору играть и в шахматы, и в шашки? Вот тут уже придется изгаляться. Здесь нужен какой-то класс-диспетчер логики, который будет перенаправлять команды к соответствующему обработчику:

class ComboLogic:
    def __init__(self) -> None:
        self.chess = ChessLogic()
        self.checkers = CheckersLogic()
        # Use same storage for all
        self.chess.storage = self.checkers.storage = self.storage = {}

    async def on_connect(self, index, result):
    		...

    async def on_disconnect(self, index, result):
    		...

    async def handle_commands(self, index, commands):
        # Custom logic
        result = []
        for command in commands:
            key = command.get("key")
            if key == "chess":
                result.extend(await self.chess.handle_commands(index, commands))
            elif key == "checkers":
                result.extend(await self.checkers.handle_commands(index, commands))
		return result

HOST, PORT = "", 5000
if __name__ == "__main__":
    server = SocketServer(JSONParser(), ComboLogic(), HOST, PORT)
    server.run()

Несложно заметить в цикле обработки команд явное дублирование кода (handle_commands() и extend()). Попробуем его устранить с помощью словаря:

class ComboLogic:
    def __init__(self) -> None:
        self.logic_by_key = {
            "chess": ChessLogic(parser),
            "checkers": CheckersLogic(parser),
        }
        self.storage = {}
        # Provide common storage
        for logic in self.logic_by_key.items():
            logic.storage = self.storage
		# ...
    async def handle_commands(self, index, commands):
    	# Custom logic
    	result = []
    	for command in commands:
        key = command.get("key")
        logic = self.logic_by_key.get(key)
        if logic:
            result.extend(await logic.handle_commands(i, [command]))
		return result

HOST, PORT = "", 5000
if __name__ == "__main__":
    logic_by_key = {
        "chess": ChessLogic(),
        "checkers": CheckersLogic(),
    }
    server = SocketServer(JSONParser(), ComboLogic(logic_by_key), HOST, PORT)
    server.run()

Сейчас класс логики выбирается по одному из свойств команды — key. Но впоследствии, когда будут реализованы комнаты (rooms) и перемещения игроков по ним, можно будет выбирать обработчик команды по тому, в какой комнате, в какой игре находится пользователь. Если он в покер-руме, по умолчанию берется логика покера, если за шахматным столом — логика шахмат.

Выделение из логики менеджера контроллеров (Application) как часть инфраструктуры
Выделение из логики менеджера контроллеров (Application) как часть инфраструктуры

Налицо фактическое разделение логики на два типа классов: диспетчер (ComboLogic) и собственно реализации логики (ChessLogic, CheckersLogic). У последних из общего только интерфейс с сигнатурой метода handle_commands(). А диспетчер всегда один и тот же для всех приложений и никогда не меняется. Фактически он превратился в движок приложения, поэтому его уместнее будет переименовать в Engine, или лучше — в Application. А логику отдельных игр тогда — в контроллеры:

class Application:
    def __init__(self, default_controller, controller_by_key=None) -> None:
        self.default_controller = default_controller
        self.controller_by_key = controller_by_key or {}
        self.storage = {}  # App state

    async def on_connect(self, index, result):
        if self.default_controller:
          	self.default_controller.on_connect(storage, index, result)

    async def on_disconnect(self, index, result):
        if self.default_controller:
          	self.default_controller.on_disconnect(storage, index, result)

    async def handle_commands(self, index, commands):
        result = []
        # Handle
        for command in commands:
            key = command.get("key")
            controller = self.controller_by_key.get(key, self.default_controller)
            if controller:
                await controller.handle_command(self.storage, index, command, result)
        return result

class MyController:
		# To be able to send commands to all current connections
    async def on_connect(self, storage, index, result):
        indexes = storage.get("indexes")
        if indexes is None:
            storage["indexes"] = [index]
        else:
            indexes.append(index)

    async def on_disconnect(self, storage, index, result):
        indexes = storage.get("indexes")
        if indexes and index in indexes:
         		indexes.remove(index)

    async def handle_command(self, storage, index, command, result):
    		...

class ChessController:
    async def handle_command(self, storage, index, command, result):
    		...

class CheckersController:
    async def handle_command(self, storage, index, command, result):
    		...

HOST, PORT = "", 5000
if __name__ == "__main__":
    controller_by_key = {
      "chess": ChessLogic(),
      "checkers": CheckersLogic(),
    }
    app = Application(MyController(), controller_by_key)
    server = SocketServer(JSONParser(), app, HOST, PORT)
    server.run()

Содержимое методов on_connect() и on_disconnect() было вынесено из приложения в контроллер по умолчанию, так как данная логика весьма специфическая и может меняться от приложения к приложению. Мы не должны для этого переопределять класс Application.

Отметим также, что в контроллерах теперь не handle_commands(), а handle_command(). То есть команды обрабатываются по одной. Это удобнее, так как не нужно каждый раз делать обработку в цикле. И главное — диспетчер все равно будет передавать на обработку по одной команде, так как любая команда в массиве может требовать своего собственного обработчика.

Еще, результат больше не возвращается через return, а передается в виде списков в аргументах. Это тоже упрощает реализацию обработчиков.

И последнее. Так как контроллеры — это в сущности всего лишь часть общей логики приложения, то все они должны использовать одно общее состояние (storage). Сами контроллеры состояния не имеют и иметь не могут. Они — логика в самом чистом виде. Поэтому при каждом вызове handle_command() среди прочих аргументов передается и ссылка на состояние приложения.

Состояние

Скажем напоследок пару слов о состоянии и о том, как оно у нас получилось таким, каким получилось.

В ООП подходе все само собой складывается так, что для каждой логической сущности создается программный объект с свойствами и методами. В свойствах хранится текущее состояние сущности, а в методах реализуются функции, которые это состояние изменяют.

При таком подходе все состояние приложения размазано тонким слоем по десяткам и сотням таких объектов. Чтобы сохранить состояние всего приложения в файл, придется обойти все объекты, собрать все их свойства и перевести в простые JSON-объекты. А чтобы загрузить, восстановить приложение из файла, придется воссоздать по иерархии JSON-объектов иерархию наших программных объектов, определить нужный класс для каждого, учитывая параметры конструкторов, и потом восстановить значения всех его свойств (даже приватных). В общем, ясно, что это очень и очень сложно и муторно.

Тут сначала может появиться идея, что все свойства объекта можно просто хранить в словаре. И не перебирать свойства объекта, когда его нужно сохранить, а просто отдавать этот словарь. Следующей мыслью возникает вопрос. А зачем нам вообще восстанавливать все эти объекты — их иерархию и внутреннее состояние? Почему не оперировать изначально чистой JSON-структурой? Тогда и объекты никакие нужны не будут, а будут одни функции. Простой набор функций.

По счастью, Python мультипарадигменный язык программирования, и на нем можно писать и в ООП-стиле, и в процедурном, и в функциональном. Мы начали с самой простой возможной реализации сервера — процедурной. Поэтому состояние у нас было изначально в отдельном словаре, общем на все приложение.

Когда мы перешли к ООП, мы сохранили использование централизованного состояния. Мы не стали его распределять по классам, потому что в этом не было никакого смысла. Классы мы применяли лишь для группировки функций и возможности подменять реализации некоторых из них в подклассах (см. паттерн шаблонный метод). (Если же мы сможем так организовать методы, чтобы вообще не менять в них состояние, то перейдем к математической концепции функций — к функциональному стилю.)

Повезло нам с состоянием? Не совсем. Все дело в методике разработки. Всегда нужно начинать с самого простой возможной версии, а потом добавлять в нее только то, без чего нельзя обойтись. Тогда про многие проблемы вы даже и не узнаете, что они бывают.

Репозиторий

В объектах команд помимо названия действия, которое нужно выполнить, также обычно указывается объекты, которые в этом участвуют. Например, чтобы передвинуть что-то на игровом поле, нужно явно определить, что вы будете двигать и куда. Если поле имеет декартову систему координат, то объекты можно указать через координаты. В противном случае придется использовать уникальные идентификаторы (id). Впрочем идентификаторы можно часто применять и параллельно с координатами (указывать или то, или другое).

Также возможны команды, которые будут требовать обращения к свойствам объекта. Тогда поля в команде могут иметь следующий формат: "{id}.{property}". Таким же образом можно обращаться и к различным вложенным объектам, например: "id1.inner_id2.inner_id3".

Сейчас состояние реализовано простым классом dict. Поэтому в коде мы не можем просто вызывать: object = storage.get("id1").get("inner_id2").get("inner_id3"), так как какого-то промежуточного элемента может и не быть, и тогда возникнет исключение. Но и делать проверки для каждого id мы не можем, так как для этого придется добавить много "глупого" кода. Мы не хотим загромождать нашу логику разными дурацкими проверками, но и не добавить их тоже не можем. Поэтому в идеале для получения объекта должна вызываться только одна функция: object = storage.get("id1.inner_id2.inner_id3") (все проверки и прочая логика должны выполняться в ней автоматически). А для этого придется создать новый класс для хранилища:

class Storage:
    def __init__(self):
        self.storage = {}

    def get(self, path):
        return resolve_path(self.storage, path)

    def set(self, path, value):
        ...

    def update(self, path, value):
      	...

    def delete(self, path):
      	...

def resolve_path(target, path=None):
    if not path or target is None:
        return target
    current = target
    keys = path.split(".")
    for key in keys:
        if isinstance(current, dict):
            current = current.get(key)
        else:
            return None
    return current

Аналогично get() будут выполнены и остальные методы: set(), update(), delete(). Если промежуточный вложенный объект отсутствует, то можно либо возвращать None, либо создавать пустой dict на его месте. Также можно добавить в get() значение по умолчанию (get(path, default=None)), которое будет установлено (set()), если get(path) возвращает None.

Когда у нас есть собственный класс вместо стандартного, нам становится проще добавлять в него новые функции. Например, хранилище можно синхронизировать с базой данных или делать периодическое автосохранение полностью всего состояния в файл, чтобы игру можно было восстановить при случайном падении сервера.

В Application, таким образом, можно подставлять разные реализации хранилища, лишь бы они использовали тот же интерфейс. Тогда можно выбирать нужную стратегию работы с данными (получение и хранение) без всяких изменений со стороны контроллеров. Бизнес-логика не будет даже подозревать откуда берутся данные и сохраняются ли они в БД или нет. Это не их забота.

Выделение хранилища (Repository) как отдельного слоя инфраструктуры
Выделение хранилища (Repository) как отдельного слоя инфраструктуры

В DDD такой фасад для доступа к данным, который к тому же может поддерживать их целостность и актуальность, называется репозиторием (Repository). Поэтому мы вполне можем использовать и это название вместо Storage.

Сюда же можно добавить и настройки приложения. Чтобы не путать их с состоянием, добавим для них метод: getconfig(path). Все настройки приложения можно хранить в специальном файле (лучше всего для этого подходит формат YML) и загружать при его запуске. Поэтому методы setconfig() и updateconfig() не нужны.

Начальные состояния объектов также можно хранить в настройках, откуда оно будет копироваться при создании объекта состояния. На этот случай можно также создать отдельный метод:

class Repository:
    # ...
    def create(self, config_path=None, initial=None):
        if self.state is None:
            return None
        # Get initial
        config = self.getconfig(config_path, {}) if config_path else {}
        initial = {**config, **initial}
        id = initial.get("id")
        if id is None:
            # Generate id
            ...
        # Set
        return self.set(id, initial)

Так как теперь может существовать несколько вариаций хранилища, мы должны иметь возможность задать одну из них при инициализации приложения:

class Application:
    def __init__(self, default_controller, controller_by_key=None, storage=None) -> None:
        self.default_controller = default_controller
        self.controller_by_key = controller_by_key or {}
        self.storage = storage if storage else Repository()  # App state

HOST, PORT = "", 5000
if __name__ == "__main__":
    controller_by_key = {
        "chess": ChessLogic(),
        "checkers": CheckersLogic(),
    }
    app = Application(MyController(), controller_by_key, Repository())
    server = SocketServer(JSONParser(), app, HOST, PORT)
    server.run()

HTTP-сервер

Без сокет-сервера не обойтись, если игра многопользовательская, и нужно как можно быстрее оповещать всех участников о происходящий в приложении событиях. Но если мы строим ферму или наряжаем ёлочку, то нам достаточно простого HTTP-сервера, ведь все события мы генерируем сами на клиенте. Даже если что-то происходит само на сервере, то это происходит прогнозируемым образом. А значит, в одном из сообщений сервер даст знать клиенту, в какой момент тому нужно сделать запрос, чтобы проверить, не случилось ли чего. Давайте теперь посмотрим, насколько сильно изменится наша реализация при использовании HTTP-протокола.

HTTP-сервер — это тоже сокет-сервер, но с тем отличием, что соединение разрывается сразу после отправки первого же ответного сообщения. Алгоритм его работы такой: сокет-соединение устанавливается, принимается запрос от клиента, он обрабатывается, и отсылается ответное сообщение на клиент. В конце каждой отправки соединение тут же закрывается.

Изолирование логики от инфраструктуры позволяет нам использовать логику повторно с любым типом серверов без всяких изменений в классах логики. Единственное, что будет меняться — это кое-какие слои инфраструктуры. В инфраструктуре первым делом изменяется способ транспортировки сообщений — слой Server.

HTTP — протокол стандартный и широко известный. Поэтому существует множество реализаций такого типа серверов, в том числе и на Python: Django, Twisted, Tornado. Мы выберем один из самых популярных и минималистичных — Flask. Принцип его работы можно проиллюстрировать следующим примером:

import json
from flask import Flask, send_file, request

app = Flask(__name__)

def handle(request):
    return {}

@app.route("/storage/<key>")
def storage(key):
    response = handle(request)
    return response

Главная задача данных фреймворков — это преобразование строковых HTTP-сообщений в объекты request и response. Чтобы преобразовать эти объекты в привычные нам команды и обратно, создадим специальный FlaskParser. Допустим клиент использует RESTful API (это когда назначение запроса — взять, задать, изменить — определяется методом: get, post, patch):

class FlaskParser(Parser):
    command_by_alias = {
        "GET": "get",
        "POST": "save",
        "PATCH": "update",
    }

    def parse(self, request):
        # Parse
        values = request.values
        data_str = values.get("data")
        data = json.loads(data_str) if data_str else None
        if data is None:
            data = {}
        # Prepare command
        code = data.get("code")
        if not code:
            code = values.get("_method") or request.method
        data["code"] = self.command_by_alias.get(code, code)
        data["key"] = request.view_args.get("key")
        return data, b""

    # No real serialization needed here
    def serialize(self, command):
        return command

Также, поскольку HTTP-сервер запускается зачастую в нескольких процессах, которые все должны разделять общее состояние, то нам придется использовать репозиторий, хранящий данные не в памяти, а в какой-нибудь БД.

Итого, изменяется всего три слоя: сервер, парсер и репозиторий. С репозиторием ничего нового — синхронизировать данные с БД часто бывает нужно и в сокет-серверах. Класс сервера можно также использовать старый, так как там нам нужен только метод handle_bytes(). Для унификации его можно переименовать в более абстрактный handle_requests() и вынести весь код, кроме handle_connection() в базовый класс Server. В конце концов действительно уникальным для HTTP-сервера классом будет только парсер:

import json
from flask import Flask, send_file, request

app = Flask(__name__)
controller_by_key = {
    "chess": ChessLogic(),
    "checkers": CheckersLogic(),
}
application = Application(MyController(), controller_by_key, DBRepository())
server = Server(FlaskParser(), application)

@app.route("/storage/<key>")
async def storage(key):
    return await server.handle_bytes(key, request)[0]

Одно из испытаний на универсальность наша схема успешно выдержала.

Выводы

Разбиение всего приложения на несколько независимых друг от друга слоев позволяет классы каждого из них разрабатывать отдельно от классов других слоев. Все, что от них требуется — это держаться в рамках заданных для них интерфейсов. Если интерфейсы остаются неприкосновенными, то любые изменения внутри слоя никак не отразятся на остальных. В этом и заключается вся прелесть слоистой архитектуры.

Четыре слоя инфраструктуры и их отношение к логике (Controller)
Четыре слоя инфраструктуры и их отношение к логике (Controller)

При разбиении цельного приложения на слои мы сначала выделили две основные логические части: инфраструктуру и бизнес-логику. Первая в последствии разделилась на Server и Parser. А из второй отделились еще два инфраструктурных слоя: Application и Repository. В результате данные в программе обрабатываются по следующей цепочке:

Server → Parser → Application → Controller → Repository

Инфраструктура — это все то, что не относится напрямую к логике, но помогает ей выполнять свои задачи. Будучи общей для самых разных задач, ее можно вынести в основную библиотеку классов. А поскольку эти классы также задают всю структуру приложения, составляют его каркас, то его с полным правом можно назвать фреймворком.

Отделив все вспомогательные функции и вынеся их в специальный инфраструктурный фреймворк, мы получили бизнес-логику в чистом виде — в виде контроллеров. Настолько чистом, что они не зависят даже от самих себя (т.е. друг от друга). О том, как писать бизнес-логику правильно, и на какие слои разбивается она сама, можно узнать в следующей, заключительной статье.

Исходники

< Назад | Начало | Вперед >

Теги:
Хабы:
Рейтинг 0
Просмотры 1.6K
Комментарии Комментарии 1