Pull to refresh

Удаленный доступ к IP камерам, теперь на Python

Reading time19 min
Views55K
Реальное изображение с одной из камер
Реальное изображение с одной из камер

Удивительно, что в 2021-м все еще можно обсуждать такую избитую тему. Однако, мне пришлось пройти довольно длинный путь от покупки охранных камер до готового решения, покрывающего мои, довольно нехитрые, задачи. Под катом вы найдете скрипт, который показался мне достаточно удачным, чтобы опубликовать его на Хабре, и некоторые пояснения к нему. Надеюсь, кому-то поможет.

UPD: Во второй части я расскажу о мобильном приложении для просмотра IP камер, в третьей — о просмотре камер в браузере.

Немного предыстории: мне достались IP камеры Rubetek RV-3414 и Hikvision DS-2CD2023. Мне нужно было организовать оперативный доступ к ним с мобильных устройств и непрерывную запись на жесткий диск. В качестве видеорегистратора и медиасервера я использую Intel NUC младшей модели из-за низкого потребления энергии, что сильно увеличивает автономность всей системы. Обе камеры работают по протоколу RTSP – Real Time Streaming Protocol. Теперь обо всем по порядку.

Инструкции производителей

Первое, что я сделал после установки камер, - подключил их согласно прилагаемым инструкциям. И тут началось приключение, полное сюрпризов и неожиданных поворотов.

Rubetek предложил проприетарный софт для просмотра видеопотока, который работает с оборудованием собственного производства (и почти не глючит). Но при подключении нескольких клиентов к камере это чудо техники почему-то отказывается отдавать поток на основное устройство (у меня это мобильный телефон на Андроиде). Ладно, не больно то и хотелось, все равно держать зоопарк программ под каждую камеру я не собирался. Устанавливаю на телефон VLC — работает.

Hikvision порадовал меня тем, что не требует ничего скачивать, и у камеры есть веб интерфейс для настройки. Инструкция предлагает открыть указанный IP адрес в IE, Chrome или Firefox’е. Открываю в Windows 11:

Windows 11, Edge работает в режиме IE
Windows 11, Edge работает в режиме IE

Требует обновить Edge до IE. Смешно. В 11-й, согласно официальным данным, нет возможности установить IE, зато есть режим IE (на скриншоте). Включаю — не открывается. Chrome, FF — то же самое, Linux — то же самое. Весело. Устанавливаю Win 7 в виртуалку, настраиваю камеру. Без комментариев. Справедливости ради должен сказать, что к работе самой камеры у меня претензий нет.

Удаленный доступ

Наконец, пришло время настроить удаленный просмотр видеопотоков. Мои камеры подключены к роутеру с «серым» IP адресом, подключится извне не получится. Самый простой выход из этой ситуации (не считая покупки IP) — прокинуть SSH туннель до ближайшего сервера с «белым» IP:

ssh -NT -o ServerAliveInterval=60 -o ExitOnForwardFailure=yes -R <remote_port>:<local_ip>:<local_port> <login>@<remote_ip>

Команда выполняется на сервере в локальной сети. Все запросы на <remote_ip>:<remote_port> будут проброшены на <local_ip>:<local_port>, т. е. на камеру. Но чтобы удаленный сервер слушал сетевой интерфейс, придется разрешить проброс строкой «GatewayPorts yes» в sshd_config. Это не безопасно, делать так не надо. Кроме того, логин/пароль от камеры в этом случае гуляет по интернету в открытом виде и легко может быть скомпрометирован.

К счастью, существуют готовые решения на этот случай, например rtsp-simple-server. Отличный скрипт на Go, покрывает почти все мои потребности. Но хочется больше контроля и больше функционала, а значит, придется разбираться в деталях.

Техзадание

Опыт, полученный до этого момента, помог мне сформулировать требования к будущему серверу.

  • Надежное подключение клиентов в локальной сети. Оперативное подключение к камерам должно работать железобетонно. По моим наблюдениям, камеры глючит при большом числе подключений (большое — это больше одного — двух). Поэтому в идеале нужно организовать одно подключение к каждой камере, независимо от количества клиентов.

  • Минимальная задержка подключения клиентов, как минимум, первого локального.

  • Низкая нагрузка на процессор. Для меня это важно, потому что иначе мой Intel NUC начнет есть батарею бесперебойника и противно шуршать охлаждающей турбинкой.

  • Проксирование потоков с IP камер неограниченному количеству клиентов в локальной сети и ограничение количества веб клиентов.

  • Запись на жесткий диск с разбивкой на фрагменты и суточной ротацией.

  • Восстановление соединения с камерами и записи на диск после отключения камер. Рано или поздно отключение произойдет обязательно, например, из-за перебоя питания.

  • Подключения нужно логировать.

