Как стать автором
Обновить

Очистка текста с помощью Python. Часть 02

Уровень сложностиПростой
Время на прочтение8 мин
Количество просмотров5.3K

В предыдущей части статьи мы поговорили о том, как очистить текст от цифр и символов. Продолжим очищать данные и поговорим о том, как очистить цифры от букв и символов, а также выполнить проверку email. Также мы подсчитаем количество не пустых строк. В данном случае, строка будет пустой, если она содержит одно или менее значений. Ну и в завершении протестируем то, что у нас получилось.

Очистка строк от букв и символов

Если в предыдущей функции мы удаляли числа и символы, то сейчас нам нужно выполнить противоположную задачу. Например, нужно очистить номер телефона. И привести к какому-то одному виду для облегчения поиска. Чтобы информация не была неструктурированном и сыром виде. Создадим функцию phone_normalize(phone: str) -> str, которая на вход получает строку с номером телефона, очищает ее, приводит к требуемому виду и возвращает из функции. Для начала очистим строку с номером от скобок, кавычек и прочего. После проверим, не является ли строка пустой. Так как она может содержать не только номер телефона и потому будет просто очищена. Если номер есть, проверяем количество символов в нем. Для себя я определил, что если номер телефона, а речь идет о российских номерах, больше 11 символов, то такие номера учитывать не буду. Потому, проверяю, если больше, возвращаю пустую строку. Если количество символов в диапазоне от 6 до 10, проверяю, с какой цифры начинается номер. Если это девять, добавляю 7. Если нет, просто возвращаю номер. Если цифр 11, проверяю первую цифру. Если она 8, меняю на 7. Также здесь нужно учесть то, что не все 8 надо заменять. Потому, проверяем также вторую цифру. И если она девять, только тогда проводим замену. Ну и если номер начинается с 7, возвращаем его из функции как есть.

def email_normalize(mail: str) -> str:
    if "@" in mail:
        mail = mail.split("@")
        if mail[0].strip():
            if "." in mail[1]:
                ch_mail = mail[1].split(".")
                if len(ch_mail) == 2:
                    return "@".join(mail) if validators.email("@".join(mail)) else ""
            else:
                for char in ",:;!_*-+()/#%&":
                    if char in mail[1]:
                        ch_mail = mail[1].replace(char, ".")
                        if len(ch_mail.split(".")) == 2:
                            mail = "@".join([mail[0], ch_mail])
                            return mail if validators.email(mail) else ""
        return ""
      return ""

Очистка и проверка email

Данная функция не нуждается в особых комментариях. Здесь мы просто проверяем наличие собаки. Если есть, будем считать мылом. Нет, тогда нет. Ну и бывают опечатки вместо точки. Потому, меняем их на нее.

def email_normalize(mail: str) -> str:
    return mail.strip().replace("/", ".") if "@" in mail else ""

Подсчет количества не пустых строк

В принципе, данная функция вовсе не обязательна и нужна только в примере, который я вам хочу здесь показать. Тем не менее, для полноты картины ее нужно создать. Здесь все просто. Перебираем объекты в списке. И если они не пустые, увеличиваем счетчик. Затем, если счетчик больше 1, возвращаем True, меньше False.

def count_get(items: list) -> bool:
    cnt = 0
    for item in items:
        if item.strip():
            cnt += 1
    return True if cnt > 1 else False

Проверка функций очистки на примере файла «csv»

Давайте проверим, как работают созданные функции. Загрузим файл «.csv» и обчистим его с помощью них. Ну и запишем результат в отдельный файл.

Для этого нужно написать обработчик строк, который бы открывал файл, считывал его построчно и очищал.

Создадим функцию read_files(file: str, name: str, ascii_l=True) -> None. На входе она получает путь к «.csv» файлу, имя файла очищенное от расширения и параметр ascii_l со значением по умолчанию. Он нужен для того, чтобы сказать функции, стоит ли обрабатывать английские символы или нет. Ведь не всегда имена написаны по-русски.

Для начала откроем файл «.csv». В цикле будем итерироваться по строкам. В моем файле разделителем является «|». Если у вас что-то другое, запятая или точка с запятой, следует указать их. Проверяем первую строку. У меня она содержит заголовки. Потому их я сразу же добавляю в глобальный, предварительно объявленный, список.

Обратите внимание на то, что в данном случае структура файла нам известна заранее, а следовательно мы можем определить переменные.

Если нам нужно обработать файл, в котором структура столбцов бывает различно, то файл следует предварительно подготовить вручную. Так, скажем, привести к виду для обработки. Это касается однотипных наборов данных с различным количеством столбцов. Как обрабатывать их, здесь мы обсуждать не будем, но я сделал небольшой алгоритм. И если нужно, поделюсь им с вами в следующей статье. Потому, пишите в комментариях.

Распаковываем строку. Нормализуем Ф.И.О., email, телефон и имя пользователя. Также проверяем длину имени пользователя. Затем проверяем количество не пустых переменных. И если оно больше 1, то добавляем список в глобальный список rows_list. Выводим на печать полученные значения, чтобы не было скучно.

