Менеджер многопоточных закачек на GO.
http://loafter.github.io/godownloader/
https://github.com/Loafter/godownloader

Давным-давно, в году этак 1998, для выхода в интернет я использовал модем на работе у отца. Он его включал вечером после работы и я мог наслаждаться просторами сети интернет на скорости аж 31.2 кбит/c. В то время не было истеричных блогеров, страницы не весили по мегабайту, а в новостных сайтах говорили только правду. Естественно основной интерес представляли ресурсы. Картинки, программы, всякие дополнения к играм, вроде машинок. Как сейчас помню качать через IE было сущим адом. Скачать файл весом больше 500 кб было просто невозможно, древний осел был намного упрямей.
В то время было очень много всяких менеджеров закачек вроде Getright, Go!zilla, Download Accelerator и, конечно, FlashGet. В то время 90% процентов из них были перегружено рекламным говном, лучшим был FlashGet. Он умел бить на части скачиваемый файл и работал шустро. Как сейчас помню версия 1.7 последняя была. Эту версию я в те времена и использовал.
Прошло 15 лет и мне понадобилось скачать большой объем данных через vpn из-за океана.
И что же изменилось за 15 лет?
Да ничего. По прежнему существуют все те же менеджеры с минимальными изменениями. Даже flashget оставил версию 1.7 вместе с новомодной 3.хх.
После того как wxfast download не сумел скачать 50 гигабайтный файл, я решил попробовать написать свой менеджер закачек, который включал бы в себя множество многопоточных заданий с возможностью контроля степени выполнения, остановки в любой момент, а также сохранения состояния загрузок между запусками приложения. Все это отличный вызов для языка GO.
С чего начать? Первое что нам необходимо это уметь останавливать закачку и получать информацию о ходе скачивания в любой момент времени. В GO есть легковесные потоки, которые можно использовать у нас в программе. То есть как минимум один поток будет у нас для скачивания, другой для управления процессом (остановка, старт закачки, получение информации о ходе выполнения). Если с процессом получения сведений о загруженных данных проблем не возникает (мы можем получать их по ссылке сквозь потоки), то с процессом останоки закачки все немного сложнее, мы не можем остановить или убить другую goroutine. Но мы можем послать ей сигнал на выход из потока. Собственно так мы и сделаем. Реализуем простую обертку, которая бы позволила создавать произвольные дискретные работы с возможностью паузы и получения информации о состоянии работы.
Для того чтобы, обернуть какую-либо структуру необходимо чтобы она подерживала следующий интерфейс:
Сама обертка:
После запуска потока, wgoroute(), в цикле функция пошагово выполняет каждую итерацию вызовом метода DoWork(), В случае, если во время выполнения работы произошла ошибка, функция выходит из цикла и завершает поток. Также в цикле осуществляется выборка из канала.
Если поступило сообщение Stopped, алгоритм выходит из потока и устанавливает соотвествующее состояние.
Воспользуемся встроенными в язык средствами тестирования для проверки работы обертки:
После того как мы создали рабочую обертку для задач, мы можем приступить к основной функции — загрузки данных по http. Наверное основная проблема протокола http — это низкая скорость при загрузки данных в один поток. Именно поэтому, когда интернет был медленный, появилось столько менеджеров закачек, которые умели разбивать загружаемый файл на фрагменты и загружать их в несколько http-соединений, за счет этого достигался выигрыш в скорости. Естественно наш менеджер закачек не исключение, он также должен уметь забирать файл в несколько потоков. Для нормальной работы данной схемы необходимо чтобы сервер поддерживал докачку.
Со стороны клиента должен быть сформирован запрос у которого есть в заголовке поле Range. Весь запрос в сыром виде выглядит вот так:
Первая моя реализация работала крайне медленно. Все дело в том, что на каждую небольшую порцию данных готовился свой запрос, т.е. если необходимо было скачать сегмент от 1 до 2 мегабайтов блоками по 100 кб. это означало, что последовательно было выполнено 10 запросов для каждого блока. Я достаточно быстро понял, что что-то не так.
В программе wireshark я проверил как выполняется закачка другой программой — download master. Правильная схема работы была другая. Если нам нужно скачать 10 сегментов, то сначала готовилось 10 http-запросов на каждый сегмент, а деление на блоки было реализовано последовательным считыванием из блока body в рамках одного http-response.
Обернув класс загрузчика в интерефейс DiscretWork из предыдущей части заметки мы можем попробовать протестировать его работу:
Уже достаточно долгое время все свои сервисы на go я делаю по одной схеме. Как правило, web-интерейс через который пользователь взаимодействует с json-сервисом посредствам http-запросов. Такая схема работы дает ряд преимуществ перед традиционными графическими интерейсами перечисленными ниже.
Обновление интерфейса осуществляется каждые 500 милисекунд. В качестве источника данных для таблицы закачек используется псевдо-файл localhost/progress.json. Если его открыть в браузере, откроются динамически обновляемые json-данные. В качестве компонента таблицы используется jgrid. Благодаря его простоте код занимает совсем немного места.


