Pull to refresh

Comments 41

Вообще говоря, ситуацию, когда несколько компонент хотят данные из одного потока данных, обычно разрешают при помощи паттерна Observer. Велосипеды тут совершенно ни к чему, на мой скромный взгляд.
Я сам не люблю велосипеды, и очень стараюсь не создавать их. В случае потребления одного Stream разными компонентами есть два отдельных случая и перекрыть из использованием Observer мне не представляется возможным. Случай один, это когда данные потребляются последовательно, одним компонентом следом за другим. Но есть и другой случай: когда одному компоненту (дочернему) нужна часть данных из тех, которые нужны другому (родительскому).
Я боюсь, что мы сейчас можем начать говорить о разном, но я в своем комментарии предлагал такое решения: из stream читает только один объект-читатель, и прочитанные данные оборачивает в обьекты бизнес логики по мере готовности (причем желательно в отдельном потоке, а оборачивалки можно реализовать в виде отдельных сущностей, которыми населяется объект-читатель при иницилизации) — это решает затруднение 2. Как толкьо объект бизнес логики сформирован — он отдается всем подписавшимся слушателям, и далее эти объекты по иерархии каждого слушателя гуляют как угодно (дублируются, удаляются — это уже от задачи зависит). При этом слушатели изолированы от обьекта читателя Stream, что дает бенефиты как в плане стабильности, так и расширяемости, и это решает проблему 1.
По вашему получается, что единственный объект-читатель потока либо должен сам парсить содержимое и превращать в конкретные сущности, что делает его привязанным к этим конкретным сущностям, либо как то передавать считанные данные в методы парсинга отдельных компонентов. Вот именно тому, как передавать эти данные, и посвящена моя статья.
Согласен, я немного не в ту степь пошел — но ваши комментарии про JPEG внесли ясности (я писал с оглядкой на сетевые потоки). Но тем не менее мне ваш механизм совершенно не нравится.

В вашей модели больше всего смущает то, что в цепочке парсеров кому то необходимо читать уже прочитанное. Если оно прочитано, то почему данные не распарсены, и не превратились в обьект бизнес логики, который можно уже прокинуть, если необходимо, дальше по цепочке парсинга?
В своем решении я предполагаю, что бинарные данные передаются в виде пакетов, сформированных по OSI-like схеме (вообще существующие системные решения, работающие уже десятилетиями — кладезень дизайн-решений). В этом случае необходим только один читатель, который единожды прочтет весь поток без перечитываний, постепенно превращая пакеты в обьекты логики.

Не знаю как в JPEG, но, например в PNG эта схема отлично сработает в следущем ключе — из стрима читает первичный парсер, который встречая чанк парсит только его шапку, почле чего создает обьект Chunk, содержащий распаршенную шапку, и прочитанные бинарные данные этого чанка. Этот объект передается следующему парсеру в зависимости от типа чанка, и так далее… При этом никакие повторные перечитывания из стрима не нужны. Если вы взглянете в исходники кодеров/декодеров популярных форматов, то там, как парвило, реализована подобная схема. Потому что этот дизайн дает большую читабельность, стабильность и возможность легко расширять парсер формата.

При этом согласен, observer в цепочке парсинга, пожалуй, излишество. В этом был не прав:)
Да, такая схема работает с PNG или с любым другим источником одной структуры, а для каждого другого вида структур придётся иметь свой объект Chunk. В такой схеме первичный парсер должен быть жёстко связан со всеми последующими парсерами, что делает невозможным создание универсальных парсеров, пригодных для использования в любых проектах.
А моё решение как раз даёт возможность делать универсальные компоненты-парсеры, которые получают на вход BufferedSource, а также могут делегировать часть парсинга другим независимым компонентам.
А я во втором же комментарии указал, что первичному читателю можно населить коллекцию парсеров (я их там обозвал оборачивалками) реализующих интерфейс, содержащий функции вроде IsMyStructure( сюда передаем сигнатуру структуры) и Parse(). В этом случае на каждой новой структуре из файла читатель просто опрашивает парсеры, кто из них будет это парсить, и дальше уже делегирует работу ему. Это как раз универсальный подход, при этом не требующий от Stream никакой функциональности кроме «прочти N байт», и не требующий перепрочитывания и перемалывания перемолотого.
Будут проблемы с униформностью сигнатур (одному нужно 5 байт, другому — 15).
Ну если в 15байтной сигнатуре по первым пяти байтам можно понять, что эта сигнатура имеет продолжение, то это затруднение вполне разрешимо — и не единственным образом. По моему вкусу у парсеров структур с 15байтными сигнатурами появляется пре-парсер, который по пяти байтам понимает, что это один из тех парсеров, и дальше уже среди внутренней коллекции этих парсеров по оставшимся 10 байтам решает кому отдать. При этом все они реализуют упомянутый интерфейс, и могут быть собраны в самых разных конфигурациях. Получается вполне внятная древовидная структура.

