Как стать автором
Обновить

Решение проблемы «падения» процессов в приложении, работающее 24/7 в режиме мультипроцессинга

Уровень сложностиСредний
Время на прочтение9 мин
Количество просмотров2.7K

Постановка проблемы

Есть приложение, в котором выполняет несколько функций, например, сбора данных из различных источников, их обработки и помещения результатов в БД. Приложение, по задумке, должно работать 24/7, чтобы в любой момент можно было подключиться к БД и получить свежайшую информацию.

Но вот незадача... Вроде бы весь код отлажен, работа приложения стабильна, но в какие‑то моменты замечается, что «бах» и процесс пропал. Ни ошибки в логах, ни сигналов, ничего нет. И как ловить, не очень понятно, а работа стоит и надо как‑то запускаться. На отладку нет много времени.

Симуляция

Представим, что есть две простые функции, которые что‑то делают (не важно, чем они заняты). Если мы говорим о мультипроцессинге, то мы говорим, что каждая функция запускается в системе отдельно, как отдельная задача, отследить которую можно в Диспетчере задач. Значит у процесса есть PID.

Представим, что третий процесс — «следилка». Следилка занимается тем, что она отслеживает существует ли ещё процесс с таким PID в системе. Если работа функции завершается или процесс самопроизвольно погибает, то её задача увидеть это (т. е. не увидеть такой PID) и перезапустить процесс, заново поместив туда функцию.

Очевидно, что нужно имитировать падение процесса, т. е. должна быть функция, которая искусственно убивает процесс.

Также хочется, чтобы уж быть максимально приближенным к реальности, не передавать переменные между процессами, чтобы все работали автономно.

Реализация

Вот пример двух простых функций, которые живут и что‑то делают (можно не смотреть скрытый текст, т. н. функции будут представлены также в полных текстах решения):

Hidden text
def proc_1(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_1 COMPLETED!")
            break
        time.sleep(x)


def proc_2(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_2 COMPLETED!")
            break
        time.sleep(x)

Было реализовано два примера, которые отличаются тем, что функция, отвечающая за слежение и восстановление, в первом случае, живёт в основном процессе приложения, а во втором — запускается отдельным процессом. Второй случай усложняется тем, что перезапуск нового процесса производится из функции восстановления и если выполнить join в ней, то сама она также будет ждать. Решением это проблемы явилось запуск join в отдельном потоке.

Второй вариант, кстати, выглядит приятней и логичней.

Отдельным моментом является понимание того, в какой операционной системе вы запускаете работу приложения. Пришлось делать «оговорку» на Linux по понятным причинам.

Вот первый вариант реализации (полный текст решения):

# Работа с процессами - рабочая версия

import multiprocessing as mp
import time
import random
import psutil
import os
import platform
import fnmatch
import signal
import subprocess


class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    
def proc_1(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_1 COMPLETED!")
            break
        time.sleep(x)


def proc_2(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_2 COMPLETED!")
            break
        time.sleep(x)


def process_killer(PID_list, timepass, exclusion_list=[]):
    while True:
        system = platform.system()
        x = random.uniform(0, timepass)
        time.sleep(x)
        this_process_pid = os.getpid()
        this_process = psutil.Process(this_process_pid)
        this_process_info_as_dict = this_process.as_dict()
        parent_process_pid = this_process.ppid()
        parent_process = psutil.Process(parent_process_pid)
        parent_process_as_dict = parent_process.as_dict()
        parent_process_children = parent_process.children(recursive=True)
        child_pid_list = []
        for i in range(len(parent_process_children)):
            child_info_as_dict = parent_process_children[i].as_dict()
            if child_info_as_dict['pid'] != this_process_pid:
                child_pid_list.append(child_info_as_dict['pid'])
        child_pid_list = list(set(child_pid_list) - set(exclusion_list))
        child_pid_list = list(set(child_pid_list) - set([this_process_pid]))
        # for i in range(len(child_pid_list)):
        #     print(f'Process_{i+1} PID: {child_pid_list[i]}')
        temp_str = '\n'.join([f'Process_{i + 1} PID: {child_pid_list[i]}' for i in range(len(child_pid_list))])
        print(f"{temp_str}")

        if len(child_pid_list) > 0:
            if len(child_pid_list) > 1:
                number = random.randint(0, len(child_pid_list) - 1)
            else:
                number = 0
            kill_proc = psutil.Process(child_pid_list[number])
            kill_process_info_as_dict = kill_proc.as_dict()
            if psutil.pid_exists(kill_process_info_as_dict['pid']):
                if fnmatch.fnmatch(kill_process_info_as_dict['name'], "python*"):
                    print("We kill the process with PID", kill_process_info_as_dict['pid'])
                    try:
                        process = psutil.Process(kill_process_info_as_dict['pid'])
                    except psutil.NoSuchProcess:
                        print(f"Process with PID {child_pid_list[number]} not found.")
                        continue
                    else:
                        if system == "Windows":
                            kill_proc.kill()
                        elif system == "Linux":
                            os.kill(kill_process_info_as_dict['pid'], signal.SIGTERM)
                            # os.kill(kill_process_info_as_dict['pid'],signal.SIGKILL)
                            # subprocess.call(["kill", str(kill_process_info_as_dict['pid'])])

                        print(f"{bcolors.FAIL}Process with PID {kill_process_info_as_dict['pid']} killed.{bcolors.ENDC}")
                        child_pid_list.remove(kill_process_info_as_dict['pid'])




if __name__ == "__main__":
    print(f'{bcolors.OKGREEN}Начало работы программы!{bcolors.ENDC}')

    process_name_list = ["process_1", "process_2"]

    process_1 = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})
    process_2 = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})

    process_1.start()
    process_2.start()

    PID_list = [process_1.pid, process_2.pid]

    # process_recov = mp.Process(target=process_recovery, kwargs={'process_pid_list': PID_list})
    # process_recov.start()

    process_kill = mp.Process(target=process_killer,
                              kwargs={'PID_list': PID_list, 'timepass': 10, 'exclusion_list': []})
    process_kill.start()

    PID_list.append(process_kill.pid)

    system = platform.system()

    while True:
        if system == "Linux":
            os.wait()
        for i in range(len(PID_list)):
            try:
                if psutil.pid_exists(PID_list[i]):
                    pass
                    # print(f'Process with PID {process_pid} is alive')
                else:
                    print(f'Process with PID {PID_list[0]} is dead')
                    print(f"{bcolors.WARNING}Restoring the process{bcolors.ENDC}")
                    if PID_list[i] == PID_list[0]:
                        process = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})
                    if PID_list[i] == PID_list[1]:
                        process = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})
                    process.start()
                    old_pid = PID_list[i]
                    PID_list[i] = process.pid

                    temp_str=""
                    for i in range(len(PID_list)):
                        if  PID_list[i]==process.pid:
                            temp_str += f'{bcolors.OKGREEN}Process_{i + 1} PID: {PID_list[i]}{bcolors.ENDC} (old: {old_pid})\n'
                        else:
                            temp_str += f'Process_{i + 1} PID: {PID_list[i]}\n'
                    temp_str = temp_str.rstrip()
                    # temp_str = '\n'.join(
                    #     [f'Process_{i + 1} PID: {PID_list[i]}' for i in range(len(PID_list))])
                    print('Recovery result:\n' + temp_str)
            except:
                pass
        time.sleep(0.2)





    print("Main PID:", os.getpid())
    print("Process_1 PID:", process_1.pid)
    print("Process_2 PID:", process_2.pid)
    print("Process_killer PID:", process_kill.pid)

    process_1.join()
    process_2.join()
    process_kill.join()
    # process_recov.join()
    process.join()

    time.sleep(5)

    print("Program completed")