Ну все, ТЗ есть, приступаю к реализации.

Сбор данных

Для начала нужно взглянуть, как VLC общается с камерами. Включаю Wireshark с фильтром tcp.port == 554 и вижу:

Rubetek

Ask: OPTIONS rtsp://192.168.0.114:554/onvif1 RTSP/1.0
CSeq: 2
User-Agent: LibVLC/3.0.16 (LIVE555 Streaming Media v2021.08.24)

Reply: RTSP/1.0 200 OK
CSeq: 2
Public: OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, GET_PARAMETER, SET_PARAMETER,USER_CMD_SET

Ask: DESCRIBE rtsp://192.168.0.114:554/onvif1 RTSP/1.0
CSeq: 3
User-Agent: LibVLC/3.0.16 (LIVE555 Streaming Media v2021.08.24)
Accept: application/sdp

Reply: RTSP/1.0 200 OK
CSeq: 3
Content-Type: application/sdp
Content-Length: 422

v=0
o=- 1421069297525233 1 IN IP4 192.168.0.113
s=H.264 Video, RtspServer_0.0.0.2
t=0 0
a=tool:RtspServer_0.0.0.2
a=type:broadcast
a=control:*
a=range:npt=0-
m=video 0 RTP/AVP 96
c=IN IP4 0.0.0.0
b=AS:500
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1;profile-level-id=42001F;sprop-parameter-sets=Z0IAH5WoFAFuQA==,aM48gA==
a=control:track1
m=audio 0 RTP/AVP 8
a=control:track2
a=rtpmap:8 PCMA/8000

Ask: SETUP rtsp://192.168.0.114:554/onvif1/track1 RTSP/1.0
CSeq: 4
User-Agent: LibVLC/3.0.16 (LIVE555 Streaming Media v2021.08.24)
Transport: RTP/AVP;unicast;client_port=45150-45151

Reply: RTSP/1.0 200 OK
CSeq: 4
Transport: RTP/AVP;unicast;destination=192.168.0.165;source=192.168.0.113;client_port=45150-45151;server_port=7060-7061
Session: 7c2467db;timeout=60

Ask: SETUP rtsp://192.168.0.114:554/onvif1/track2 RTSP/1.0
CSeq: 5
User-Agent: LibVLC/3.0.16 (LIVE555 Streaming Media v2021.08.24)
Transport: RTP/AVP;unicast;client_port=35736-35737
Session: 7c2467db

Reply: RTSP/1.0 200 OK
CSeq: 5
Transport: RTP/AVP;unicast;destination=192.168.0.165;source=192.168.0.113;client_port=35736-35737;server_port=7062-7063
Session: 7c2467db;timeout=60

Ask: PLAY rtsp://192.168.0.114:554/onvif1 RTSP/1.0
CSeq: 6
User-Agent: LibVLC/3.0.16 (LIVE555 Streaming Media v2021.08.24)
Session: 7c2467db
Range: npt=0.000-

Reply: RTSP/1.0 200 OK
CSeq: 6
Range: npt=0.000-
Session: 7c2467db
RTP-Info: url=rtsp:192.168.0.113:554/onvif1/track1;seq=57651;rtptime=61388916750,url=rtsp:192.168.0.113:554/onvif1/track2;seq=58422;rtptime=5456792600

*** Тут камера начинает посылать поток на указанный порт ***

Ask: TEARDOWN rtsp://192.168.0.114:554/onvif1 RTSP/1.0
CSeq: 7
User-Agent: LibVLC/3.0.16 (LIVE555 Streaming Media v2021.08.24)
Session: 7c2467db

Hikvision

Ask: OPTIONS rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101 RTSP/1.0
CSeq: 2
User-Agent: LibVLC/3.0.16 (LIVE555 Streaming Media v2021.08.24)

Reply: RTSP/1.0 200 OK
CSeq: 2
Public: OPTIONS, DESCRIBE, PLAY, PAUSE, SETUP, TEARDOWN, SET_PARAMETER, GET_PARAMETER
Date: Mon, Nov 22 2021 09:57:17 GMT

Ask: DESCRIBE rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101 RTSP/1.0
CSeq: 3
User-Agent: LibVLC/3.0.16 (LIVE555 Streaming Media v2021.08.24)
Accept: application/sdp

Reply: RTSP/1.0 401 Unauthorized
CSeq: 3
WWW-Authenticate: Digest realm="IP Camera(G2669)", nonce="17215f510ab5085c7aef996a1d42769f", stale="FALSE"
Date: Mon, Nov 22 2021 09:57:17 GMT