Если же нет — то надо разбираться конкренто, возможно действительно понадобится какой-то костыль.
Вы не могли бы кратко описать сценарии использования вашего интерфейса для случая с одним и двумя потребителями?
Описываю словами типичный сценарий. Я пишу компонент для чтения иерархической структуры (например, mkv-файла), который получая на вход ISourceBuffered, потребляет его, перебирая все встреченные порции. Для каждой порции, в зависимости от её типа, он может создать дочерний ISourceBuffered (который читает данные внутри родительского) и передать его другому компоненту, который, например, парсит двоичное представление JPEG-изображения. Читая и потребляя дочерний источник, JPEG-компонент также потребляет и родительский (они имеют общий буфер и указатель потребления), что избавляет от двойного чтения и буферирования. При этом компоненты совершенно независимы и создаются в разных проектах. Последовательность вызовов можно посмотреть в примере кода в статье. Для одного потребителя мало что отличается от использования Stream, кроме наличия удобной функции проверки количества и дочитывания необходимого.
Я имел в виду не словами, а кодом. Какова именно последовательность вызовов на интерфейсе?
Итак, наш метод получил источник данных в виде ISourceBuffered. Далее, если нам нужен из него некоторый блок данных известного размера, вызываем src.EnsureBuffer(размер_блока). Если размер нам неизвестен, и мы будем смотреть всё что предоставляет источник, то вызываем src.FillBuffer (). После этого мы совершенно произвольно и многократно обращаемся к буферу src.Buffer в пределах ограниченных src.Offset и src.Count. Как только мы потребили некоторые данные из начала буфера, мы их исключаем из дальнейшей обработки вызовом src.SkipBuffer(размер), при этом изменятся свойства src.Offset и src.Count. Далее повторять по необходимости с начала до тех пор, пока не получим все нужные данные либо src.FillBuffer () не вернёт ноль. При этом, в ЛЮБОЙ момент, мы можем создать дочерний SourceBuffered, который будет получать данные из нашего родительского источника и как либо их преобразовывать либо просто ограничивать по размеру. Этот дочерний источник мы можем передать другому компоненту, который будет действовать по этому же сценарию.
Буфер — 4к, источник — 28м. Каждый последующий вызов FillBuffer изменяет содержимое буфера?
Источники могут быть разные, их поведение ограничено контрактом. Контракт гласит, что FillBuffer() должен заполнить буфер насколько возможно больше и вернуть ноль только тогда, когда источник пуст. Сколько именно вернёт FillBuffer() и изменится ли это число при повторных вызовах — не специфицируется и зависит от реализации. Если важно чтобы из источника было доступно какое то количество байт в буфере, то надо пользоваться методом EnsureBuffer(размер), он либо выполнит требования, либо бросит исключение означающее что источник не может предоставить столько данных.
Так, понятно, что ничего не понятно. Давайте пойдем с другой стороны. Вот тривиальный код работы со стримом (пишу по памяти, извините, могут быть несущественные ошибки):

//s - это некий Stream, переданный снаружи
var buf = new byte[4096];
//читаем данные из потока чанками до 4к
while((var count = s.Read(buf, 0, 4096)) > 0)
{
    //пишем полученные данные в БД
    db.WriteData(buf, 0, count);
}


Как эта же задача (прочитать все данные из потока и записать их в БД) решается с помощью ISourceBuffered (иными словами, если sISourceBuffered, то как будет выглядеть код)?
Извиняюсь за непонятность. Буду стараться выражаться яснее. Вот аналог который вам нужен:
//s - это некий IBufferedSource, переданный снаружи
//var buf = new byte[4096]; буфер не нужен, он уже встроен в источнике
//читаем данные из потока чанками до размера буфера
while(s.FillBuffer() > 0)
{
   //пишем полученные данные в БД
   db.WriteData(s.Buffer, s.Offset, s.Count);
   s.SkipBuffer (s.Count);
}
Прекрасно. Предположим, под IBufferedSource — поток неопределенно гигантского размера (пара гигабайт), и, естественно, read-only/forward-only. Каково предполагаемое поведение реализации FillBuffer в этом случае (буфер предполагаем на несколько порядков меньше размеров потока)?
Если данные источника доступны произвольными порциями, то будет логично реализовать FillBuffer() так, чтобы при каждом вызове он дополнял уже имеющиеся в буфере данные (считывая их из источника) до полного заполнения буфера. Если никто не вызовет SkipBuffer() или TrySkip(), то следующий вызов FillBuffer() просто ничего не изменит, потому что буфер уже полон.
Хорошо.

