Pull to refresh

Основы работы с потоками в языке Python

Python *

Предисловие


Данную статью я затеял написать после учащающихся вопросов как на форуме так и вопросов в icq на тему многопоточности в CPython. Проблема людей, которые их задают происходит, в основном, из незнания или непонимания основных принципов работы многопоточных приложений. По крайней мере, это относится к используемой мной модели многопоточности, которая носит название Thread Pool (Пул потоков). Часто встречаемой проблемой является и другое: люди не имеют элементарных навыков работы со стандартными модулями CPython. В статья я постараюсь привести примеры такого незнания, не останавливаясь на личностях, так как это по моему скромному мнению неважно. Исходя из условий, в которых пишется эта статья, то мы немного затронем и работу через proxy серверы (не путать с SOCKS).

Детали


Статья писалась в то время когда последними версиями CPython были: 2.6.2 для второй ветки и 3.1.1 для третьей ветки. В статье используются новоявленные в CPython 2.6 with statements, следовательно при желании ипользования этого кода в более ранних версиях придется переделывать код на своё усмотрение. В процессе написания данной статьи использовались только стандартные модули, доступные «из коробки». Также, исходя из того, что я являюсь не профессиональным программистом, а самоучкой, то прошу извинения у уважаемой аудитории за возможные неточности относительно трактования тех или иных понятий. Поэтому, приглашаю вас задавать вопросы, на которые я по возможности буду отвечать.

Итак, приступим, фактически, то что я собрался описывать впервые было порекомендовано уважаемым lorien с python.su (правда в его примере Queue вообще в отдельном потоке обрабатывалось :)), не уверен что он автор продемонстрированного им концепта, но впервые я увидел это опубликованным именно от него, и являет собой даже скорее не Thread Pool, а Task Pool (хотя возможно я и не прав в трактовании сего термина).
Что представляет собой многопоточное приложение? Это приложение, в котором определенное количество потоков выполняют некие задачи. Беда многих в том, что они не до конца улавливают то, что потоки действуют отдельно друг от друга до тех пор, пока активен главный поток. Лично я стараюсь писать таким образом, чтобы это им не мешало, но об этом позже. Также их проблемой является так называемый «индусский» код, который просто и бездумно откуда-то копируется, а программа доводится до уровня «лишь бы работало». Господа, усвойте раз и навсегда: если вы не понимаете, как работает тот или иной участок вашей программы, то перепишите его так, чтобы это было понятно ВАМ, если в будущем вы дорастете до понимания тех вещей, которые вы предполагали бездумно скопировать, то вам без проблем можно будет использовать этот код. Главным является именно ВАШЕ понимание того, как работает ваше творение.
Затронем проблему отдельной работы потоков. Господа, взаимодействие потоков стоит продумывать до того как вы начинаете писать приложение, а не когда вы его уже написали. В принципе, если придерживаться некоторых правил работы с исходным кодом приложения, то переделывание программы из однопоточной в многопоточную происходит легко, безболезненно, и быстро.
Касательно активности главного потока. Когда, как вам кажется, вы запускаете ОДИН поток, фактически работает уже ДВА потока. Нужно понимать, что количество потоков, активных в данный момент равняется количеству потоков, запущенных в данный момент вами +1 поток, в котором работает основное тело приложения. Лично я стараюсь писать таким образом, чтобы четко отделять основной поток от запущенных мной. Если этого не делать, то возможно преждевременное (как вам кажется) завершение работы приложения, хотя на самом деле приложение отработает именно так, как вы его написали.
Вроде на словах понятно, теперь приступаем к практике. На практике в CPython есть такое понятние как GIL (Global Interpreter Lock). Под сим подразумевается глобальная блокировка интерпритатора в тот момент когда потоки вашего приложения обращаются к процессору. Фактически, в каждый отдельно взятый момент с процессором работает только один поток. В связи с этим максимальное количество потоков, которое вообще можно запустить в стандартном CPython колеблется в районе 350 штук.
В качестве примера будет сделана попытка реализовать многопоточный парсер www.google.com. Как я уже написал выше, для работы будут использованы исключительно стандартные модули, для выполнения задачи понадобятся модули urllib2, urllib, queue, threading, re.

По порядку:
#==================<Имортирование необходимых модулей>==================
import urllib2
#Модуль для работы с протоколом HTTP, высокоуровневый
import urllib
#Модуль для работы с протоколом HTTP, более низкоуровневый чем urllib2, 
#фактически из него необходима одна функция - urllib.urlquote
from Queue import Queue
#Модуль, который представляет собой "Pool", фактически это список, в 
#котором на нужных местах вставлены замки таким образом, чтобы к нему 
#одновременно мог обращаться только один поток
import threading
#Модуль для работы с потоками, из него понадобится только 
#threading.active_count, threading.Thread, threading.Thread.start, 
#threading.Rlock
import re
#Модуль для работы с регулярными выражениями, его использование выходит
#за пределы статьи
import time 
#Модуль для работы со временем, из него нужна только функция sleep
queue = Queue()
#Обязательное присваивание, нужно делать именно так (т.е. импортировать
#класс Queue из модуля Queue и инициализировать его)
#==================</Имортирование необходимых модулей>=================

#==============================<Настройки>==============================
PROXY = "10.10.31.103:3128"
#Во время написания статьи сижу за прокси-сервером, поэтому в статье 
#затрагивается и этот вопрос, этой строкой обьявляется глобальная
#переменная PROXY, в которой находится адрес прокси-сервера. Для работы 
#напрямую необходимо указать значение None
HEADERS = {"User-Agent" : "Opera/9.64 (Windows NT 5.1; U; en) Presto/2.1.1",
           "Accept" : "text/html, application/xml;q=0.9, application/xhtml+xml, image/ png, image/jpeg, image/gif, image/x-xbitmap, */*;q=0.1",
           "Accept-Language" : "ru,uk-UA;q=0.9,uk;q=0.8,en;q=0.7",
           "Accept-Charset" : "iso-8859-1, utf-8, utf-16, *;q=0.1",
           "Accept-Encoding" : "identity, *;q=0",
           "Connection" : "Keep-Alive"}
#Для того чтобы получить страницу с www.google.com НЕОБХОДИМО использовать
#заголовки браузера, они представлены выше в ассоциативном массиве HEADERS, 
#соответствуют реальным заголовкам браузера Opera с маленько модификацией, эти 
#заголовки означают что клиент не может принимать zlib compressed data, т.е. 
#сжатые данные - не хотел я заморачиваться еще и с разархивироанием страниц, тем 
#более что не все сайты их сжимают...
THREADS_COUNT = 10
#В принципе это все настройки приложения, это-количество потоков
DEEP = 30
#Это - значение, которое отвечает за глубину страниц поиска, которые 
#нужно просматривать, фактически же определяет собой количество ссылок, 
#которые будут собраны сборщиком.
ENCODING = "UTF-8"
#Кодировка ваших файлов (для загрузки данных из файла с запросами и 
#последующего их перевода в юникод)
#==============================</Настройки>===================================

LOCK = threading.RLock()
# Вот тут то впервые и затрагивается модуль threading
#создается обьект LOCK, который представляет собой класс threading.RLock из
#модуля threading, это -простейший замок, который запрещает исполнение 
#несколькими потоками участка кода который идет после вызова его метода 
#acquire() Основным отличием threading.RLock от threading.Lock (тоже класс из 
#модуля threading) является то, что каждый поток может обращаться к обьекту 
#threading.RLock неограниченное количество раз, обьект threading.Lock может 
#вызываться каждым потоком только единожды.



Основным принципом для беспроблемной реализации многопоточности я считаю модульность кода. Не обязательно выносить используемые Вами функции в отдельные файлы или классы, достаточно чтобы хотя бы это были отдельные функции (Прошу простить за каламбур).

Сейчас я выделю работу потока в отдельную функцию, пускай она будет представлять собой некое абстрактное понятие работы. На данном этапе пока что некоторые моменты будут непонятны, но позже все это выстроится в понятный алгоритм.

def worker():
# Обьявление функции worker, входных аргументов нет
    global queue
    #Здесь и далее я буду обьявлять функции из глобального пространства 
    #имен в локальном для лучшей читабельности кода, хотя в написании
    #софта такое делать строго не рекомендую (!)
    while True:
    #Запуск бесконечного цикла, в котором будет происходить работа
        try:
        #Обработка ошибок, блок try/except, когда обработается
        #ошибка QueueEmpty это значит, что список задач пуст, и поток 
        #должен завершить свою работу
            target_link =  queue.get_nowait() 
            #Эта строчка олицетворяет собой получение задачи потоком из
            #списка задач queue
        except Exception, error:
        #сам перехват ошибки
            return
            #Завершение работы функции
        parsed_data = get_and_parse_page(target_link)
        #Позже будет реализована функция, которая будет получать 
        #страницу и доставать из нее необходимые значения
        if parsed_data != "ERROR":
        #Проверка на то, была ли получена страница
            write_to_file(parsed_data)
            #Также будет реализована функция для записи собранных данных в файл
        else:
            queue.put(target_link)
            #Если страница не была получена, то забрасываем ее обратно в queue


