Категорически приветствую.
Недавно мне понадобилось поработать с Mikrotik через его API. Вроде бы ничего примечательного, есть официальная библиотека, есть еще на гитхабе обёртка, но вот беда — мне надо было работать асинхронно через asyncio и c использованием плюшек async/await. И такой библиотеки я не нашел.
Пришлось писать самому.
Сильно длинной статья не будет, т. к. особо и не о чем писать. И вполне достаточно было бы ссылки на репозиторий.
Установка пакета:
Вот пример использования:
Также есть возможно создать простенький пул коннектов до микротика, но в боевых условия пул не тестился.
Для парсинга ответов от микрота я воспользовался наработками из этого репозитория
Надеюсь, кому-то это окажется полезным.
Недавно мне понадобилось поработать с Mikrotik через его API. Вроде бы ничего примечательного, есть официальная библиотека, есть еще на гитхабе обёртка, но вот беда — мне надо было работать асинхронно через asyncio и c использованием плюшек async/await. И такой библиотеки я не нашел.
Пришлось писать самому.
Сильно длинной статья не будет, т. к. особо и не о чем писать. И вполне достаточно было бы ссылки на репозиторий.
Установка пакета:
pip install aio_api_ros
Вот пример использования:
import asyncio from aio_api_ros import create_rosapi_connection async def main(): # устанавливаем коннект mk = await create_rosapi_connection( mk_ip='127.0.0.1', mk_port=8728, mk_user='myuser', mk_psw='mypassword' ) # отправляем команду mk.talk_word('/ip/hotspot/active/print') # считываем ответ от микротика res = await mk.read() print(res) mk.close() if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
Также есть возможно создать простенький пул коннектов до микротика, но в боевых условия пул не тестился.
Тот же пример но с пулом коннектов
import asyncio from aio_api_ros import create_rosapi_simple_pool async def main(): mk = await create_rosapi_simple_pool( mk_ip='127.0.0.1', mk_port=8728, mk_user='myuser', mk_psw='mypassword', max_size=4 ) await mk.talk_word('/ip/hotspot/active/print') res = await mk.read() print(res) mk.close() if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
Для парсинга ответов от микрота я воспользовался наработками из этого репозитория
Надеюсь, кому-то это окажется полезным.
