Агентно-ориентированное программирование





Тема дипломной работы в университете была «Многоагентные системы для обработки баз знаний». Подключение многоагентной системы Jade к базе знаний Protege не составило труда и диплом готов. Теперь можно моделировать абстрактные учебные задачи, рои агентов, и так далее и тому подобное. Но возник вопрос, а как применить на деле полученные знания? Случай завершить НИОКР подвернулся при работе над системой «умный дом». Потребовалось небольшое многопоточное приложение для передачи данных от «умного дома» стороннему разработчику веб-интерфейсов. Вот прекрасная возможность применить Агентно-ориентированное программирование. В результате была успешно создана многоагентная система для параллельного программирования.



Ключевые классы многоагентной системы



Для наилучшего понимания основную идею можно сформулировать следующим образом: ментальная модель системы — это агенты работающие в потоках. Идея проста — потоки это сложно, поэтому отделим их от прикладных алгоритмов.

Классы Agency и Agent


Класс Agency спроектирован потокобезопасным. А в классе Agent стоит обратить внимание на паттерн типа «универсальный инструментарий»
/**
 * Агенты через этот класс найдут друг друга (даже если они в разных потоках)
 */
public class Agency {

    String name = "defaultAgencyName";

    /**
     * Справочник агентов
     */
    public final ConcurrentSkipListMap<String, Agent> agentReference = new ConcurrentSkipListMap<>();

    public void kill(String name) {
        agentReference.remove(name);
    }

    /**
     * Общая очередь заданий
     */
    private final ConcurrentLinkedQueue<Agent> missions = new ConcurrentLinkedQueue<>();

    private final ReentrantLock missionLock = new ReentrantLock();
    private final Condition missionAvailable = missionLock.newCondition();

    /**
     * Потокобезопасное добавление задания в очередь
     */
    public void addMission(Agent agent) {
        missionLock.lock();
        try {
            this.missions.add(agent);
            missionAvailable.signal(); // Будим один ожидающий поток
        } finally {
            missionLock.unlock();
        }
    }

    /**
     * Потокобезопасное извлечение задания из очереди
     */
    public Agent nextMission() throws InterruptedException {
        missionLock.lock();
        try {
            while (missions.isEmpty()) {
                missionAvailable.await(); // Ждем с условием
            }
            return missions.poll();
        } finally {
            missionLock.unlock();
        }
    }
}
/** Базовый класс для агентов */
public abstract class Agent {
    
    public String codeName = "DefaultAgent";

    /** Флаг уничтожения потока */
    public boolean kill = false;

    /** Метод для выполнения задачи агента */
    public abstract void doTask() throws InterruptedException;
    
    /** Ссылка на организацию агента */
    public Agency agency = null;

    /**  Здесь можно хранить все: задачи агента, результаты работы агента, инструменты агента (паттерн инструментарий) ... */
    public ConcurrentSkipListMap<String, Object> tools = new ConcurrentSkipListMap<>();

    /** Мессенджер агента */
    public ConcurrentLinkedQueue<Message> messages = new ConcurrentLinkedQueue<>();

    /** Неблокирующее чтение */
    public Message readMessage() { return messages.poll(); }
    /** Неблокирующая запись */
    public void addMessage(Message message) { messages.offer(message); }

    private final ReentrantLock messageLock = new ReentrantLock();
    private final Condition hasMessages = messageLock.newCondition();

    /** Синхронное чтение: ожидание блокирует весь поток */
    public Message readMsgWait() throws InterruptedException {
        messageLock.lock();
        try {
            while (messages.isEmpty()) {
                    hasMessages.await();
            }
            return messages.poll();
        }  finally {
            messageLock.unlock();
        }
    }

    /** Синхронное чтение с таймаутом */
    public Message readMsgWaitWithTimeout(long timeout, TimeUnit unit) throws InterruptedException {
        messageLock.lock();
        try {
            while (messages.isEmpty()) {
                if (!hasMessages.await(timeout, unit)) {
                    return null; // Таймаут истек
                }
            }
            return messages.poll();
        } finally {
            messageLock.unlock();
        }
    }

    /** Синхронное уведомление: разблокировка потока */
    public void addMsgNotify(Message message) {
        messageLock.lock();
        try {
            messages.offer(message);
            hasMessages.signal();
        } finally {
            messageLock.unlock();
        }
    }

    public String getInstruction() { return (String) this.tools.get("instruction"); }
    public void setInstruction(String instruction) { this.tools.put("instruction", instruction); }
}


Класс ThreadAgent



В этом классе-потоке запускаются сценарии, внутри которых и работают агенты. Они могут общаться друг с другом даже если они работают в разных потоках. Схожее решение предоставляет нам MPI. Кстати, в один поток можно отправить на работу сразу несколько агентов.

/**
 * Один из типов потоков: агенты с заданиями из общей очереди
 */
public class ChainOperation extends Thread {
    public String name = "default";

    public Agency agency = null;

    public SpecialOperations operations = null;

    /**
     * Удалить ссылку на поток из списка потоков.
     * Если ссылок нет, то поток уничтожается.
     */
    public void unlink() {
        this.operations.list.remove(this.name);
    }

    @Override
    public void run() {

        Agent agent;

        while (!Thread.currentThread().isInterrupted()) {
            try {
                Utils.log(Unit.outFlag, name + " поток активирован.");
                agent = agency.nextMission();
                Utils.log(Unit.outFlag, this.name + ": агент " + agent.codeName);

                //активация агента
                agent.doTask();

                //аккуратное уничтожение потока агентом
                if (agent.kill) {
                    Utils.log(Unit.outFlag, "Агент " + agent.codeName + " уничтожает поток " + this.name);
                    this.unlink();
                    break;
                }
            } catch (InterruptedException e) {
                Utils.log(Unit.outFlag, "interrupt " + this.name);
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}


Исходный код системы



По ссылке в конце статьи можно получить исходный код системы. Сам код представляет собой каркас (FrameWork) который можно расширять, создавая инструменты для агентов, проектируя новые потоки. Для управления потоками реализован класс ThreadPool.java. Использованы lock-free структуры данных для безопасной работы потоков. Также присутствуют примеры работы системы, написанные в стиле Unit-тестов. Исходя из опыта скажу, что удобно работать со сторонним кодом. Загружаем его в сценарий и отправляем в поток, а агент по ходу выполнения стороннего кода вносит в него свои коррективы. Скачиваем, пишем параллельные программы и забываем о прокрустовом ложе шаблона producer-consumer.
Можете подключать к агентам свой искусственный интеллект: базы знаний, экспертные системы, нейросети…

Исходный код системы Social-Intelligence: gitflic.ru/project/neutrino/agency