А вот второй:

# Работа с процессами - рабочая, но сомнительная версия
import multiprocessing as mp
import threading
import time
import random
import psutil
import os
import platform
import fnmatch
import signal
import subprocess

class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

def proc_1(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_1 COMPLETED!")
            break
        time.sleep(x)


def proc_2(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_2 COMPLETED!")
            break
        time.sleep(x)


def process_killer(PID_list, timepass, exclusion_list=[]):
    while True:
        system = platform.system()
        x = random.uniform(0, timepass)
        time.sleep(x)
        this_process_pid = os.getpid()
        this_process = psutil.Process(this_process_pid)
        this_process_info_as_dict = this_process.as_dict()
        parent_process_pid = this_process.ppid()
        parent_process = psutil.Process(parent_process_pid)
        parent_process_as_dict = parent_process.as_dict()
        parent_process_children = parent_process.children(recursive=True)
        child_pid_list = []
        for i in range(len(parent_process_children)):
            child_info_as_dict = parent_process_children[i].as_dict()
            if child_info_as_dict['pid'] != this_process_pid:
                child_pid_list.append(child_info_as_dict['pid'])
        child_pid_list = list(set(child_pid_list) - set(exclusion_list))
        # for i in range(len(child_pid_list)):
        #     print(f'Process_{i+1} PID: {child_pid_list[i]}')
        temp_str = '\n'.join([f'Process_{i + 1} PID: {child_pid_list[i]}' for i in range(len(child_pid_list))])
        print(f"{temp_str}")

        if len(child_pid_list) > 0:
            if len(child_pid_list) > 1:
                number = random.randint(0, len(child_pid_list) - 1)
            else:
                number = 0
            kill_proc = psutil.Process(child_pid_list[number])
            kill_process_info_as_dict = kill_proc.as_dict()
            if psutil.pid_exists(kill_process_info_as_dict['pid']):
                if fnmatch.fnmatch(kill_process_info_as_dict['name'], "python*"):
                    print("We kill the process with PID", kill_process_info_as_dict['pid'])
                    try:
                        process = psutil.Process(kill_process_info_as_dict['pid'])
                    except psutil.NoSuchProcess:
                        print(f"Process with PID {child_pid_list[number]} not found.")
                        continue
                    else:
                        if system == "Windows":
                            kill_proc.kill()
                        elif system == "Linux":
                            os.kill(kill_process_info_as_dict['pid'], signal.SIGTERM)
                            # os.kill(kill_process_info_as_dict['pid'],signal.SIGKILL)
                            # subprocess.call(["kill", str(kill_process_info_as_dict['pid'])])

                        print(f"{bcolors.FAIL}Process with PID {kill_process_info_as_dict['pid']} killed.{bcolors.ENDC}")
                        child_pid_list.remove(kill_process_info_as_dict['pid'])


def process_recovery(process_pid_list):
    PID_list = process_pid_list
    this_process_pid = os.getpid()
    this_process = psutil.Process(this_process_pid)
    this_process_info_as_dict = this_process.as_dict()
    parent_process_pid = this_process.ppid()
    parent_process = psutil.Process(parent_process_pid)
    parent_process_as_dict = parent_process.as_dict()
    parent_process_children = parent_process.children(recursive=True)
    child_pid_list = []
    for i in range(len(parent_process_children)):
        child_info_as_dict = parent_process_children[i].as_dict()
        if child_info_as_dict['pid'] != this_process_pid:
            child_pid_list.append(child_info_as_dict['pid'])

    system = platform.system()

    while True:
        if system == "Linux":
            try:
                os.wait()
            except:
                pass
        for i in range(len(child_pid_list)):
            try:
                if psutil.pid_exists(child_pid_list[i]):
                    pass
                    # print(f'Process with PID {process_pid} is alive')
                else:
                    print(f'Process with PID {child_pid_list[i]} is dead')
                    print(f"{bcolors.WARNING}Restoring the process{bcolors.ENDC}")
                    if child_pid_list[i] == PID_list[0]:
                        process = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})
                    elif child_pid_list[i] == PID_list[1]:
                        process = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})
                    else:
                        process = mp.Process(target=process_killer,
                                                  kwargs={'PID_list': PID_list, 'timepass': 10,
                                                          'exclusion_list': [process_recov.pid, ]})
                    process.start()
                    old_pid = child_pid_list[i]
                    child_pid_list[i] = process.pid
                    if old_pid == PID_list[0]:
                        PID_list[0] = process.pid
                    if old_pid == PID_list[1]:
                        PID_list[1] = process.pid

                    temp_str = ""
                    for i in range(len(child_pid_list)):
                        if child_pid_list[i] == process.pid:
                            temp_str += f'{bcolors.OKGREEN}Process_{i + 1} PID: {child_pid_list[i]}{bcolors.ENDC} (old: {old_pid})\n'
                        else:
                            temp_str += f'Process_{i + 1} PID: {child_pid_list[i]}\n'
                    temp_str = temp_str.rstrip()
                    # temp_str = '\n'.join(
                    #     [f'Process_{i + 1} PID: {PID_list[i]}' for i in range(len(PID_list))])
                    print('Recovery result:\n' + temp_str)
                    threading.Thread(target=process.join).start()
            except:
                pass
        time.sleep(0.2)