Ask: DESCRIBE rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101 RTSP/1.0
CSeq: 4
Authorization: Digest username="login", realm="IP Camera(G2669)", nonce="17215f510ab5085c7aef996a1d42769f", uri="rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101", response="69ce13d857b38e6e68f7f5a4a85cd709"
User-Agent: LibVLC/3.0.16 (LIVE555 Streaming Media v2021.08.24)
Accept: application/sdp

Reply: RTSP/1.0 200 OK
CSeq: 4
Content-Type: application/sdp
Content-Base: rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101/
Content-Length: 587

v=0
o=- 1637575037561170 1637575037561170 IN IP4 192.168.0.110
s=Media Presentation
e=NONE
b=AS:5050
t=0 0
a=control:rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101/
m=video 0 RTP/AVP 96
c=IN IP4 0.0.0.0
b=AS:5000
a=recvonly
a=x-dimensions:1920,1080
a=control:rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101/trackID=1
a=rtpmap:96 H265/90000
a=fmtp:96 sprop-sps=QgEBAWAAAAMAsAAAAwAAAwB7oAPAgBDljb5JMvTcBAQEAg==; sprop-pps=RAHA8vA8kAA=
a=Media_header:MEDIAINFO=494D4B48010300000400050000000000000000000000000081000000000000000000000000000000;
a=appversion:1.0

Ask: SETUP rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101/trackID=1 RTSP/1.0
CSeq: 5
Authorization: Digest username="login", realm="IP Camera(G2669)", nonce="17215f510ab5085c7aef996a1d42769f", uri="rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101/", response="2341d81156d9cee08db0004835486f51"
User-Agent: LibVLC/3.0.16 (LIVE555 Streaming Media v2021.08.24)
Transport: RTP/AVP;unicast;client_port=59446-59447

Reply: RTSP/1.0 200 OK
CSeq: 5
Session: 695167870;timeout=60
Transport: RTP/AVP;unicast;client_port=59446-59447;server_port=8302-8303;ssrc=568ed713;mode="play"
Date: Mon, Nov 22 2021 09:57:17 GMT

Ask: PLAY rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101/ RTSP/1.0
CSeq: 6
Authorization: Digest username="login", realm="IP Camera(G2669)", nonce="17215f510ab5085c7aef996a1d42769f", uri="rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101/", response="a2a71ba4866e2f77d14f7368f368da5f"
User-Agent: LibVLC/3.0.16 (LIVE555 Streaming Media v2021.08.24)
Session: 695167870
Range: npt=0.000-

Reply: RTSP/1.0 200 OK
CSeq: 6
Session: 695167870
RTP-Info: url=rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101/trackID=1;seq=54784;rtptime=2171307498
Date: Mon, Nov 22 2021 09:57:17 GMT

Ask: GET_PARAMETER rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101/ RTSP/1.0
CSeq: 7
Authorization: Digest username="login", realm="IP Camera(G2669)", nonce="17215f510ab5085c7aef996a1d42769f", uri="rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101/", response="192d15433a0964eb2782026d8e908ed3"
User-Agent: LibVLC/3.0.16 (LIVE555 Streaming Media v2021.08.24)
Session: 695167870

Reply: RTSP/1.0 200 OK
CSeq: 7
Date: Mon, Nov 22 2021 09:58:15 GMT

Ask: GET_PARAMETER rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101/ RTSP/1.0
CSeq: 9
Authorization: Digest username="login", realm="IP Camera(G2669)", nonce="17215f510ab5085c7aef996a1d42769f", uri="rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101/", response="192d15433a0964eb2782026d8e908ed3"
User-Agent: LibVLC/3.0.16 (LIVE555 Streaming Media v2021.08.24)
Session: 695167870

Reply: RTSP/1.0 200 OK
CSeq: 9
Date: Mon, Nov 22 2021 10:00:11 GMT

*** Тут камера начинает посылать поток на указанный порт ***

Ask: TEARDOWN rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101/ RTSP/1.0
CSeq: 16
Authorization: Digest username="login", realm="IP Camera(G2669)", nonce="17215f510ab5085c7aef996a1d42769f", uri="rtsp://192.168.0.110:554/ISAPI/Streaming/Channels/101/", response="e6ef5c8c7ab615db158e7e77c8f7b77a"
User-Agent: LibVLC/3.0.16 (LIVE555 Streaming Media v2021.08.24)
Session:

Reply: RTSP/1.0 200 OK
CSeq: 16
Session: 695167870
Date: Mon, Nov 22 2021 10:06:25 GMT

Общение происходит по протокол RTSP, который достаточно хорошо описан в интернете, поэтому останавливаться на нем я не вижу смысла. А вот о различиях стоит сказать пару слов.

