Привет! В первой части я поделился мыслями, которые побудили к созданию python библиотеки convtools. Кратко о ней: предоставляет примитивы (конверсии), объединяя которые, можно описывать сложные конверсии для обработки данных. Конверсия генерирует узкоспециализированный код, компилирует его и возвращает функцию, решающую конкретную задачу.
В этот раз хотелось бы подробнее остановиться на двух моментах:
как
pipeпозволяет повысить переиспользуемость кодановая часть библиотеки:
Table- потоковый обработчик табличных данных
1) Pipes & code reuse
Надеюсь, что читать код с комментариями удобнее, чем оторванное от него косноязычное повествование, поэтому приступим:
from datetime import datetime from decimal import Decimal from convtools import conversion as c # Допустим нам нужно прочесть некий отчет и поработать с ним. # описываем конверсии, необходимые при чтении отчета # сохраним то, как будут обрабатываться все входные строки c_str = c.this().call_method("strip") # то, как из обработанной строки получить Decimal (убиваем запятые - # разделители групп разрядов) c_decimal = c_str.call_method("replace", ",", "").as_type(Decimal) c_parse_date = c.call_func( datetime.strptime, c.this(), "%Y-%m-%d" ).call_method("date") c_date = c_str.pipe(c_parse_date) c_optional_date = c_str.pipe(c.if_(c.this(), с_parse_date, None)) c_optional_decimal = c_str.pipe(c.if_(c.this(), c_decimal, None)) # разменяем входные строки на понятные типы через (тут должен быть # Enum, но с диктом тоже красиво) c_product_type = c.naive( { "phone": 1, "laptop": 2, } ).item(c_str.call_method("lower")) # теперь определим, что нас может интересовать в данных, направляя # каждое из полей в необходимую конверсию schema = { "full_name": c.item("Full Name").pipe(c_str), "sku": c.item("SKU").pipe(c_str), "product_type": c.item("Product Type").pipe(c_product_type), "earnings": c.item("Earnings").pipe(c_decimal), "refunds": c.item("Refunds").pipe(c_optional_decimal), "date": c.item("Date").pipe(c_date), "date_of_birth": c.item("Date of Birth").pipe(c_optional_date), } # теперь можем забыть о том, как что и откуда берется. # можно описать необходимую логику converter = ( # группируем по именам и датам c.group_by(schema["full_name"], schema["date"]) .aggregate( { "full_name": schema["full_name"], "date": schema["date"], "date_of_birth": c.ReduceFuncs.First(schema["date_of_birth"]), "total_earnings": c.ReduceFuncs.Sum(schema["earnings"]), } ) # отбросим "начинающих" .filter(c.item("total_earnings") > 1000) # сгруппируем по датам, соберем якобы полезную аналитику: # - кол-во не "начинающих" # - их средний заработок # - их медианная дата рождения # - dict: SKU -> сумма заработков (агрегация в агрегации) .pipe( c.group_by(c.item("date")).aggregate( { "date": c.item("date"), "number_of_workers": c.ReduceFuncs.Count(), "average_earnings": c.ReduceFuncs.Average( c.item("total_earnings") ), "median_date_of_birth": c.ReduceFuncs.Median( c.item("date_of_birth") ), "earnings_by_sku": c.ReduceFuncs.DictSum( c.item("sku"), c.item("total_earnings"), ), } ) ) # debug=True - выведет в stdout сгенерированный код. # если установлен black, он будет еще и отформатирован .gen_converter() ) converter(input_data)
сгенерированный код
def pipe__cp(input__cp, _naive, _labels, _none): return ( _naive["strptime_ox"](input__cp.strip(), _naive["v_7o"]).date() if input__cp else None ) class AggData__cg: __slots__ = ["v0", "v1"] def __init__(self, _none=__none__): self.v0 = _none self.v1 = _none def group_by__cg(data_, _naive, _labels, _none): signature_to_agg_data__cg = defaultdict(AggData__cg) for row__cg in data_: agg_data__cg = signature_to_agg_data__cg[ ( row__cg["Full Name"].strip(), _naive["strptime_ox"]( row__cg["Date"].strip(), _naive["v_7o"] ).date(), ) ] if agg_data__cg.v0 is _none: agg_data__cg.v0 = pipe__cp( row__cg["Date of Birth"].strip(), _naive, _labels, _none ) agg_data__cg.v1 = ( _naive["Decimal_4v"]( row__cg["Earnings"].strip().replace(",", "") ) or 0 ) else: agg_data__cg.v1 = agg_data__cg.v1 + ( _naive["Decimal_4v"]( row__cg["Earnings"].strip().replace(",", "") ) or 0 ) result_ = ( { "full_name": signature__cg[0], "date": signature__cg[1], "date_of_birth": ( None if agg_data__cg.v0 is _none else agg_data__cg.v0 ), "total_earnings": ( 0 if agg_data__cg.v1 is _none else agg_data__cg.v1 ), } for signature__cg, agg_data__cg in signature_to_agg_data__cg.items() ) filtered_result_ = [ i_4z for i_4z in result_ if (i_4z["total_earnings"] > 1000) ] return filtered_result_ class AggData__gp: __slots__ = ["v0", "v1", "v2", "v3"] def __init__(self, _none=__none__): self.v0 = _none self.v1 = _none self.v2 = _none self.v3 = _none def group_by__gp(data_, _naive, _labels, _none): signature_to_agg_data__gp = defaultdict(AggData__gp) for row__gp in data_: agg_data__gp = signature_to_agg_data__gp[row__gp["date"]] if agg_data__gp.v0 is _none: agg_data__gp.v0 = 1 agg_data__gp.v2 = [row__gp["date_of_birth"]] agg_data__gp.v3 = _d = defaultdict(int) _d[row__gp["sku"]] = row__gp["total_earnings"] or 0 else: agg_data__gp.v0 = agg_data__gp.v0 + 1 agg_data__gp.v2.append(row__gp["date_of_birth"]) agg_data__gp.v3[row__gp["sku"]] = agg_data__gp.v3[ row__gp["sku"] ] + (row__gp["total_earnings"] or 0) if agg_data__gp.v1 is _none: if row__gp["total_earnings"] is not None: agg_data__gp.v1 = (1, row__gp["total_earnings"] * 1) else: if row__gp["total_earnings"] is not None: agg_data__gp.v1 = ( agg_data__gp.v1[0] + 1, agg_data__gp.v1[1] + row__gp["total_earnings"] * 1, ) return [ { "date": signature__gp, "number_of_workers": ( 0 if agg_data__gp.v0 is _none else agg_data__gp.v0 ), "average_earnings": ( None if agg_data__gp.v1 is _none else (agg_data__gp.v1[1] / agg_data__gp.v1[0]) ), "median_date_of_birth": ( None if agg_data__gp.v2 is _none else _naive["median_59"](agg_data__gp.v2) ), "earnings_by_sku": ( None if agg_data__gp.v3 is _none else (dict(agg_data__gp.v3)) ), } for signature__gp, agg_data__gp in signature_to_agg_data__gp.items() ] def converter_uk(data_): global __naive_values__, __none__ _naive = __naive_values__ _none = __none__ _labels = {} return _naive["group_by__gp"]( _naive["group_by__cg"](data_, _naive, _labels, _none), _naive, _labels, _none, )
Так как все конверсии являются выражениями, метод pipe позволяет комбинировать практически любые из них. pipe-ы разумно воспринимать как бесплатную с точки зрения производительности абстракцию, так как в случаях, когда не требуется добавление label-ов и входные данные не используются более одного раза, pipe не приводит к дополнительному вызову функции.
2) Table - потоковая обработка табличных данных
Время от времени у меня появлялась необходимость в работе с табличными данными (или схожими с ними). И меня всегда прельщала мысль "запечь" в код индексы колонок, чтобы обращения к ним были дешевыми. И вот, наконец, имея под рукой примитивы convtools, получилось реализовать задуманное, совершая малое количество телодвижений.
Логика работы: Table работает с итераторами лениво, вычитывая только первую строку, в которой может содержаться заголовок. Далее, опираясь на заголовок и запрошенные изменения, пишется код, который выполнит нужные преобразования над имеющимся итератором. Методы into_iter_rows и into_csv являются конечными.
from convtools.contrib.tables import Table from convtools import conversion as c ( Table.from_csv( "tests/csvs/ac.csv", header=True, dialect=Table.csv_dialect(delimiter="\t"), ) # возьмем только колонки "a" и "c" .take("a", "c") # добавим две новых вычисляемых колонки .update(B=c.col("a") + c.col("c"), D=c.call_func(abs, c.col("c"))) # переименуем .rename({"a": "A"}) # отфильтруем строки .filter(c.col("c") < 10) # отбросим колонку "c" .drop("c").into_csv("tests/csvs/out.csv") ) # объединим две таблицы по колонке "a" list( Table.from_rows([(1, 2), (2, 3)], ["a", "b"]) .join( Table.from_rows([(1, 3), (2, 4)], ["a", "c"]), how="inner", on=["a"], ) .into_iter_rows(dict) # поддерживаются list/tuple )
Также добавлены два изредка полезных метода:
""" Table 1 Table 2 | a | b | | b | c | | 1 | 2 | | 3 | 4 | >>> table1.chain(table2, fill_value=" ") Result: | a | b | c | | 1 | 2 | | | | 3 | 4 | """
""" Table 1 Table 2 | a | b | | b | c | | 1 | 2 | | 3 | 4 | | 5 | 6 | >>> table1.zip(table2, fill_value=" ") Result: | a | b | b | c | | 1 | 2 | 3 | 4 | | | | 5 | 6 | """
Заключение
Конечно, я не питаю иллюзий, что все резко бросятся использовать convtools, но всё равно захотелось поделиться. И даже не потому, что codegen добавляет гибкость, просто меня написание кода в таком стиле ставит на другие рельсы. С таким подходом я допускаю значительно меньше ошибок, становятся более очевидными стадии обработки данных, облегчается тестирование кода, т.к. работа ведется с чистыми "почти-функциями" + convtools поощряет работу с iterator-ами, что положительно сказывается на потребляемой памяти.
Всем успехов и, если что-нибудь придумаю или подскажете, до скорого!