Теперь усложним исходную задачу: нам нужно не только записать данные в БД, но и посчитать по ним MD5. Считалка MD5 — это отдельный компонент, который, если я правильно понял суть вашей задумки, тоже принимает на вход IBufferedSource.

Как будет выглядеть код этого компонента?
«Считалка MD5» существует в базовой библиотеке классов в виде крипто-преобразования System.Security.Cryptography.MD5. Вот готовый код:
//s - это некий IBufferedSource, переданный снаружи
var hashProvider = System.Security.Cryptography.MD5.Create();
var hashingSource = new CryptoTransformingBufferedSource (s, hashProvider, new byte[1024]);
while(hashingSource.FillBuffer() > 0)
{
   //пишем полученные данные в БД
   db.WriteData (hashingSource.Buffer, hashingSource.Offset, hashingSource.Count);
   hashingSource.SkipBuffer (hashingSource.Count);
}
return hashProvider.Hash;
Понятно, плохой пример, вы используете декоратор с расширенной функциональностью. Предположим, что вам нужно не посчитать MD5, а отправить те же самые данные в файловую систему (т.е., входящий поток должен быть записан и в БД, и на диск).
Суть моей задумки как раз в том, чтобы в независимости от компонента, потреблялся один источник с единственным буфером. То есть когда один потребитель что то осознанно забирает из источника (не так как из Steam забирают просто весь буфер и неизвестно что действительно понадобится), то эти потреблённые данные также исчезают из доступных данных всех других компонентов, работающих с этим источником (или его родительским источником). Ваш пример, напротив, предполагает, что один из компонентов не должен потреблять данные источника, а должен только производить дополнительные действия с потребляемыми другими компонентами данными. Решение — это один из потребителей сделать источником-ретранслятором, методы которого напрямую ретранслируются в наш источник, но при каждом потреблении данных дополнительно что то делает с ними (пишет на диск как вы хотели). Можно взять мой ObservableBufferedSource но вместо вызова _progress.Report() сделать запись на диск. Код получится примерно такой:
//s - это некий IBufferedSource, переданный снаружи
var fileWritingSource = new FileWritingBufferedSource (s, имя_файла);
while(fileWritingSource.FillBuffer() > 0)
{
   //пишем полученные данные в БД
   db.WriteData (fileWritingSource.Buffer, fileWritingSource.Offset, fileWritingSource.Count);
   hashingSource.SkipBuffer (fileWritingSource.Count);
}
Вообще-то, мой пример полностью попадает под вашу первую проблему:

Затруднение номер один: если данные одного и того же источника нужны в нескольких компонентах, то после того как один компонент считал какие то данные из Stream, то он их «потребил», и другим компонентам они уже никак не достанутся.


Получается, что ваше решение не способно решить это (прямо, скажем, весьма типовое) затруднение кроме как через введение произвольного (по числу решаемых задач) количества оберток вокруг источника.

Собственно, непрозрачность взаимодействия между несколькими потребителями данных в вашем решении меня и пугает.
Тут я не согласен. Stream не разделяет понятие «считать, сделав данные доступными» и «потребить, убрав из доступа» (в нём это одна неделимая операция), что и является причиной невозможности потреблять один и тотже поток в разных компонентах. Эта проблема решается в BufferedSource, давая возможность считывать данные произвольно большими кусками, потребляя только то, что нужно каждому компоненту и не оглядываясь на присутствие других потребителей. Ваша задача вообще красиво не решаема в модели когда потребители запрашивают данные, тут нужна модель когда источник проталкивает данные. Такая модель совершенно ортогональна моим наработкам и может быть добавлена сверху без каких либо трудностей и потерь производительности.
Эта проблема решается в BufferedSource, давая возможность считывать данные произвольно большими кусками, потребляя только то, что нужно каждому компоненту и не оглядываясь на присутствие других потребителей.

Я, собственно, не вижу, как у вас сделано «не оглядываясь на присутствие других потребителей», это меня и смущает.
Благодаря взаимодействия с вами, добавил в статью абзац, уточняющий что моё решение не меняет модель (паттерн) потребления данных из Stream и не применимо для других моделей.
«Не оглядываясь на других потребителей» конечно же, всегда верно только для чтения в буфер. А потребление из буфера подчиняется логике компонентов и должно быть учтено на их уровне в тот момент, когда один компонент, получив на вход BufferedSource, передаёт его (с какой то трансляцией) другим компонентам.
По факту получается, что ваше решение — это обертка вокруг буфера, избавляющая получателя от необходимости задумываться о его (буфера) истощении. Я правильно понял?
С точки зрения простого потребления данных, да, именно так. Но также важна возможность создавать вложенные и трансформирующие источники.
Отдельный вопрос.