Первая камера не требует авторизации и отдает картинку всем желающим, вторая же требует дайджест-аутентификации (строка WWW-Authenticate: Digest в листинге). Поэтому на первый запрос DESCRIBE она отвечает 401 Unauthorized, но дает ключи шифрования realm и nonce. Имея эти ключи, в следующем запросе нужно указать параметр response, который вычисляется так:

response = md5(md5(login:realm:password):nonce:md5(option:url),

где login:password — логин/пароль от камеры, option — текущий метод.

Еще одно важное отличие состоит в наличии у первой камеры микрофона, поэтому клиент запрашивает SETUP дважды: для видео (track1) и звуковой дорожки (track2).
Для каждой дорожки VLC запрашивает client_port, куда камера будет посылать данные по протоколу UDP.

Скрипт

Теперь, пожалуй, данных достаточно, чтобы написать первую версию сервера. Писать я буду на «чистом» Python 3.7+, без сторонних зависимостей. Поскольку все подключения и потоки должны обрабатываться параллельно, неплохо бы использовать библиотеку asyncio.

Идея скрипта предельно проста: один раз подключиться к каждой камере, запомнить ее параметры и отвечать клиентам ровно то же самое (за исключением мелких деталей). Видеопоток нужно раздавать (проксировать) клиентам по мере их подключения.

Ниже приведен листинг минимальной работающей версии сервера, исключительно для облегчения понимания. Полная версия доступна на Github.

main.py
import asyncio
from config import Config
from camera import Camera
from client import Client


async def main():
    for hash in Config.cameras.keys():
        await Camera(hash).connect()

    await Client.listen()


if __name__ == '__main__':
    asyncio.run(main())

Здесь я сразу подключаю все указанные в конфиге камеры и начинаю слушать TCP порт 4554. В реальной жизни делать так не нужно — клиентов ведь может и не быть, а электричество надо экономить:)

config.py
import socket


class Config:
    cameras = {
        'хеш/любая-URL-совместимая-строка': {
            'path': 'относительный путь к хранилищу',
            'url': 'rtsp://<логин>:<пароль>@<хост>:554/<uri>'},
    }

    rtsp_port = 4554
    start_udp_port = 5550

    local_ip = socket.gethostbyname(socket.gethostname())

    # Ограничить число веб-клиентов
    web_limit = 2

    log_file = '/var/log/python-rtsp-server.log'

Доступ к камерам я буду предоставлять по адресам вида rtsp://<адрес сервера>:<порт>/<хеш камеры>, где хеш камеры — любая URL-совместимая строка, включая символы UTF. Хеши и реальные адреса камер нужно указать в словаре cameras.

camera.py
import asyncio
import re
import time
from hashlib import md5
from shared import Shared
from config import Config
from log import Log


class Camera:
    def __init__(self, hash):
        self.hash = hash
        self.url = self._parse_url(Config.cameras[hash]['url'])

    async def connect(self):
        """ Открываем TCP сокет и подключаем камеру
        """
        self.udp_ports = self._get_self_udp_ports()
        self.cseq = 1
        self.realm, self.nonce = None, None

        try:
            self.reader, self.writer = await asyncio.open_connection(self.url['host'], self.url['tcp_port'])
        except Exception as e:
            print(f"Can't connect to camera [{self.hash}]: {e}")
            return

        await self._request('OPTIONS', self.url['url'])

        reply, code = await self._request(
            'DESCRIBE',
            self.url['url'],
            'User-Agent: python-rtsp-server',
            'Accept: application/sdp')

        if code == 401:
            self.realm, self.nonce = self._get_auth_params(reply)

            reply, code = await self._request(
                'DESCRIBE',
                self.url['url'],
                'Accept: application/sdp')

        self.description = self._get_description(reply)

        track_ids = self._get_track_ids(reply)

        reply, code = await self._request(
            'SETUP',
            f'{self.url["url"]}/{track_ids[0]}',
            ('Transport: RTP/AVP;unicast;'
                f'client_port={self.udp_ports["track1"][0]}-{self.udp_ports["track1"][1]}'))

        self.session_id = self._get_session_id(reply)

        if len(track_ids) > 1:
            reply, code = await self._request(
                'SETUP',
                f'{self.url["url"]}/{track_ids[1]}',
                ('Transport: RTP/AVP;unicast;'
                    f'client_port={self.udp_ports["track2"][0]}-{self.udp_ports["track2"][1]}'),
                f'Session: {self.session_id}')

        reply, code = await self._request(
            'PLAY',
            self.url['url'],
            f'Session: {self.session_id}',
            'Range: npt=0.000-')

        Shared.data[self.hash] = {
            'description': self.description,
            'rtp_info': self._get_rtp_info(reply),
            # 'transports': {},
            'clients': {}}

        Log.add(f'Camera [{self.hash}] connected')

        await self._listen()

    async def _listen(self):
        """ Открываем UDP сокет и начинаем проксировать фреймы
        """
        await self._start_server('track1')

        if self.description['audio']:
            await self._start_server('track2')

    async def _request(self, option, url, *lines):
        """ Запрос к камере с заданным OPTION и другими строками.
            Возвращает декодированный ответ и статус
        """
        command = f'{option} {url} RTSP/1.0\r\n' \
            f'CSeq: {self.cseq}\r\n'

        auth_line = self._get_auth_line(option)
        if auth_line:
            command += f'{auth_line}\r\n'

        for row in lines:
            if row:
                command += f'{row}\r\n'
        command += '\r\n'

        print(f'*** Ask:\n{command}')
        self.writer.write(command.encode())
        reply = (await self.reader.read(4096)).decode()
        print(f"*** Reply:\n{reply}")
        self.cseq += 1

        res = re.match(r'RTSP/1.0 (\d{3}) ([^\r\n]+)', reply)
        if not res:
            print('Error: invalid reply\n')
            return reply, 0
        return reply, int(res.group(1))

    def _get_auth_params(self, reply):
        """ Достать параметры realm и nonce для "digest" авторизации
        """
        realm_nonce = re.match(r'.+?\nWWW-Authenticate:.+?realm="(.+?)", ?nonce="(.+?)"', reply, re.DOTALL)
        if not realm_nonce:
            raise RuntimeError('Invalid digest auth reply')

        return realm_nonce.group(1), realm_nonce.group(2)

    def _get_auth_line(self, option):
        """ Собрать "response" хеш авторизации
        """
        if not self.realm or not self.nonce:
            return
        ha1 = md5(f'{self.url["login"]}:{self.realm}:{self.url["password"]}'.encode('utf-8')).hexdigest()
        ha2 = md5(f'{option}:{self.url["url"]}'.encode('utf-8')).hexdigest()
        response = md5(f'{ha1}:{self.nonce}:{ha2}'.encode('utf-8')).hexdigest()
        line = f'Authorization: Digest username="{self.url["login"]}", ' \
            f'realm="{self.realm}" nonce="{self.nonce}", uri="{self.url["url"]}", response="{response}"'
        return line

    def _get_description(self, reply):
        """ Достать SDP (Session Description Protocol) из ответа
        """
        blocks = reply.split('\r\n\r\n', 2)
        if len(blocks) < 2:
            raise RuntimeError('Invalid DESCRIBE reply')

        sdp = blocks[1].strip()

        details = {'video': {}, 'audio': {}}

        res = re.match(r'.+?\nm=video (.+?)\r\n', sdp, re.DOTALL)
        if res:
            details['video'] = {'media': res.group(1), 'bandwidth': '', 'rtpmap': '', 'format': ''}

            res = re.match(r'.+?\nm=video .+?\nb=([^\r\n]+)', sdp, re.DOTALL)
            if res:
                details['video']['bandwidth'] = res.group(1)

            res = re.match(r'.+?\nm=video .+?\na=rtpmap:([^\r\n]+)', sdp, re.DOTALL)
            if res:
                details['video']['rtpmap'] = res.group(1)

            res = re.match(r'.+?\nm=video .+?\na=fmtp:([^\r\n]+)', sdp, re.DOTALL)
            if res:
                details['video']['format'] = res.group(1)

        res = re.match(r'.+?\nm=audio (.+?)\r\n', sdp, re.DOTALL)
        if res:
            details['audio'] = {'media': res.group(1), 'rtpmap': ''}

            res = re.match(r'.+?\nm=audio .+?\na=rtpmap:([^\r\n]+)', sdp, re.DOTALL)
            if res:
                details['audio']['rtpmap'] = res.group(1)

        return details

    def _get_rtp_info(self, reply):
        """ Достать строку "RTP-Info" из ответа
        """
        res = re.match(r'.+?\r\n(RTP-Info: .+?)\r\n', reply, re.DOTALL)
        if not res:
            raise RuntimeError('Invalid RTP-Info')
        rtp_info = res.group(1)

        seq = re.findall(r';seq=(\d+)', rtp_info)
        rtptime = re.findall(r';rtptime=(\d+)', rtp_info)
        if not seq or not rtptime:
            raise RuntimeError('Invalid RTP-Info')

        return {'seq': seq, 'rtptime': rtptime, 'starttime': time.time()}

    def _get_track_ids(self, reply):
        """ Достать ID дорожек из ответа
        """
        track_ids = re.findall(r'\na=control:.*?(track.*?\d)', reply, re.DOTALL)
        if not track_ids:
            raise RuntimeError('Invalid track ID in reply')
        return track_ids

    def _get_session_id(self, reply):
        """ Достать ID сессии из ответа
        """
        res = re.match(r'.+?\nSession: *([^;]+)', reply, re.DOTALL)
        if not res:
            raise RuntimeError('Invalid session ID')
        return res.group(1)

    def _get_self_udp_ports(self):
        """ Получить свободный динамический UDP порт
        """
        start_port = Config.start_udp_port
        idx = list(Config.cameras.keys()).index(self.hash) * 4
        return {
            'track1': [start_port + idx, start_port + idx + 1],
            'track2': [start_port + idx + 2, start_port + idx + 3]}

    def _parse_url(self, url):
        """ Разобрать url камеры на части
        """
        parsed_url = re.match(r'(rtsps?)://(.+?):([^@]+)@(.+?):(\d+)(.+)', url)
        if not parsed_url or len(parsed_url.groups()) != 6:
            raise RuntimeError('Invalid rtsp url')
        return {
            'login': parsed_url.group(2),
            'password': parsed_url.group(3),
            'host': parsed_url.group(4),
            'tcp_port': int(parsed_url.group(5)),
            'url': url.replace(f'{parsed_url.group(2)}:{parsed_url.group(3)}@', '')}

    async def _start_server(self, track_id):
        """ Запустить UDP сервер
        """
        loop = asyncio.get_running_loop()

        await loop.create_datagram_endpoint(
            lambda: CameraUdpProtocol(self.hash, track_id),
            local_addr=('0.0.0.0', self.udp_ports[track_id][0]))