def read_files(file: str, name: str, ascii_l=True) -> None:
    global rows_list
    with open(file, "r", encoding="utf-8") as cs:
        for nm, row in enumerate(csv.reader(cs, delimiter="|")):
            if nm == 0:
                rows_list.append(row)
                continue
            phone, email, fio, uname = row
            fio = fio_normalize(fio, ascii_l)
            email = email_normalize(email)
            phone = phone_normalize(phone)
            uname = uname.encode().decode()
            if len(uname) > 50:
                uname = ""
            if count_get([phone, email, fio, uname]):
                rows_list.append([phone, email, fio, uname])
            else:
                continue
            print(f"\r{nm+1} | {fio} | {phone} | {email} | {uname}", end="")
            if len(rows_list) == 100000:
                with open(f"{name}_clean.csv", mode="a", encoding='utf-8', newline='') as csv_f:
                    file_writer = csv.writer(csv_f, delimiter=";")
                    file_writer.writerows(rows_list)
                rows_list.clear()

Записывать большие файлы лучше по частям. Потому, проверяем количество списков в глобальном списке. И если оно равно 100 000, записываем в файл. После чего глобальный список очищаем для новой порции.

Запрос пути к файлу. Функция main

Итак, мы приближаемся к финалу данной статьи. Создадим функцию main. Запрашиваем у пользователя путь к файлу. Также запрашиваем выполнять транслитерацию или нет. В данном случае я опустил уточняющие запросы вроде да или нет. Так как в данном случае да уже по умолчанию. А нет равно вводу «n». Проверяем существует ли файл, и что это вообще файл. Следовало бы проверить, является ли он «.csv» хотя бы по расширению. Если файла не существует, выходим из скрипта. Если же все в порядке – двигаемся дальше. Замерим также время выполнения скрипта.

Получаем имя файла без расширения. Подсчитываем кол-во строк в файле и выводим его имя и кол-во в сообщении для пользователя. Проверяем нужно ли выполнять транслитерацию. Ну и передаем путь к файлу в функцию очистки. После отработки скрипта проверяем, не пуст ли глобальный список. Если нет, сохраняем остатки данных в новый файл «.csv». Выводим в терминал время выполнения скрипта.

def main() -> None:
    global rows_list
    path = input("path file: >>> ")
    ascii_l = input("ascii_l: >>> ")
    if not Path(path).exists() or not path or not Path(path).is_file():
        exit(0)
    tm = time.monotonic()
    name = Path(path).name.removesuffix(Path(path).suffix)
    cnt_line = sum(1 for _ in open(path, "rb"))
    print(f"\n{Path(path).name} | Lines: {cnt_line}\n{'*' * 35}")
    if ascii_l == "n":
        read_files(path, name, ascii_l=False)
    else:
        read_files(path, name, ascii_l=True)

    if 0 < len(rows_list) < 100000:
        with open(f"{name}_clean.csv", mode="a", encoding='utf-8', newline='') as csv_f:
            file_writer = csv.writer(csv_f, delimiter=";")
            file_writer.writerows(rows_list)
        rows_list.clear()

    ch_time = (f'All complete | {(int(time.monotonic() - tm) // 3600) % 24:d} h. '
               f'{(int(time.monotonic() - tm) // 60) % 60:02d} m. {int(time.monotonic() - tm) % 60:02d} s.')
    lnt = len(ch_time)
    print(f'\n{"-" * lnt}\n{ch_time}\n{"-" * lnt}')


if __name__ == "__main__":
    main()

Вот в принципе и все. Ниже я приведу полный код скрипта для очистки. То есть, то, что мы написали для тестирования функций.

Полный код скрипта:

"""
pip install transliterate
"""
import csv
import string
import time
from pathlib import Path

import validators
from transliterate import translit

from transliterate import translit

csv.field_size_limit(2147483647)

rows_list = []


def replacer(txt: str) -> str:
    symbols = ("ahkbtmxcepAHKBTMXCEP",
               "анквтмхсерАНКВТМХСЕР")
    tr = {ord(a): ord(b) for a, b in zip(*symbols)}
    return txt.translate(tr)


def fio_normalize(fio: str, ascii_l=True) -> str:
    if "http" in fio or "https" in fio or "Http" in fio or "Https" in fio:
        return ""
    if fio.startswith("-") or fio.endswith("-"):
        fio = fio.strip("-").strip()
    if "-" in fio:
        fio = fio.replace("-", "тирре")
    fio = "".join(x for x in fio if x.isalpha() or x == " ").strip().replace("тирре", "-")
    ascii_count = 0
    for xz in fio:
        if xz == " ":
            ascii_count += 1
        ascii_count += sum(1 for x in xz if x in string.ascii_letters)
    if ascii_l and ascii_count == len(fio):
        fio = translit(fio, "ru")
    elif ascii_l:
        temp = []
        for x in fio:
            temp.append(replacer(x)) if x in string.ascii_letters else temp.append(x)
        fio = "".join(temp)
    fio = " ".join(x.strip().capitalize() for x in fio.split())
    lst = []
    for x in fio.split():
        if "-" in x:
            lst.append("-".join(z.capitalize() for z in x.split("-")))
        else:
            lst.append(x)
    fio = " ".join(lst)
    if len(fio.split()) > 3:
        fio = " ".join(fio.split()[0:3])
    if len(fio) > 50:
        fio = fio[:51]
    return fio if fio else ""