Проблема неполной доступности блоков информации принципиально решаема, но ценой возникновения других проблем. Решение первое — читать данные по одному байту пока не наберём нужное количество, чревато критическим падением производительности. Решение второе — перед каждым обращением к буферу проверять, не достигли ли мы его границы и если достигли, то повторять чтение. Проверки, чтение Stream и последующая коррекция индекса в буфере перед каждым обращением сильно засоряют код и провоцируют многочисленные ошибки. Это можно вынести в отдельный метод, очистив смысловой код, но каждый компонент всё равно придется отдельно снабжать этим методом.

А разве BufferedStream не предназначен для решения именно этой проблемы?
Нет, упомянутый BufferedStream несмотря на название, не решает ничего. В нём нет методов, которые гарантируют наличие в буфере целого блока нужного размера. Также в нём нет доступного всем компонентам буфера и каждый компонент будет создавать свой буфер для вызова метода Read(). Единственное, чем он может помочь — слегка улучшить производительность при чтении по одному байту, но производительность при этом будет всё равно неприемлемо низкой.
Также в нём нет доступного всем компонентам буфера

Этой задачи в цитате и не стоит, давайте не смешивать.

Единственное, чем он может помочь — слегка улучшить производительность при чтении по одному байту, но производительность при этом будет всё равно неприемлемо низкой.

Почему «слегка» и «неприемлимо»? Вообще-то, BufferedStream работает строго по описанному у вас второму решению — при чтении проверяются границы внутреннего буфера, и если он «переистощен», то читается дальше из источника. Иными словами, он позволяет потребителю читать кусками той длины, которая ему удобна, но при этом забирать данные от поставщика теми квантами, которые обоснованы соображениями производительности.
Второй указанный мной способ предполагает доступ сразу к большому количеству данных (буферу) и простейшую арифметическую проверку индекса без вызова методов источника. А вызов метода источника для КАЖДОГО байта (пусть даже из буфера) при считывании мегабайтов будет неприемлемо медленным. Хотя для некоторых задач с небольшими источниками, возможно производительность будет приемлема, но мы же хотим универсальное решение?
Вы хотите сказать, что накладные расходы на вызов метода (например, ReadByte) оказывают существенное влияние на общую производительность системы при работе с реальным I/O? Не поверю, извините. Результаты профилирования есть?

Ну и на самом деле, не надо вызывать именно ReadByte, надо вызывать Read с конкретной нужной вам (т.е., определенной прилк-логикой) длиной, а BufferedStream позаботится о том, чтобы вы более эффективно работали с источником.

(а еще лучше — использовать BinaryReader, но тут бывают нюансы)
(прилк-логикой = прикладной логикой, сорри)
При работе с реальным I/O может и не будет существенного влияния на суммарное время выполнения (зато будет влияние на % загрузки процессора). Но ведь наш источник универсален, в том числе может предоставлять данные из ОЗУ (по аналогии с MemoryStream) или из уже считанных в буфер данных другого источника (при крипто-преобразовании) и тут влияние будет значительным.
Чтобы пояснить неудобство метода Read() привожу пример со Stream:
var buf = new byte[1024];
s.Read (buf, 0, 1024);
int bufIdx = 0;
// тут каким то образом мы обработали первые 1021 байт в буфере, осталось 3. bufIdx == 1021
// далее нам надо взять 32-х битное число, то есть 4 байта
if (bufIdx > (1024-4))
{
  Array.Copy (buf, bufIdx, buf, 0, 1024-bufIdx);
  if (s.Read (buf, 1024-bufIdx, bufIdx) < 4) throw new InvalidOperationException ("не хватило данных");
  bufIdx = 0;
}
BitConverter.ToInt32 (buf, bufIdx)

и аналог c BufferedSource:
s.FillBuffer ();
// тут каким то образом мы пропустили первые 1021 байт в буфере, осталось 3
// далее нам надо взять 32-х битное число, то есть 4 байта
s.EnsureBuffer (4);
BitConverter.ToInt32 (s.Buffer, s.Offset)

и это надо повторять для КАЖДОГО получения блока байтов.
А зачем вы так странно работаете со Stream? Что вам мешает сначала прочитать 1021 байт, а затем 4?
Мешает то, что я не знаю сколько их будет пока не прочитаю и найду признак окончания порции.
А читаете вы какими кусками? Можете показать алгоритм в псевдокоде?
Sign up to leave a comment.

Articles