class CameraUdpProtocol(asyncio.DatagramProtocol):
    """ Этот колбэк вызывается при подключении к каждой камере
    """
    def __init__(self, hash, track_id):
        self.hash = hash
        self.track_id = track_id

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        if not Shared.data[self.hash]['clients']:
            return

        for _sid, client in Shared.data[self.hash]['clients'].items():
            self.transport.sendto(data, (client['host'], client['ports'][self.track_id][0]))

Обслуживает камеры. Единственный публичный метод Camera.connect тривиально реализует подключение по протоколу RTSP, описанное в листингах выше. Для каждой подключенной камеры после команды PLAY запускается сервер asyncio.create_datagram_endpoint(), проксирование входящих потоков происходит в колбэке CameraUdpProtocol.

client.py
# https://docs.python.org/3/library/asyncio-protocol.html

import asyncio
import re
import string
import time
from random import choices, randrange
from config import Config
from shared import Shared
from log import Log


class Client:
    def __init__(self):
        self.camera_hash = None
        self.udp_ports = {}

    @staticmethod
    async def listen():
        """ Слушаем здесь подключения всех клиентов
        """
        host = '0.0.0.0'
        print(f'*** Start listening {host}:{Config.rtsp_port} ***\n')

        loop = asyncio.get_running_loop()
        server = await loop.create_server(
            lambda: ClientTcpProtocol(),
            host, Config.rtsp_port)

        async with server:
            await server.serve_forever()

    def handle_request(self, transport, host, data):
        """ Общяемся с клиентами по протоколу RTSP
        """
        ask, option = self._request(data)
        session_id = self._get_session_id(ask)

        if option == 'OPTIONS':
            self._response(
                transport,
                'Public: OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY')

        if option == 'DESCRIBE':
            sdp = self._get_description()
            self._response(
                transport,
                'Content-Type: application/sdp',
                f'Content-Length: {len(sdp) + 2}',
                '',
                sdp)

        elif option == 'SETUP':
            udp_ports = self._get_ports(ask)
            track_id = 'track1' if not self.udp_ports else 'track2'
            self.udp_ports[track_id] = udp_ports
            self._response(
                transport,
                f'Transport: RTP/AVP;unicast;client_port={udp_ports[0]}-{udp_ports[1]};server_port=5998-5999',
                f'Session: {session_id};timeout=60')

        elif option == 'PLAY':
            self._response(
                transport,
                f'Session: {session_id}',
                self._get_rtp_info())

            # if session_id not in Shared.data[self.camera_hash]['clients']:
            Shared.data[self.camera_hash]['clients'][session_id] = {
                'host': host, 'ports': self.udp_ports, 'transport': transport}

            self._check_web_limit(host)

            Log.add(f'Play [{self.camera_hash}] [{session_id}] [{host}]')

        elif option == 'TEARDOWN':
            self._response(transport, f'Session: {session_id}')

        return self.camera_hash, session_id

    def _get_rtp_info(self):
        """ Строим строку "RTP-Info" для клиента, изменив время rtptime для корректной работы счетчика.
            По хорошему, надо просто запросить камеру, но в режиме TCP (interleaved) им это не нравится.
        """
        rtp_info = Shared.data[self.camera_hash]['rtp_info']

        print(rtp_info)

        delta = time.time() - rtp_info['starttime']
        rtptime = int(rtp_info["rtptime"][0]) + int(delta * 90000)
        # 90000 is clock frequency in SDP a=rtpmap:96 H26*/90000

        res = f'RTP-Info: url=rtsp://{Config.local_ip}:{Config.rtsp_port}/track1;' \
            f'seq={rtp_info["seq"][0]};rtptime={rtptime}'

        if len(rtp_info['seq']) < 2:
            return res

        rtptime = int(rtp_info["rtptime"][1]) + int(delta * 8000)
        # 90000 is clock frequency in SDP a=rtpmap:8 PCMA/8000

        res += f',url=rtsp://{Config.local_ip}:{Config.rtsp_port}/track2;' \
            f'seq={rtp_info["seq"][1]};rtptime={rtptime}'

        return res

    def _request(self, data):
        """ Разбираем ответ клиента
        """
        try:
            ask = data.decode()
        except Exception:
            raise RuntimeError(f"can't decode this ask:\n{data}")

        print(f'*** Ask:\n{ask}')
        # res = re.match(r'(.+?) rtsps?://.+?:\d+/(.+?)(/track.*?)? .+?\r\n', ask)
        res = re.match(r'(.+?) rtsps?://.+?:\d+/?(.*?) .+?\r\n', ask)
        if not res:
            raise RuntimeError('invalid ask')

        self.cseq = self._get_cseq(ask)

        if not self.camera_hash:
            hash = res.group(2)
            if hash not in Config.cameras:
                raise RuntimeError('invalid camera hash')
            if hash not in Shared.data:
                raise RuntimeError('camera is offline')
            self.camera_hash = hash

        return ask, res.group(1)

    def _response(self, transport, *lines):
        """ Отдаем клиенту данные строки
        """
        reply = 'RTSP/1.0 200 OK\r\n' \
            f'CSeq: {self.cseq}\r\n'

        for row in lines:
            reply += f'{row}\r\n'
        reply += '\r\n'

        transport.write(reply.encode())

        print(f'*** Reply:\n{reply}')

    def _get_cseq(self, ask):
        """ Текущий счетчик из запроса клиента
        """
        res = re.match(r'.+?\r\nCSeq: (\d+)', ask, re.DOTALL)
        if not res:
            raise RuntimeError('invalid incoming CSeq')
        return int(res.group(1))

    def _get_session_id(self, ask):
        """ ID сессии из запроса клиента
        """
        res = re.match(r'.+?\nSession: *([^;\r\n]+)', ask, re.DOTALL)
        if res:
            return res.group(1).strip()

        return ''.join(choices(string.ascii_lowercase + string.digits, k=9))

    def _get_ports(self, ask):
        """ Номера портов из запроса клиента
        """
        res = re.match(r'.+?\nTransport:[^\n]+client_port=(\d+)-(\d+)', ask, re.DOTALL)
        if not res:
            raise RuntimeError('invalid transport ports')
        return [int(res.group(1)), int(res.group(2))]

    def _get_description(self):
        """ Блок SDP из запроса клиента
        """
        sdp = Shared.data[self.camera_hash]['description']
        res = 'v=0\r\n' \
            f'o=- {randrange(100000, 999999)} {randrange(1, 10)} IN IP4 {Config.local_ip}\r\n' \
            's=python-rtsp-server\r\n' \
            't=0 0'

        if not sdp['video']:
            return res
        res += f'\r\nm=video {sdp["video"]["media"]}\r\n' \
            'c=IN IP4 0.0.0.0\r\n' \
            f'b={sdp["video"]["bandwidth"]}\r\n' \
            f'a=rtpmap:{sdp["video"]["rtpmap"]}\r\n' \
            f'a=fmtp:{sdp["video"]["format"]}\r\n' \
            'a=control:track1'

        if not sdp['audio']:
            return res
        res += f'\r\nm=audio {sdp["audio"]["media"]}\r\n' \
            f'a=rtpmap:{sdp["audio"]["rtpmap"]}\r\n' \
            'a=control:track2'
        return res

    def _check_web_limit(self, host):
        """ Ограничим веб подключения. Локальные - без ограничений.
        """
        if not Config.web_limit or self._get_client_type(host) == 'local':
            return
        web_sessions = []
        for session_id, data in Shared.data[self.camera_hash]['clients'].items():
            if self._get_client_type(data['host']) == 'web':
                web_sessions.append(session_id)
        if len(web_sessions) > Config.web_limit:
            ws = web_sessions[:-Config.web_limit]
            for session_id in ws:
                print('Web limit exceeded, cloce old connection\n')
                Shared.data[self.camera_hash]['clients'][session_id]['transport'].close()
                # Shared.data item will be deleted by ClientTcpProtocol.connection_lost callback

    def _get_client_type(self, host):
        """ Хелпер для определения типа подключения.
            Если IP клиента совпадает с локальным адресом сервера, то это веб клиент за ssh туннелем
        """
        if host == '127.0.0.1' \
            or host == 'localhost' \
                or (host.startswith('192.168.') and host != Config.local_ip):
            return 'local'
        return 'web'


class ClientTcpProtocol(asyncio.Protocol):
    """ Этот колбэк вызывается при подключении к серверу каждого нового клиента
    """
    def __init__(self):
        self.client = Client()
        self.camera_hash, self.session_id = None, None
        # self.event = event

    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        self.transport = transport
        self.host = peername[0]
        print(f'*** New connection from {peername[0]}:{peername[1]} ***\n\n')

    def data_received(self, data):
        try:
            self.camera_hash, self.session_id = self.client.handle_request(self.transport, self.host, data)
        except Exception as e:
            print(f'Error in clent request handler: {e}\n\n')
            self.transport.close()

    def connection_lost(self, exc):
        if not self.session_id or self.session_id not in Shared.data[self.camera_hash]['clients']:
            return
        del(Shared.data[self.camera_hash]['clients'][self.session_id])
        Log.add(f'Close [{self.camera_hash}] [{self.session_id}] [{self.host}]')

Обслуживает клиентов. Metod Client.handle_request делает то же, что и Camera.connect, но на этот раз прикинувшись камерой. За прослушивание порта 4554 отвечает asyncio.create_server (один на всех) и его колбэк ClientTcpProtocol (по экземпляру на каждого клиента).

shared.py
class Shared:
    """ Все задачи (tasks) будут общаться через этот объект
    """
    data = {}