Главное, что нужно четко усвоить — это алгоритм работы самого потока, и что именно потоки должны обрабатывать независимо друг от друга. Итого, задачи потока очень просты — получить ссылку на страницу поиска, передать ее в функцию-обработчик, из которой вернутся ссылки на найденные сайты а также title этих сайтов, после записать ссылки и title в файл (все это будет находиться в parsed_data).

По мере работы реализовываются задачи от простейшей к сложной, поэтому на этих этапах практически не затронуты вопросы многопоточности. Далее настала очередь реализации функции, которая будет отвечать за запись в файл.

def write_to_file(parsed_data):
#Обявление функции write_to_file, аргумент –массив данных для записи
    global LOCK
    global ENCODING
    LOCK.acquire()
    #"Накидывание замка", следующий далее участок кода может выполнятся
    #только одним потоком в один и тот же момент времени
    with open("parsed_data.txt""a"as out:
    #Используется with statement, открывается файл parsed_data.txt с
    #правами "a", что означает дозапись в конец файла, и присваиваевается
    #хэндлеру на файл имя out (я так привык)
        for site in parsed_data:
        #Проход циклом по всем элементам parsed data, имя активного в 
        #данный момент элемента будет site
            link, title = site[0], site[1]
            #Присваивание переменным link и title значений из кортежа site
            title = title.replace("<em>""").replace("</em>""").replace("<b>""").replace("</b>""")
            #.replace -это замена HTML-тэгов, которые проскакивают в title и совершено не нужны
            out.write(u"{link}|{title}\n".format(link=link, title=title).encode("cp1251"))
            #Производится сама запись в файл, используется оператор форматирования 
            #строк .format, в отличие от % он поддерживает именованные аргументы, чем я и не 
            #преминул воспользоваться, таким образом в файл пишется строка вида:
            #ссылка на сайт | title страницы\n -символ переноса строки(все это переводится
            #из юникода в cp1251)
    LOCK.release()
    #"Отпирание"  замка, в противном случае ни один из следующих 
    #потоков не сможет работать с этим участком кода. По-хорошему, тут тоже нужно 
    #сделать обработку ошибок, но это учебный пример, да и ошибка там может 
    #возникнуть (после добавки замка в этот участок кода) только если во время
    #работы приложения выставить атрибут “только чтение” для данного пользователя
    #относительно файла parsed_data.txt



Далее идет реализация функции get_and_parse_page:
def get_and_parse_page(target_link):
#Обьявление функции, аргумент – ссылка на страницу
    global PROXY
    #Указывает на то, что в данной функции используется переменная PROXY
    #из глобального пространства имен
    global HEADERS
    #То же и для переменной Headers
    if PROXY is not None:
    #Если значение PROXY не равно None
        proxy_handler = urllib2.ProxyHandler( { "http"""+PROXY+"/" } )
        #Создается Прокси-Хэндлер с указанным прокси
        opener = urllib2.build_opener(proxy_handler)
        #Далее создается opener c созданным ранее Прокси-Хэндлером
        urllib2.install_opener(opener)
        #И наконец-то он устанавливается, теперь нет необходимости в 
        #шаманствах, все запросы в которых будет использоваться urllib2 
        #(в пределах этой функции будут направляться через указанный ранее 
        #PROXY)
    page_request = urllib2.Request(url=target_link, headers=HEADERS)
    #Создается обьект Request, который олицетворяет собой Request instance,
    #фактически это GET запрос к серверу с указанными параметрами, мне 
    #же необходимо использовать заголовки...
    try:
    #Обработка всех возможных ошибок, возникающих во время получения
    #страницы, это нехорошо, но лучше чем полное отсутствие обработки
        page = urllib2.urlopen(url=page_request).read().decode("UTF-8""replace")
        #Переменной page присваиваем прочитанное значение страницы запроса, переведенное 
        #в unicode из кодировки UTF-8 (кодировка, используемая на www.google.com) (в 
        #Python 2.6 unicode -это отдельный тип данных(!))
    except Exception ,error:
    #Сам перехват ошибки и сохранение ее значения в переменную error
        print str(error)
        #Вывод ошибки в консоль, прведварительно переведя ее в строку 
        #(просто на всякий случай)
        return "ERROR"
        #Возврат из функции в том случае, если во время работы возникла ошибка
    harvested_data = re.findall(r'''\<li\ class\=g\>\<h3\ class\=r\>\<a\ href\=\"(.*?)".*?>(.*?)\<\/a\>\<\/h3\>''', page)
    #Сбор со страницы поиска ссылок и title найденных страниц
    #Очистка данных от результатов поиска по блогам, картинкам и др. сервисам гугла
    for data in harvested_data:
    #Для каждого элемента массива harvested_data присвоить ему имя data
        if data[0].startswith("/"):
        #Если нулевой элемент массива data(ссылка) начинается с символа /
            harvested_data.remove(data)
            #Удаляем его из массива harvested_data
        if ".google.com" in data[0]:
        #Если нулевой элемент массива data(ссылка) имеет в себе .google.com
            harvested_data.remove(data)
            #Также удаляем его из массива harvested_data
    return harvested_data
    #Возвращаем собранные значения из функции



Наконец-то дошла очередь до реализации основного тела приложения:
def main():
#Обявление функции, входных аргментов нет
    print "STARTED"
    #Вывод в консоль о начале процесса
    global THREADS_COUNT
    global DEEP
    global ENCODING
    #Обьявляние о том что эти переменные будут использоваться
    #из глобального пространства имен
    with open("requests.txt"as requests:
    #Открываем файл requests в котором находятся запросы к поисковику
         for request in requests:
         #На данном файлхэндлере доступен итератор, поэтому можно 
         #пройтись по файлу циклом, без загрузки файл в оперативку, но это 
         #тоже не важно, я все равно его туда загружу:)
                request = request.translate(None"\r\n").decode(ENCODING, "replace")
                #Очистка запроса от символов конца строки а также их 
                #перевод в юникод (с заменой конфликтных символов)
                empty_link = "www.google.com/search?hl=ru&client=opera&rls=ru&hs=67v&q={request}&start={N}&sa=N"
                #Это пустой адрес страницы поиска, отформатирован
                for i in xrange(0, DEEP, 10):
                #Проход итератором по диапазону #чисел от 0 до DEEP, 
                #который представляет собой максимальную глубину поиска с 
                #шагом в 10, т.е. получаем из этого диапазона только 
                #числа десятков, т.е. 10, 20, 30 (как идет поиск у гугла)
                     queue.put(empty_link.format(request=request.encode("UTF-8"), N=i))
                     #Добавление в очередь каждой сгенерированной ссылки
                     #и перевод её в кодировку UTF-8 (для гугла)
    for _ in xrange(THREADS_COUNT):
    #Проход циклом по диапазону чисел количества потоков
        thread_ = threading.Thread(target=worker)
        #Создается поток, target-имя функции, которая являет собой 
        #участок кода, выполняемый многопоточно
        thread_.start()
        #Вызывается метод start() , таким образом поток запускается
    while threading.active_count() >1:
    #До тех пор, пока количество активных потоков больше 1 (значит, 
    #запущенные потоки продолжают работу)
        time.sleep(1)
        #Основной поток засыпает на 1 секунду
    print "FINISHED"
    #Вывод в консоль о завершении работы приложения



В итоге получаем нормально работающий многопоточный парсер. Естественно с многими минусами, но красиво написанное я задолбусь комментировать.

Код:
Эта статья + исходники: sendspace.com/file/mw0pac
Код с русскими коментариями: dumpz.org/15202
Код с украинскими коментариями: dumpz.org/15201

P.S. Да я знаю, что кому-то этот пример покажется нерациональным использованием Queue (привет, cr0w). Но вот обработку ошибок проще всего делать именно, используя его.
P.P.S. Материал не претендует на непогрешимость. Естественно, тут 100% быдлокод, никакого понимания мной того что я описываю, непонятки с терминами, я-быдлокодер и т.д. и т.п. НО тут есть то, чего вам не пересрать — оно РАБОТАЕТ, причем работает именно так, как от него ожидается, код понятен и откомментирован так, что будет понятно даже младенцу.Надеюсь, что оно хоть кому-то поможет…

© login999
uasc.org.ua
Tags:
Hubs:
Total votes 77: ↑62 and ↓15 +47
Views 58K
Comments Comments 39