Есть ещё интересная особенность сервиса о которой я бы хотел рассказать. Это то, как завершается веб-сервис. Дело в том, что в момент запуска http-сервиса, программа зависает на функции start и виснет пока мы не завершим приложение. Но в Go есть возможность подписаться на сигналы посылаемые операционной системой. Таким образом мы можем перехватить момент, когда наш процесс завершается, даже если мы это делаем через команду kill и выполняем какие-либо завершающие действия. К примеру, это сохранение настроек и текущий прогресс закачек.
Существуют много расширений стандартной реализации http-сервиса go позволяющих выполнить какие-либо действия после завершения сервиса. На мой взгляд описанный выше способ наиболее прост и надежен, данный способ работает даже если мы убивает сервис.
В принципе, это, наверное, все, что я хотел донести для читателей.
Не знаю, на сколько актуальным получился менеджер закачек для других, но дистрибутивы и образы виртуальных машин я уже качаю при помощи своего менеджера закачек.
Но контрольные суммы периодически все же проверяю.
Скачать релиз под Mac, Windows, Linux: http://loafter.github.io/godownloader/
Git:https://github.com/Loafter/godownloader
http://loafter.github.io/godownloader/
https://github.com/Loafter/godownloader

Вступление
Давным-давно, в году этак 1998, для выхода в интернет я использовал модем на работе у отца. Он его включал вечером после работы и я мог наслаждаться просторами сети интернет на скорости аж 31.2 кбит/c. В то время не было истеричных блогеров, страницы не весили по мегабайту, а в новостных сайтах говорили только правду. Естественно основной интерес представляли ресурсы. Картинки, программы, всякие дополнения к играм, вроде машинок. Как сейчас помню качать через IE было сущим адом. Скачать файл весом больше 500 кб было просто невозможно, древний осел был намного упрямей.
В то время было очень много всяких менеджеров закачек вроде Getright, Go!zilla, Download Accelerator и, конечно, FlashGet. В то время 90% процентов из них были перегружено рекламным говном, лучшим был FlashGet. Он умел бить на части скачиваемый файл и работал шустро. Как сейчас помню версия 1.7 последняя была. Эту версию я в те времена и использовал.
Прошло 15 лет и мне понадобилось скачать большой объем данных через vpn из-за океана.
И что же изменилось за 15 лет?
Да ничего. По прежнему существуют все те же менеджеры с минимальными изменениями. Даже flashget оставил версию 1.7 вместе с новомодной 3.хх.
После того как wxfast download не сумел скачать 50 гигабайтный файл, я решил попробовать написать свой менеджер закачек, который включал бы в себя множество многопоточных заданий с возможностью контроля степени выполнения, остановки в любой момент, а также сохранения состояния загрузок между запусками приложения. Все это отличный вызов для языка GO.
Обертка
С чего начать? Первое что нам необходимо это уметь останавливать закачку и получать информацию о ходе скачивания в любой момент времени. В GO есть легковесные потоки, которые можно использовать у нас в программе. То есть как минимум один поток будет у нас для скачивания, другой для управления процессом (остановка, старт закачки, получение информации о ходе выполнения). Если с процессом получения сведений о загруженных данных проблем не возникает (мы можем получать их по ссылке сквозь потоки), то с процессом останоки закачки все немного сложнее, мы не можем остановить или убить другую goroutine. Но мы можем послать ей сигнал на выход из потока. Собственно так мы и сделаем. Реализуем простую обертку, которая бы позволила создавать произвольные дискретные работы с возможностью паузы и получения информации о состоянии работы.
Для того чтобы, обернуть какую-либо структуру необходимо чтобы она подерживала следующий интерфейс:
type DiscretWork interface { DoWork() (bool, error) GetProgress() interface{} BeforeRun() error AfterStop() error }
Сама обертка:
func (mw *MonitoredWorker) wgoroute() { log.Println("info: work start", mw.GetId()) mw.wgrun.Add(1) defer func() { log.Print("info: release work guid ", mw.GetId()) mw.wgrun.Done() }() for { select { case newState := <-mw.chsig: if newState == Stopped { mw.state = newState log.Println("info: work stopped") return } default: { isdone, err := mw.Itw.DoWork() if err != nil { log.Println("error: guid", mw.guid, " work failed", err) mw.state = Failed return } if isdone { mw.state = Completed log.Println("info: work done") return } } } } } func (mw *MonitoredWorker) Start() error { mw.lc.Lock() defer mw.lc.Unlock() if mw.state == Completed { return errors.New("error: try run completed job") } if mw.state == Running { return errors.New("error: try run runing job") } if err := mw.Itw.BeforeRun(); err != nil { mw.state = Failed return err } mw.chsig = make(chan int, 1) mw.state = Running go mw.wgoroute() return nil }
После запуска потока, wgoroute(), в цикле функция пошагово выполняет каждую итерацию вызовом метода DoWork(), В случае, если во время выполнения работы произошла ошибка, функция выходит из цикла и завершает поток. Также в цикле осуществляется выборка из канала.
select { case newState := <-mw.chsig: if newState == Stopped { mw.state = newState log.Println("info: work stopped") return }
Если поступило сообщение Stopped, алгоритм выходит из потока и устанавливает соотвествующее состояние.
Воспользуемся встроенными в язык средствами тестирования для проверки работы обертки:
package dtest import ( "errors" "fmt" "godownloader/monitor" "log" "math/rand" "testing" "time" ) type TestWorkPool struct { From, id, To int32 } func (tw TestWorkPool) GetProgress() interface{} { return tw.From } func (tw *TestWorkPool) BeforeRun() error { log.Println("info: exec before run") return nil } func (tw *TestWorkPool) AfterStop() error { log.Println("info: after stop") return nil } func (tw *TestWorkPool) DoWork() (bool, error) { time.Sleep(time.Millisecond * 300) tw.From += 1 log.Print(tw.From) if tw.From == tw.To { fmt.Println("done") return true, nil } if tw.From > tw.To { return false, errors.New("tw.From > tw.To") } return false, nil } func TestWorkerPool(t *testing.T) { wp := monitor.WorkerPool{} for i := 0; i < 20; i++ { mw := &monitor.MonitoredWorker{Itw: &TestWorkPool{From: 0, To: 20, id: rand.Int31()}} wp.AppendWork(mw) } wp.StartAll() time.Sleep(time.Second) log.Println("------------------Work Started------------------") log.Println(wp.GetAllProgress()) log.Println("------------------Get All Progress--------------") time.Sleep(time.Second) wp.StopAll() log.Println("------------------Work Stop-------------------") time.Sleep(time.Second) wp.StartAll() time.Sleep(time.Second * 5) wp.StopAll() wp.StartAll() wp.StopAll() }
Загрузка данных
После того как мы создали рабочую обертку для задач, мы можем приступить к основной функции — загрузки данных по http. Наверное основная проблема протокола http — это низкая скорость при загрузки данных в один поток. Именно поэтому, когда интернет был медленный, появилось столько менеджеров закачек, которые умели разбивать загружаемый файл на фрагменты и загружать их в несколько http-соединений, за счет этого достигался выигрыш в скорости. Естественно наш менеджер закачек не исключение, он также должен уметь забирать файл в несколько потоков. Для нормальной работы данной схемы необходимо чтобы сервер поддерживал докачку.
Со стороны клиента должен быть сформирован запрос у которого есть в заголовке поле Range. Весь запрос в сыром виде выглядит вот так:
GET /PinegrowLinux64.2.2.zip HTTP/1.1 Host: pinegrow.s3.amazonaws.com User-Agent: Go-http-client/1.1 Range: bytes=34010904-42513630
Первая моя реализация работала крайне медленно. Все дело в том, что на каждую небольшую порцию данных готовился свой запрос, т.е. если необходимо было скачать сегмент от 1 до 2 мегабайтов блоками по 100 кб. это означало, что последовательно было выполнено 10 запросов для каждого блока. Я достаточно быстро понял, что что-то не так.
В программе wireshark я проверил как выполняется закачка другой программой — download master. Правильная схема работы была другая. Если нам нужно скачать 10 сегментов, то сначала готовилось 10 http-запросов на каждый сегмент, а деление на блоки было реализовано последовательным считыванием из блока body в рамках одного http-response.
func (pd *PartialDownloader) BeforeDownload() error { //create new req r, err := http.NewRequest("GET", pd.url, nil) if err != nil { return err } r.Header.Add("Range", "bytes="+strconv.FormatInt(pd.dp.Pos, 10)+"-"+strconv.FormatInt(pd.dp.To, 10)) f,_:=iotools.CreateSafeFile("test") r.Write(f) f.Close() resp, err := pd.client.Do(r) if err != nil { log.Printf("error: error download part file%v \n", err) return err } //check response if resp.StatusCode != 206 { log.Printf("error: file not found or moved status:", resp.StatusCode) return errors.New("error: file not found or moved") } pd.req = *resp return nil } …. func (pd *PartialDownloader) DownloadSergment() (bool, error) { //write flush data to disk buffer := make([]byte, FlushDiskSize, FlushDiskSize) count, err := pd.req.Body.Read(buffer) if (err != nil) && (err.Error() != "EOF") { pd.req.Body.Close() pd.file.Sync() return true, err } //log.Printf("returned from server %v bytes", count) if pd.dp.Pos+int64(count) > pd.dp.To { count = int(pd.dp.To - pd.dp.Pos) log.Printf("warning: server return to much for me i give only %v bytes", count) } realc, err := pd.file.WriteAt(buffer[:count], pd.dp.Pos) if err != nil { pd.file.Sync() pd.req.Body.Close() return true, err } pd.dp.Pos = pd.dp.Pos + int64(realc) pd.messureSpeed(realc) //log.Printf("writed %v pos %v to %v", realc, pd.dp.Pos, pd.dp.To) if pd.dp.Pos == pd.dp.To { //ok download part complete normal pd.file.Sync() pd.req.Body.Close() pd.dp.Speed = 0 log.Printf("info: download complete normal") return true, nil } //not full download next segment return false, nil }
Обернув класс загрузчика в интерефейс DiscretWork из предыдущей части заметки мы можем попробовать протестировать его работу:
func TestDownload(t *testing.T) { dl, err := httpclient.CreateDownloader("http://pinegrow.s3.amazonaws.com/PinegrowLinux64.2.2.zip", "PinegrowLinux64.2.2.zip", 7) if err != nil { t.Error("failed: can't create downloader") } errs := dl.StartAll() if len(errs)>0 { t.Error("failed: can't start downloader") } …..wait for finish download }
Интерфейс
Уже достаточно долгое время все свои сервисы на go я делаю по одной схеме. Как правило, web-интерейс через который пользователь взаимодействует с json-сервисом посредствам http-запросов. Такая схема работы дает ряд преимуществ перед традиционными графическими интерейсами перечисленными ниже.
- Возможность в автоматическом режиме добавлять новые закачки;
- Нет привязки к определенной операционной системе, достаточно только браузера.
Обновление интерфейса осуществляется каждые 500 милисекунд. В качестве источника данных для таблицы закачек используется псевдо-файл localhost/progress.json. Если его открыть в браузере, откроются динамически обновляемые json-данные. В качестве компонента таблицы используется jgrid. Благодаря его простоте код занимает совсем немного места.


function UpdateTable() { $("#jqGrid") .jqGrid({ url: 'http://localhost:9981/progress.json', mtype: "GET", ajaxSubgridOptions: { async: false }, styleUI: 'Bootstrap', datatype: "json", colModel: [{ label: '#', name: 'Id', key: true, width: 5 }, ….. { label: 'Speed', name: 'Speed', width: 15, formatter: FormatByte }, { label: 'Progress', name: 'Progress', formatter: FormatProgressBar }], viewrecords: true, rowNum: 20, pager: "#jqGridPager" }); }
Завершение сервиса и сохранение настроек
Есть ещё интересная особенность сервиса о которой я бы хотел рассказать. Это то, как завершается веб-сервис. Дело в том, что в момент запуска http-сервиса, программа зависает на функции start и виснет пока мы не завершим приложение. Но в Go есть возможность подписаться на сигналы посылаемые операционной системой. Таким образом мы можем перехватить момент, когда наш процесс завершается, даже если мы это делаем через команду kill и выполняем какие-либо завершающие действия. К примеру, это сохранение настроек и текущий прогресс закачек.
c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) signal.Notify(c, syscall.SIGTERM) go func() { <-c func() { gdownsrv.StopAllTask() log.Println("info: save setting ", gdownsrv.SaveSettings(getSetPath())) }() os.Exit(1) }()
Существуют много расширений стандартной реализации http-сервиса go позволяющих выполнить какие-либо действия после завершения сервиса. На мой взгляд описанный выше способ наиболее прост и надежен, данный способ работает даже если мы убивает сервис.
В принципе, это, наверное, все, что я хотел донести для читателей.
Не знаю, на сколько актуальным получился менеджер закачек для других, но дистрибутивы и образы виртуальных машин я уже качаю при помощи своего менеджера закачек.
Но контрольные суммы периодически все же проверяю.
Скачать релиз под Mac, Windows, Linux: http://loafter.github.io/godownloader/
Git:https://github.com/Loafter/godownloader