Объект Sared.data используется для обмена данными между задачами (tasks) и корутинами.

log.py
import time
from config import Config


class Log:
    @staticmethod
    def add(info):
        print(f'*** {info} ***\n\n')
        try:
            with open(Config.log_file, 'a') as f:
                f.write(f'{time.strftime("%Y-%m-%d %H:%M:%S")} {info}\n')
        except Exception as e:
            print(f'Log error: {e}\n\n')
            pass

Отвечает за логирование подключений, а заодно и за вывод отладочной информации.

Тестирование

Интересно, что «медленный» Python может противопоставить высокоэффективному многопоточному Go? На скриншотах представлены результаты тестирования приведенного выше скрипта и rtsp-simple-server. Условия равные, одна камера и 10 клиентов в каждом тесте. Сервер легко поднимает и большее количество клиентов, но вот мой десктоп начинает дико тормозить при запуске нескольких десятков окон. Выпадений кадров на глаз не заметно в обоих тестах.

Python
Python
Go
Go

Судите сами. На мой вкус, неплохо. Есть смысл продолжить и довести скрипт до логического конца.

Что дальше?

В реальной жизни, к сожалению, все немного сложнее, чем в теории. Получить UDP пакеты через упомянутый выше SSH туннель не получится. Да и вообще, гонять UDP по интернету кажется мне не самой лучшей идеей. Значит, придется реализовывать получение потоков через TCP.

Если верить показаниям моего WireShark’а, различие в общении VLC с камерами в режиме TCP (с ключом —rtsp-tcp) сводится к изменению строки

Transport: RTP/AVP/TCP;unicast;interleaved=0-1

в запросе SETUP. Как видите, здесь добавляется параметр TCP, а вместо портов клиента указываются каналы чередования interleaved=0-1 (для звуковой дорожки interleaved=2-3). Входящий поток в этом случае будет направлен в тот же TCP порт, фреймы с бинарными данными будут начинаться со знака доллара, за которым следует номер канала. Реализовать такое немного сложнее, так как нужно поиграть в чехарду на единственном сокете, слушающим TCP порт.

Версия на Github содержит простую реализацию чередования, детали можно посмотреть там.

Кроме того, в ТЗ входит видеорегистратор и watchdog к нему — все это находится в файле storage.py и слишком тривиально для обсуждения.

Резюме

Похоже, полученный результат удовлетворяет всем заявленным требованиям. Во всяком случае, я использую этот скрипт на своих серверах без проблем, правда, в ограниченном окружении.

Обратите внимание, приведенный код не готов к промышленной эксплуатации, но может послужить отправной точкой и сэкономить массу времени тем, кто захочет получить больше контроля над своими камерами видеонаблюдения.

Вопросы, пожелания и конструктивная критика приветствуются!

Tags:
Hubs:
Total votes 17: ↑17 and ↓0+17
Comments34

Articles