if __name__ == "__main__":
    process_name_list = ["process_1", "process_2"]

    process_1 = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})
    process_2 = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})

    process_1.start()
    process_2.start()

    PID_list = [process_1.pid, process_2.pid]

    process_recov = mp.Process(target=process_recovery, kwargs={'process_pid_list': PID_list})
    process_recov.start()

    process_kill = mp.Process(target=process_killer,
                              kwargs={'PID_list': PID_list, 'timepass': 10, 'exclusion_list': [process_recov.pid, ]})
    #process_kill.start()



    print("Main PID:", os.getpid())
    print("Process_1 PID:", process_1.pid)
    print("Process_2 PID:", process_2.pid)
    print("Process_killer PID:", process_kill.pid)

    process_1.join()
    process_2.join()
    #process_kill.join()
    process_recov.join()

    time.sleep(5)

    print("Program completed")

Надеюсь, что, если вдруг, вы столкнётесь с подобной проблемой и не захотите тратить время на поиска ошибки — фантома, то вам поможет такое решение проблемы.

Теги:
Хабы:
Если эта публикация вас вдохновила и вы хотите поддержать автора — не стесняйтесь нажать на кнопку
Всего голосов 12: ↑3 и ↓9-4
Комментарии20

Публикации

Истории

Работа

Data Scientist
80 вакансий
Python разработчик
118 вакансий

Ближайшие события

7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн
7 – 8 ноября
Конференция «Матемаркетинг»
МоскваОнлайн
15 – 16 ноября
IT-конференция Merge Skolkovo
Москва
22 – 24 ноября
Хакатон «AgroCode Hack Genetics'24»
Онлайн
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань