Как стать автором
Обновить

VoiceChat c Vorbis кодеком на несколько человек с блэкджеком и (Web?)UI над UDP socket python

Уровень сложностиСредний
Время на прочтение9 мин
Количество просмотров2.8K
Ругает VoIP, discord и др. за фатальный недостаток
Ругает VoIP, discord и др. за фатальный недостаток

Пару лет назад, когда все эти гитхабры для меня были птичьим пением, а делать мне было нечего — писал я значит небольшие проекты на python. Среди них был простенький голосовой чат на двоих через TCP. Но вот я нашел его и захотел допилить. Лучший способ допилить что‑то — это придумать заново. Всё что вы прочитаете далее — есть мой гайд по изготовлению велосипеда и не претендует на звание полноценного презентабельного проекта.


Вот основные логические поинты предстоящего проекта, на основании ошибок старого и полученного с того момента мной опыта:

  • Клиент делится на две независимых части:

    • 1. Sender (I) читает микрофон и отправляет на адреса получателей из списка

    • 2. Receiver (O) получает все аудиоданные и выводит их в наушники

  • Ты можешь отправить свой голос кому угодно, но не можешь случайно подслушать чужой.

  • Должно быть удобство использования при большом количестве пользователей

  • Рассчитываю на подобную схему:

    • Ты слышишь всё, что тебе приходит (но желательно иметь возможность отключать звук конкретных пользователей aka черный список)

    • Ты можешь отправить свой голос кому угодно, если знаешь его IP адрес.

TCP или UDP?

Интересующие меня в данном случае характеристики:

  • TCP:

    • + Сразу понятно как суммировать аудиопоток от множества людей - просто накладывать данные со всех сессий.

    • - Есть необходимость создавать по сессии с каждым участником - не удобно когда их много.

  • UDP:

    • + Не нужно создавать сессии (просто кидаешь данные напрямую по адресу)

    • - При получении данных не известно от какого кол-ва участников они приходят (все вперемешку)

UDP более схож с моим концептом и удобен когда участников много - его и реализуем.

Надёжный план работы voicechata через UDP
Надёжный план работы voicechata через UDP

Поскольку мы настроены серьёзно, наш проект будет солидно использовать venv.

Подготовка среды

mkdir pycord
cd pycord
python -m venv venv
venv\scripts\activate

pip install numpy
pip install pyaudio
pip install soundfile

Cоздаем папку, окружение, создаем один файлик PyCord.py.

Спойлер

Серьёзно, никаких, api, тестов — фулстак вам пухом.

Импортируем то, что точно понадобится.

import io
import pyaudio
import socket
import numpy as np
import soundfile as sf
from threading import Thread

Подготавливаем IO для аудиоданных.

Pyaudio — самая изящная библиотека работы со звуковыми устройствами, которую я только встречал. Создаешь один объект с буферами ввода и вывода — пишешь, читаешь, все интуитивно понятно.

CHUNK=2048
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
p = pyaudio.PyAudio()

stream=p.open(format=FORMAT, # Этот объект будет обеспечивать нам IO голоса
            channels=CHANNELS,
            rate=RATE,
            input=True,    
            output=True,
            frames_per_buffer=CHUNK)

Сжатие аудио кодеком

Я импортировал soundfile - библиотеку для работы с файлами MP3, FLAC, WAV, OGG и другими. Можно было бы так всё и оставить: для кодировки писать в файл нужного типа, пересылать его, а потом читать из файла заданного типа. Но это путь безумия.

Работа с файлами чревата работой с памятью на жестком диске - но мы же не хотим что бы весь аудиопоток гнался через диск, верно?

Для этого я реализовал энкодер и дэкодер, которые подсовывают soundfile io.BytesIO - ведь это file like object.

def RAW_2_OGG(raw_chunk):
  byte_io = io.BytesIO()
  signal = np.frombuffer(raw_chunk,dtype=np.float32)
  

  sf.write(byte_io, signal, RATE,format='OGG') 
  
  return bytes(byte_io.getbuffer( ))


def OGG_2_RAW(ogg_chunk):
  byte_io = io.BytesIO()
  byte_io.write(ogg_chunk)
  byte_io.seek(0)
  
  data, samplerate = sf.read(byte_io)
  
  return np.float32(data)

Soundfile сжимает голос в ~4 раза неявно используя Vorbis кодек (брат Opus, используемого в Discord)

На самом деле, чем больше размер одного чанка, тем эффективнее сжатие, но при этом растёт задержка.

Функции выглядят не сильно громоздко, верно? Но на их реализацию у меня ушло более 6 часов экспериментов - как на всё остальное вместе взятое. Но четырёхкратное сжатие того стоило.

У аудиоданных есть огромное количество представлений, а в документациях пишут что-то уровня "raw audio" или "numpy array"... дальше гадай сам.

Sender (Audio stream -> IPs from list)

Псевдокод:

  1. Читаем кусок данных (далее чанк) из аудиопотока.

  2. Отправляем каждому из списка clients, полученному из get_clients() — об этом далее.

  3. Повторяем до бесконечности пункты 1 и 2.

def Sender():
    UDP_PORT = 6006

    sock = socket.socket(socket.AF_INET,
                        socket.SOCK_DGRAM)
    while True:
        clients=get_clients() # Реализуем дальше
        data = stream.read(CHUNK, exception_on_overflow = False)
        data=RAW_2_OGG(data)
        for addr in clients:
            sock.sendto(data, (addr, UDP_PORT))

Receiver (Internet -> Audio stream)

Есть один нюанс, recvfrom блокирует выполнение до получения чанка данных, в диалоге двух человек проблем не будет:

def Receiver():
    me = "127.0.0.1"
    UDP_PORT = 6006

    sock = socket.socket(socket.AF_INET,
                        socket.SOCK_DGRAM)
    sock.bind((me, UDP_PORT))

    while True:
        num = get_num() # Не реализуем дальше (изменю идею)
        for i in range(num):
            new = sock.recvfrom(CHUNK*10)[0]
            new=OGG_2_RAW(d)
            if i == 0:
                data = new.copy()
            else:
                data += new
        if num != 0:
            print(data.shape)
            stream.write(data.tobytes())
  • человек №1 записал чанк за 1/n секунды и отправил его.

  • человек №2 получил его и воспроизводит в течение 1/n секунды.

Но, если принимать чанки сразу от двух человек:

  • человек №1 записал чанк №1 за 1/n секунды и отправил его

  • человек №2 записал чанк №2 за 1/n секунды и отправил его

  • человек №3 получает их последовательно и воспроизводит в течение 2*1/n секунды

  • Получается, что человек №3 тормоз! Его буфер получения растёт, он должен уметь воспроизводить чанки одновременно.

В случае с аудиоданными всё просто: суммируем их как массивы numpy.

Для решения этой проблемы отключаем блокировку recvfrom так: sock.setblocking(0)

  1. Суммируем чанки пока не выпадает исключение о пустоте буфера.

  2. Полученный наложенный звук с чистой совестью пишем в аудиопоток.

def Receiver():
    me = "127.0.0.1"
    UDP_PORT = 6006

    sock = socket.socket(socket.AF_INET,
                        socket.SOCK_DGRAM)
    sock.setblocking(0)
    sock.bind((me, UDP_PORT))


    while True:
        c=0
        first=True
        while True:
            blacklist=get_black() # Реализуем дальше
            try:
                d,addr=sock.recvfrom(CHUNK*10)
                if addr[0] in blacklist:
                    continue
                new=OGG_2_RAW(d)
                c+=1
                if first:
                    data=new.copy()
                    first=False
                else:
                    data+=new
            except:
                break
        if not first:
            stream.write(data.tobytes())

Как вы могли заметить, функции используют get_black() для ЧС и get_clients() для списка адресов отправки своего голоса

Getters & setters для динамического изменения clients, blacklist

black,clients=[],[]

def set_black(s):
    global black
    black = s

    
def get_black():
    global black
    return black

    
def set_clients(s):
    global clients
    clients = s

    
def get_clients():
    global clients
    return clients

Как способ общения с Reciver и Sender

sender = Thread(target=Sender, args=())
sender.start()

recv = Thread(target=Receiver, args=())
recv.start()

Обе функции запущены как потоки, и теперь работают автономно, как теперь контролировать их поведение?

set_black(["192.168.0.1"]) # Изменится количество принимаемых аудиопотоков на 2
set_clients(["127.0.0.1","192.168.0.15"]) #Теперь аудио отправляется на оба адреса

Подобная схема была выбрана, для удобства реализации взаимодействия с интерфейсом.

О нём сейчас и поговорим.

UI

Можно было бы идти по знакомой дорожке, именуемой Tk, или схожей, но чуть менее популярной Qt, можно было даже упороться в стиль с DearPyGui, но это всё заезжено и банально.

И тут я подумал: А почему бы не сделать веб-интерфейс?. Первое что пришло в голову - Gradio.

Просто было интересно сделать что-то с веб фронтом, а запариваться с отдельным фронтэндом на JS и даже api не уместно, если делаешь что-то десктопное вроде моего текущего клиента.

За следующие пару минут я не передумал - значит настрой серьёзен как никогда.

pip install gradio
import gradio as gr
Спойлер

Думаю, всё получиться, так ещё и возможность управления PyCord c телефона или другого устройства, по веб майке Gradio.

Gradio позволяет конструировать webui для своего backendа прямо в его коде, при помощи интуитивно понятного конструктора.

WebUI будет иметь две вкладки:

  • Отправлять

    • Слайдер выбора кол-ва адресов

    • Динамическое количество полей ввода под эти адреса

  • Чёрный список

    • То же самое для конфигурации чёрного списка

import gradio as gr

def variable_outputs(k):
    k = int(k)
    return [gr.Textbox.update(visible=True)]*k + [gr.Textbox.update(visible=False)]*(max_textboxes-k)

  
max_textboxes = 10
with gr.Blocks() as demo:
    gr.Markdown("PyCord")

    with gr.Tab("Отправлять"):
        snd = gr.Slider(0, max_textboxes, value=10, step=1, label="Адреса получателей")
        textboxes = []
        for i in range(max_textboxes):
            t = gr.Textbox(f"127.0.0.1")
            textboxes.append(t)
        snd.change(variable_outputs, snd, textboxes)
        button = gr.Button("Начать отправку")
        button.click(lambda *s:set_clients(s[:s[-1]]),textboxes+[snd])

    with gr.Tab("Чёрный список"):
        snd2 = gr.Slider(0, max_textboxes, value=10, step=1, label="Блокировать входящие с адресов")
        textboxes2 = []
        for i in range(max_textboxes):
            t = gr.Textbox(f"127.0.0.1")
            textboxes2.append(t)
        snd2.change(variable_outputs, snd2, textboxes2)
        button2 = gr.Button("БАН")
        button2.click(lambda *s:set_black(s[:s[-1]]),textboxes2+[snd2])

"Gradio is the fastest way to demo your machine learning model with a friendly web interface so that anyone can use it, anywhere!" - я знал на что шёл, честно

Sender
Sender
Receiver
Receiver

Делать Webui к этому велосипеду была дикость, знаю. Зато можно управлять клиентом на пк из браузера на телефоне!

как кроссплатформенность - но нет
как кроссплатформенность - но нет

Напоминаю, что клиент всё еще работает локально на ПК, а это «интерфейс на вынос».

На телефоне интерфейс даже не выглядит пустым.

Но есть одна проблема, которая перекрывает всё удобство использования gradio интерфейсов на телефоне — это случайные нажатия.

Ты просто проводишь пальцем мимо слайдера/кнопки и они могут на это среагировать... Но ощущается это только тогда, когда интерфейс не вмещается в один экран телефона, и ты вынужден перематывать его свайпами.

Всё работает, но

Но кажется я понял — почему gradio не используют в подобных кейсах: оно очень, очень медленное

Оно долго обновляет фреймы, оно очень долго стартует. Этот интерфейс максимально неотзывчивый, он ловит случайные нажатия при свайпах на телефоне. При всём этом переделать под Tk не составит труда (изначальная версия была именно под него, но потом я решил сделать нечто необычное).

Изначальный прототип на TCP
Полный код текущего проекта
import io
import pyaudio
import soundfile as sf
import sys

import socket
import numpy as np

from threading import Thread
import gradio as gr

def RAW_2_OGG(raw_chunk):
  byte_io = io.BytesIO()
  signal = np.frombuffer(raw_chunk,dtype=np.float32)
  old=sys.getsizeof(raw_chunk)

  sf.write(byte_io, signal, RATE,format='OGG') 
  b=bytes(byte_io.getbuffer( ))
  n=sys.getsizeof(b)
  print(n/old)
  return b


def OGG_2_RAW(ogg_chunk):
  byte_io = io.BytesIO()
  byte_io.write(ogg_chunk)
  byte_io.seek(0)
  
  data, samplerate = sf.read(byte_io)
  
  return np.float32(data)


CHUNK=4096
FORMAT = pyaudio.paFloat32
CHANNELS = 1
RATE = 16000
p = pyaudio.PyAudio()

stream=p.open(format=FORMAT,
            channels=CHANNELS,
            rate=RATE,
            input=True,
            output=True,
            frames_per_buffer=CHUNK)




black,clients=[],[]

def set_black(s):
    global black
    black=s

def get_black():
    global black
    return black

def set_clients(s):
    global clients
    clients=s

def get_clients():
    global clients
    return clients

def Sender():
    UDP_PORT = 6006

    sock = socket.socket(socket.AF_INET,
                        socket.SOCK_DGRAM)
    while True:
        clients=get_clients()
        data = stream.read(CHUNK, exception_on_overflow = False)
        data=RAW_2_OGG(data)
        #print(len(data))
        for addr in clients:
            sock.sendto(data, (addr, UDP_PORT))



def Receiver():
    me = "127.0.0.1"
    UDP_PORT = 6006

    sock = socket.socket(socket.AF_INET,
                        socket.SOCK_DGRAM)
    sock.setblocking(0)
    sock.bind((me, UDP_PORT))


    while True:
        c=0
        first=True
        while True:
            blacklist=get_black()
            try:
                d,addr=sock.recvfrom(CHUNK*10)
                if addr[0] in blacklist:
                    continue
                new=OGG_2_RAW(d)
                #new=np.frombuffer(d,dtype=np.uint16) # buffer size is 1024 bytes
                c+=1
                if first:
                    data=new.copy()
                    first=False
                else:
                    data+=new
            except:
                break
        if not first:
            #print(c)
            stream.write(data.tobytes())




def variable_outputs(k):
    k = int(k)
    return [gr.Textbox.update(visible=True)]*k + [gr.Textbox.update(visible=False)]*(max_textboxes-k)


max_textboxes=10
with gr.Blocks() as demo:
    gr.Markdown("# PyCord")
    with gr.Tab("Отправлять"):
        snd = gr.Slider(0, max_textboxes, value=10, step=1, label="Адреса получателей")
        textboxes = []
        for i in range(max_textboxes):
            t = gr.Textbox(f"127.0.0.1")
            textboxes.append(t)
        snd.change(variable_outputs, snd, textboxes)
        button = gr.Button("Начать отправку")
        button.click(lambda *s:set_clients(s[:s[-1]]),textboxes+[snd])

    with gr.Tab("Чёрный список"):
        snd2 = gr.Slider(0, max_textboxes, value=10, step=1, label="Блокировать входящие с адресов")
        textboxes2 = []
        for i in range(max_textboxes):
            t = gr.Textbox(f"127.0.0.1")
            textboxes2.append(t)
        snd2.change(variable_outputs, snd2, textboxes2)
        button2 = gr.Button("БАН")
        button2.click(lambda *s:set_black(s[:s[-1]]),textboxes2+[snd2])
    


sender = Thread(target=Sender, args=())
sender.start()

recv = Thread(target=Receiver, args=())
recv.start()

demo.launch()

Finally

Никому не советую использовать gradio не по назначению — вот и весь вывод.

Спасибо за прочтение, надеюсь вы смогли найти что‑то интересное в этой статье, раз уж дочитали до сюда. Критика и идеи ожидаю в комментариях, на этом у меня всё, всем понятных сокетов и добра.

Теги:
Хабы:
Всего голосов 1: ↑1 и ↓0+1
Комментарии11

Публикации

Истории

Работа

Python разработчик
196 вакансий
Data Scientist
93 вакансии

Ближайшие события

27 августа – 7 октября
Премия digital-кейсов «Проксима»
МоскваОнлайн
28 сентября – 5 октября
О! Хакатон
Онлайн
3 – 18 октября
Kokoc Hackathon 2024
Онлайн
10 – 11 октября
HR IT & Team Lead конференция «Битва за IT-таланты»
МоскваОнлайн
25 октября
Конференция по росту продуктов EGC’24
МоскваОнлайн
7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн