Какое-то время назад Хабре проскакивала заметка про клиент-парсер сайтиков на Питоне. Автор на этом примере разбирал проблемы многопоточных сетевых приложений.
Но мне показалось, что ту же задачу (вернее, ее главную часть — параллельные соединения c http-cервером) вполне можно эффективно решить и без потоков.
Заглянув, для начала, в свою статейку про Twisted и Tornado, почесав затылок и накопавшись в документации по неблокирующим сокетам, я набросал асинхронныйсервер http-клиент.
Ниже — исходник ключевой части приложения с пояснениями:
import socket
import select
import sys
import errno
import time
from config import *
def ioloop(ip_source, request_source):
""«Асинхронный цикл собственной персоной
ip_source — бесконечный iterable, выдающий ip-адреса для соединений ;
request_source — iterable, генерирующий тела запросов;
»""
starttime = time.time()
# открываем пул сокетов; словари, хранящие соединения тела запросов и ответов
epoll = select.epoll()
connections = {}; responses = {}; requests = {}
bytessent = 0.0
bytesread = 0.0
timeout = 0.3
# выбираем первый запрос
request = request_source.next()
try:
while True:
# проверяем число соединений, если их меньше минимально
# возможного и остались запросы — добавляем еще одно.
#
connection_num = len(connections)
if connection_num<CLIENT_NUM and request:
ip = ip_source.next()
print «Opening a connection to %s.» % ip
clientsocket = socket.socket(socket.AF_INET,
socket.SOCK_STREAM)
# Несколько нетривиально. Неблокирующий сокет выбрасывает
# исключение-ошибку EINPROGRESS, если не может сразу соединиться сразу.
# Игнорируем ошибку и начинаем ждать события на сокете.
#
clientsocket.setblocking(0)
try:
res = clientsocket.connect((ip, 80))
except socket.error, err:
#
if err.errno != errno.EINPROGRESS:
raise
# Вносим сокет в пул и словарь соединений
epoll.register(clientsocket.fileno(), select.EPOLLOUT)
connections[clientsocket.fileno()] = clientsocket
requests[clientsocket.fileno()] = request
responses[clientsocket.fileno()] = ""
# «Пулинг» — то есть сбор событий
#
events = epoll.poll(timeout)
for fileno, event in events:
if event & select.EPOLLOUT:
# Посылаем часть запроса...
#
try:
byteswritten = connections[fileno].send(requests[fileno])
requests[fileno] = requests[fileno][byteswritten:]
print byteswritten , «bytes sent.»
bytessent += byteswritten
if len(requests[fileno]) == 0:
epoll.modify(fileno, select.EPOLLIN)
print «switched to reading.»
except socket.error, err:
print «Socket write error: „, err
except Exception, err:
print “Unknown socket error: „, err
elif event & select.EPOLLIN:
# Читаем часть ответа...“
#
try:
bytes = connections[fileno].recv(1024)
except socket.error, err:
# Вылавливаем ошибку „connection reset by peer“ —
#случается при большом числе соединений
#
if err.errno == errno.ECONNRESET:
epoll.unregister(fileno)
connections[fileno].close()
del connections[fileno]
print »Connection reset by peer."
continue
else:
raise err
print len(bytes) , «bytes read.»
bytesread += len(bytes)
responses[fileno] += bytes
if not bytes:
epoll.unregister(fileno)
connections[fileno].close();
del connections[fileno]
print «Done reading...Closed.»
# выбираем следующий запрос
if request:
request = request_source.next()
print «Connections left: „, len(connections)
if not len(connections):
break
except KeyboardInterrupt:
print “Looping interrupted by a signal.»
for fd, sock in connections.items():
sock.close()
epoll.close()
endtime = time.time()
timespent = endtime - starttime
return responses, timespent, bytesread, bytessent
Мораль тут простая — не всюду следует пихать потоки, более того, существуют ситуации, когда многопоточность только снизит надежность программы, создаст известные проблемы в тестировании и станет источником неуловимых багов. Если некритична производительность, но очень хочется что-то распараллелить, то часто вполне оправдывают себя даже обычные процессы и примитивный IPC.
Кроме того, в Питоне все равно не существует настоящих потоков уровня ядра, а здравствует и по сей день треклятый GIL. Соответственно, никаких преимуществ в производительности на многоядерных процессорах получить нельзя.
Данный скрипт, конечно, жутковато и на скорую руку исполнен, не обрабатывает обрывы соединения сервером и ошибки на операциях чтения/записи в сокет, не разбирает ответы сервера, но зато тащит многократно корень сайта cnn.com на пределе возможностей моего канала — 800-1000 Кб/с. :)
Целиком исходники скрипта можно найти где-то тут
PS Может, у кого есть мысли, для чего можно использовать производительные
асинхронные клиенты? :)
Но мне показалось, что ту же задачу (вернее, ее главную часть — параллельные соединения c http-cервером) вполне можно эффективно решить и без потоков.
Заглянув, для начала, в свою статейку про Twisted и Tornado, почесав затылок и накопавшись в документации по неблокирующим сокетам, я набросал асинхронный
Ниже — исходник ключевой части приложения с пояснениями:
import socket
import select
import sys
import errno
import time
from config import *
def ioloop(ip_source, request_source):
""«Асинхронный цикл собственной персоной
ip_source — бесконечный iterable, выдающий ip-адреса для соединений ;
request_source — iterable, генерирующий тела запросов;
»""
starttime = time.time()
# открываем пул сокетов; словари, хранящие соединения тела запросов и ответов
epoll = select.epoll()
connections = {}; responses = {}; requests = {}
bytessent = 0.0
bytesread = 0.0
timeout = 0.3
# выбираем первый запрос
request = request_source.next()
try:
while True:
# проверяем число соединений, если их меньше минимально
# возможного и остались запросы — добавляем еще одно.
#
connection_num = len(connections)
if connection_num<CLIENT_NUM and request:
ip = ip_source.next()
print «Opening a connection to %s.» % ip
clientsocket = socket.socket(socket.AF_INET,
socket.SOCK_STREAM)
# Несколько нетривиально. Неблокирующий сокет выбрасывает
# исключение-ошибку EINPROGRESS, если не может сразу соединиться сразу.
# Игнорируем ошибку и начинаем ждать события на сокете.
#
clientsocket.setblocking(0)
try:
res = clientsocket.connect((ip, 80))
except socket.error, err:
#
if err.errno != errno.EINPROGRESS:
raise
# Вносим сокет в пул и словарь соединений
epoll.register(clientsocket.fileno(), select.EPOLLOUT)
connections[clientsocket.fileno()] = clientsocket
requests[clientsocket.fileno()] = request
responses[clientsocket.fileno()] = ""
# «Пулинг» — то есть сбор событий
#
events = epoll.poll(timeout)
for fileno, event in events:
if event & select.EPOLLOUT:
# Посылаем часть запроса...
#
try:
byteswritten = connections[fileno].send(requests[fileno])
requests[fileno] = requests[fileno][byteswritten:]
print byteswritten , «bytes sent.»
bytessent += byteswritten
if len(requests[fileno]) == 0:
epoll.modify(fileno, select.EPOLLIN)
print «switched to reading.»
except socket.error, err:
print «Socket write error: „, err
except Exception, err:
print “Unknown socket error: „, err
elif event & select.EPOLLIN:
# Читаем часть ответа...“
#
try:
bytes = connections[fileno].recv(1024)
except socket.error, err:
# Вылавливаем ошибку „connection reset by peer“ —
#случается при большом числе соединений
#
if err.errno == errno.ECONNRESET:
epoll.unregister(fileno)
connections[fileno].close()
del connections[fileno]
print »Connection reset by peer."
continue
else:
raise err
print len(bytes) , «bytes read.»
bytesread += len(bytes)
responses[fileno] += bytes
if not bytes:
epoll.unregister(fileno)
connections[fileno].close();
del connections[fileno]
print «Done reading...Closed.»
# выбираем следующий запрос
if request:
request = request_source.next()
print «Connections left: „, len(connections)
if not len(connections):
break
except KeyboardInterrupt:
print “Looping interrupted by a signal.»
for fd, sock in connections.items():
sock.close()
epoll.close()
endtime = time.time()
timespent = endtime - starttime
return responses, timespent, bytesread, bytessent
Мораль тут простая — не всюду следует пихать потоки, более того, существуют ситуации, когда многопоточность только снизит надежность программы, создаст известные проблемы в тестировании и станет источником неуловимых багов. Если некритична производительность, но очень хочется что-то распараллелить, то часто вполне оправдывают себя даже обычные процессы и примитивный IPC.
Кроме того, в Питоне все равно не существует настоящих потоков уровня ядра, а здравствует и по сей день треклятый GIL. Соответственно, никаких преимуществ в производительности на многоядерных процессорах получить нельзя.
Данный скрипт, конечно, жутковато и на скорую руку исполнен, не обрабатывает обрывы соединения сервером и ошибки на операциях чтения/записи в сокет, не разбирает ответы сервера, но зато тащит многократно корень сайта cnn.com на пределе возможностей моего канала — 800-1000 Кб/с. :)
Целиком исходники скрипта можно найти где-то тут
PS Может, у кого есть мысли, для чего можно использовать производительные
асинхронные клиенты? :)