def email_normalize(mail: str) -> str:
    return mail.strip().replace("/", ".") if "@" in mail else ""


def phone_normalize(phone: str) -> str:
    phone = "".join(x for x in phone if x.isdecimal())
    if phone:
        if len(phone) > 11:
            return ""
        elif 6 <= len(phone) < 10:
            return phone
        elif len(phone) == 10:
            if phone.startswith("9"):
                return f"7{phone}"
            else:
                return phone
        elif len(phone) == 11:
            if phone.startswith("8") and phone[1] == "9":
                return f"7{phone[1:]}"
            elif phone.startswith("7"):
                return phone
            else:
                return ""
        return ""
    return ""


def count_get(items: list) -> bool:
    cnt = 0
    for item in items:
        if item.strip():
            cnt += 1
    return True if cnt > 1 else False


def read_files(file: str, name: str, ascii_l=True) -> None:
    global rows_list
    with open(file, "r", encoding="utf-8") as cs:
        for nm, row in enumerate(csv.reader(cs, delimiter="|")):
            if nm == 0:
                rows_list.append(row)
                continue
            phone, email, fio, uname = row
            fio = fio_normalize(fio, ascii_l)
            email = email_normalize(email)
            phone = phone_normalize(phone)
            uname = uname.encode().decode()
            if len(uname) > 50:
                uname = ""
            if count_get([phone, email, fio, uname]):
                rows_list.append([phone, email, fio, uname])
            else:
                continue
            print(f"\r{nm+1} | {fio} | {phone} | {email} | {uname}", end="")
            if len(rows_list) == 100000:
                with open(f"{name}_clean.csv", mode="a", encoding='utf-8', newline='') as csv_f:
                    file_writer = csv.writer(csv_f, delimiter=";")
                    file_writer.writerows(rows_list)
                rows_list.clear()


def main() -> None:
    global rows_list
    path = input("path file: >>> ")
    ascii_l = input("ascii_l: >>> ")
    if not Path(path).exists() or not path or not Path(path).is_file():
        exit(0)
    tm = time.monotonic()
    name = Path(path).name.removesuffix(Path(path).suffix)
    cnt_line = sum(1 for _ in open(path, "rb"))
    print(f"\n{Path(path).name} | Lines: {cnt_line}\n{'*' * 35}")
    if ascii_l == "n":
        read_files(path, name, ascii_l=False)
    else:
        read_files(path, name, ascii_l=True)

    if 0 < len(rows_list) < 100000:
        with open(f"{name}_clean.csv", mode="a", encoding='utf-8', newline='') as csv_f:
            file_writer = csv.writer(csv_f, delimiter=";")
            file_writer.writerows(rows_list)
        rows_list.clear()

    ch_time = (f'All complete | {(int(time.monotonic() - tm) // 3600) % 24:d} h. '
               f'{(int(time.monotonic() - tm) // 60) % 60:02d} m. {int(time.monotonic() - tm) % 60:02d} s.')
    lnt = len(ch_time)
    print(f'\n{"-" * lnt}\n{ch_time}\n{"-" * lnt}')


if __name__ == "__main__":
    main()

Тестирование

Запустим скрипт и укажем путь к тестовому «csv». В нем содержаться случайно сгенерированные данные. Очистим их с помощью скрипта. Чтобы вы понимали, вот кусочек изображения с номерами телефонов.

Номера телефонов
Номера телефонов

Как видите – сборная солянка. Ну и то, что было до обработки и что стало после, на примере одной строки.

Строка до обработки
Строка до обработки

Обработали и получили:

Строка после обработки
Строка после обработки

Таким образом, мы узнали, что очистить строку вовсе не так сложно. Особенно с помощью методов самого python, без изобретения дополнительного велосипеда.

А на этом, пожалуй, все.
Спасибо за внимание. Надеюсь, данная информация будет вам полезна!

Подписывайся на наши телеграм каналы!


UPD: Учитывая пожелания из комментариев переделал функцию проверки email.

Теги:
Хабы:
Всего голосов 11: ↑1 и ↓10-8
Комментарии7

Публикации

Истории

Работа

Data Scientist
78 вакансий
Python разработчик
119 вакансий

Ближайшие события

7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн
7 – 8 ноября
Конференция «Матемаркетинг»
МоскваОнлайн
15 – 16 ноября
IT-конференция Merge Skolkovo
Москва
22 – 24 ноября
Хакатон «AgroCode Hack Genetics'24»
Онлайн
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань