В этой статья хочу рассказать о парсинге сайта jetlend.
Мое первое знакомство с этим сайтом произошло 2 месяца назад.
Как парсить и чем?
Первоначальной идея была парсить сайт с помощью языка программирования ЯП Python и Selenium.
Данные - параметры для парсинга
Приступим к анализу и изучим как устроен сайт и определимся с данными которые надо парсить.
Мне нужны данные из вторичного рынка. На странице по этому адресу находится список заемщиков (Компании). https://jetlend.ru/invest/v3/market/exchange

На этой странице есть фильтр где можно настроить по сколько компаний показать на текущей странице и кол-во всех заемщиком которые есть сейчас.
Теперь пройдем по заемщику и посмотрим какие данные есть.

Первые два параметра которые нужно сохранить это портфель заемщика. Сколько храниться в JetLend-e и сколько в других. Эти данные находятся на вкладке "Аналитика". (Запомним это нам она понадобиться в будущем)

На вкладке "О компании" находится сведения о заемщике.
Сохраним параметр "Сумма", "Ставка", "Дата регистрации" и "Выручка за год".
Определяем структуру программы
Для того чтобы получить доступ к этим данным нужно зарегестрироваться на сайт.

В этом проекте я использовал пакетный менеджер poetry он гораздо удобнее чем pip.
config — разные конфигурации, данные для входа, пароль и т. п.drivers — драйвер для selenium ( я всегда предпочитаю использовать chromedriver)result — папка для вывода конечного.csv файлаsrc — основная папка где будет находится наш скрипт.pages — отдельные методы для каждой страницыtests — для тестовutils — разные вспомогательные файлы
Hidden text
Это структура не является идеальной, я решил что хоть какая та структура не помешает в проекте и сделал не большой порядок в проекте
Пишем код
Внутри файла config.py находится небольшой класс в котором будет хранится данные для авторизации.
import os
from dotenv import load_dotenv
load_dotenv()
class UserConfig:
INVEST_USERNAME = os.getenv('INVEST_USERNAME')
INVEST_PASSWORD = os.getenv('INVEST_PASSWORD')Для того чтобы не писать каждый раз конфигурацию для Selenium я решил создать класс ChromeOptions где я пропишу все конфигурации которые я хочу и потом по надобности буду их включать или же отключать.
from selenium.webdriver.chrome.options import Options
class ChromeOptions:
@staticmethod
def get_options(headless=False, incognito=False, disable_gpu=False, disable_notifications=False,
disable_extensions=False, disable_webdriver=False, no_sandbox=False, proxy: str = None):
options = Options()
if headless:
options.add_argument("--headless")
if incognito:
options.add_argument("--incognito")
if disable_gpu:
options.add_argument("--disable-gpu")
if disable_notifications:
options.add_argument("--disable-notifications")
if disable_extensions:
options.add_argument("--disable-extensions")
if disable_webdriver:
options.add_argument("--disable-blink-features=AutomationControlled")
if no_sandbox:
options.add_argument("--no-sandbox")
if proxy:
options.add_argument(f'--proxy-server={proxy}')
return options
Внутри класса будет одна функция get_options по умолчанию все настройки выключены для того чтобы их использовать в функцию требуется передать параметр с True. Здесь закончим с нашими настройками и перейдем к основному коду с Selenium.
Для страниц я создал базовый класс BasePage от которого будем наследовать все остальные страницы.
Определим base_page.py
Показать BasePage
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
class BasePage:
def __init__(self, driver: WebDriver, url: str, options: Options = None):
self.driver = driver
self.url = url
def navigate(self):
self.driver.get(self.url)
def wait_for_element(self, locator, timeout=10):
return WebDriverWait(self.driver, timeout).until(EC.presence_of_element_located(locator))
def wait_for_clickable_element(self, locator, timeout=10):
return WebDriverWait(self.driver, timeout).until(EC.element_to_be_clickable(locator))
def wait_for_text_in_element(self, locator, text, timeout=10):
return WebDriverWait(self.driver, timeout).until(EC.text_to_be_present_in_element(locator, text))
Класс принимает несколько атрибутов: driver url optionsВ базовом классе определил несколько методов
navigate- переходить на тот адрес который передали в инициализации классаwait_for_element- ждать пока элемент проявится на страницеwait_for_clickable_element- ждать пока прогрузиться кликабельный элемент (кнопки и тп)wait_for_text_in_element- подождать пока проявится текст
Теперь создадим класс для страницы авторизации LoginPage
Показать LoginPage
from selenium.webdriver.common.by import By
from .base_page import BasePage
class LoginPage(BasePage):
# Селекторы элементов страницы
USERNAME_INPUT = (By.NAME, 'phone')
PASSWORD_INPUT = (By.NAME, 'password')
LOGIN_BUTTON = (By.CLASS_NAME, 'button_button__text__vZRCV')
LOGIN_SUCCESS_INDICATOR = (By.CLASS_NAME, 'selectUserButton_button__tail__AEt6i') # Замените на подходящий локатор
def enter_username(self, username):
username_input = self.wait_for_element(self.USERNAME_INPUT)
username_input.clear()
username_input.send_keys(username)
def enter_password(self, password):
password_input = self.wait_for_element(self.PASSWORD_INPUT)
password_input.clear()
password_input.send_keys(password)
def click_login_button(self):
login_button = self.wait_for_clickable_element(self.LOGIN_BUTTON)
login_button.click()
def navigate_to_url(self, url):
self.driver.get(url)
def is_login_successful(self):
# Проверка успешного входа по наличию элемента, который виден только после входа в систему
try:
self.wait_for_element(self.LOGIN_SUCCESS_INDICATOR, timeout=10)
return True
except:
return False
def get_cookies(self):
return self.driver.get_cookies()
После того как авторизовались нам надо перейти на страницу вторичный рынок и спарсить всех заемщиков. Для этого создадим класс ExhangePage унаследуем все методы от BasePage.
Показать BasePage
# -*- coding: utf-8 -*-
import csv
import datetime
import math
import time
from selenium.webdriver.common.by import By
from .base_page import BasePage
class ExchangePage(BasePage):
# Селекторы элементов страницы
LINK_ELEMENTS = (By.CLASS_NAME, 'link--black')
DROP_DOWN_ELEMENT = (By.CLASS_NAME, 'MuiInputBase-root')
DROP_DOWN_100 = (By.CSS_SELECTOR, 'li.MuiTablePagination-menuItem[data-value="100"]')
NEXT_ROW_ELEMENT = (By.XPATH, '//button[@aria-label="Следующая страница"]')
TOTAL_PAGES = (By.CSS_SELECTOR, 'p.MuiTablePagination-displayedRows')
def navigate_to_url(self, url):
self.driver.get(url)
def show_100_element(self):
try:
drop_down = self.wait_for_clickable_element(locator=self.DROP_DOWN_ELEMENT, timeout=1)
drop_down.click()
drop_down_100 = self.wait_for_clickable_element(locator=self.DROP_DOWN_100, timeout=1)
drop_down_100.click()
except Exception as err:
print(err)
def parse_links(self) -> list:
"""Берет все ссылки с таблицы"""
try:
links = []
link_elements = self.driver.find_elements(*self.LINK_ELEMENTS)
for link_element in link_elements:
link = link_element.get_attribute('href')
links.append(link)
return links
except Exception as err:
print(err)
return []
def parse_all_links(self, file_path, all_pages: bool = False, table_range: int = 2):
"""Saving links to csv"""
self.wait_for_element(self.TOTAL_PAGES)
paginator_info = self.driver.find_element(*self.TOTAL_PAGES)
total_items = int(paginator_info.text.split(' ')[-1])
items_per_page = 100
all_links = [] # Список для хранения всех ссылок
if all_pages:
total_pages = math.ceil(total_items / items_per_page)
file_path = 'links/all_pages_' + datetime.datetime.now().strftime("%Y-%m-%d_%H") + ".csv"
else:
total_pages = table_range
for page in range(total_pages):
links = self.parse_links() # Обработка элементов на текущей странице
all_links.extend(links) # Добавляем ссылки на текущей странице в общий список
# Переход на следующую страницу, если это не последняя страница
if page < total_pages - 1:
self.next_row()
time.sleep(2)
self.save_links_to_csv(all_links, file_path) # Сохраняем все ссылки в CSV файл
def next_row(self):
try:
next_page_button = self.wait_for_clickable_element(self.NEXT_ROW_ELEMENT)
self.driver.execute_script("arguments[0].scrollIntoView(true);", next_page_button)
next_page_button.click()
except Exception as err:
print(err)
@staticmethod
def save_links_to_csv(links, file_path):
with open(file_path, 'a', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['Link']) # Записываем заголовок столбца
for link in links:
writer.writerow([link]) # Записываем каждую ссылку в отдельной строке
Постарался максимально написать комментарии для понятности кода
Если надо разбор кода напишите об этом
Отлично после получения всех заемщиков, теперь надо взять наши данные из каждого заемщика.
Для этого создадим новый класс CompanyPage в этом классе будет вся логика для сбора данных
Показать CompanyPage
# -*- coding: utf-8 -*-
import time
from selenium.webdriver.common.by import By
from src.pages.base_page import BasePage
class CompanyPage(BasePage):
MAX_RETRY_ATTEMPTS = 5
ABOUT_COMPANY_BUTTON = (By.CSS_SELECTOR, '[data-tab-id="about"]')
ANALYTICS_BUTTON = (By.CSS_SELECTOR, '[data-tab-id="analytics"]')
LEGEND_ITEM = (By.CLASS_NAME, 'legendItem_container--with-value__qdFLW')
LEGEND_JETLEND = (By.CLASS_NAME, 'legendItem_inner-container__VfDKl')
LEGEND_OTHER = (By.CLASS_NAME, 'legendItem_container--with-value__qdFLW')
COMPANY_TITLE = (By.CLASS_NAME, 'companyHeader_title__vkSVS')
STAVKA_VALUE = (
By.XPATH,
'//p[contains(text(), "Ставка")]/following-sibling::p[@class="dataTable_column-value__Itr4N dataTable_bold-text__YSeCD"]'
)
REGISTER_DATE = (
By.XPATH,
'//p[contains(text(), "Дата регистрации")]/following-sibling::p[@class="dataTable_column-value__Itr4N dataTable_bold-text__YSeCD"]'
)
VIRUCHKA_YEAR = (
By.XPATH,
'//p[contains(text(), "Выручка за год")]/following-sibling::p[@class="dataTable_column-value__Itr4N dataTable_bold-text__YSeCD"]'
)
def __init__(self, driver, url=None):
super().__init__(driver, url)
def navigate_to_url(self, url):
self.driver.get(url)
def click_about(self):
button = self.wait_for_clickable_element(locator=self.ABOUT_COMPANY_BUTTON, timeout=1)
button.click()
def click_analytics(self):
button = self.wait_for_clickable_element(locator=self.ANALYTICS_BUTTON, timeout=1)
button.click()
def get_data_from_company(self) -> tuple[str, dict[str, str]]:
"""
Get data from link
return tuple(
str: company_name,
dict: company_data,
)
"""
retry_count = 0
while retry_count < self.MAX_RETRY_ATTEMPTS:
try:
# self.click_analytics()
time.sleep(1)
self.wait_for_element(self.LEGEND_ITEM, timeout=5)
self.wait_for_element(self.COMPANY_TITLE, timeout=5)
legend_items = self.driver.find_elements(*self.LEGEND_ITEM)
company_title = self.driver.find_element(*self.COMPANY_TITLE).text
# get jetlend data
self.wait_for_element((By.CLASS_NAME, "legendItem_right-data__GaMyB"), timeout=5)
value_jetlend = legend_items[0].find_element(By.CLASS_NAME, "legendItem_right-data__GaMyB").text
value_jetlend = value_jetlend.split('(')[0]
value_jetlend = value_jetlend.replace('₽', '').replace(' ', '')
# get other data
self.wait_for_element((By.CLASS_NAME, "legendItem_right-data__GaMyB"), timeout=5)
value_other = legend_items[1].find_element(By.CLASS_NAME, "legendItem_right-data__GaMyB").text
value_other = value_other.split("(")[0]
value_other = value_other.replace('₽', '').replace(' ', '')
# go to about page
self.click_about()
time.sleep(1)
self.wait_for_element(self.STAVKA_VALUE, timeout=5)
stavka = str
register_date = str
viruchka = str
# get stavka and viruchka from company
stavka = self.driver.find_element(*self.STAVKA_VALUE).text
self.wait_for_element(self.REGISTER_DATE, timeout=5)
register_date = self.driver.find_element(*self.REGISTER_DATE).text
self.wait_for_element(self.VIRUCHKA_YEAR, timeout=5)
viruchka = self.driver.find_element(*self.VIRUCHKA_YEAR).text
viruchka = viruchka.replace('₽', '').replace(' ', '')
return company_title, {
"JetLend": value_jetlend, 'Другие': value_other, 'Ставка': stavka,
'Выручка': viruchka,
"Дата регистрации": register_date
}
except Exception as err:
retry_count += 1
print(f"An error occurred: error {err}")
if retry_count < self.MAX_RETRY_ATTEMPTS:
print(f"Retrying... Attempt {retry_count}")
else:
print("Max retry attempts reached. Exiting.")
break
# Если все попытки исчерпаны или возникла ошибка, верните пустые данные
return "", {}
Закончили определение логики парсинга. Быстро определим наш utils
logger.py
import sys
from loguru import logger
def setup_logging(log_file='logs/app.log', level='INFO'):
"""Настройка системы логирования."""
logger.remove() # Удалить стандартные обработчики
logger.add(log_file, level=level,
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}") # Добавить обработчик для записи в файл
logger.add(sys.stderr, level=level,
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}") # Добавить обработчик для вывода в консоль
setup_logging() # Вызовите эту функцию для настройки логирования
saving_to_file.py
# -*- coding: utf-8 -*-
import csv
import os
import pandas as pd
def save_dataframe_to_excel(data_list: dict, filename):
"""
Save DataFrame to Excel file, appending to it if it already exists.
Parameters:
data_list (dict): data
filename (str): Name of the Excel file to save.
"""
try:
# Transpose DataFrame
df = pd.DataFrame(data_list)
df_transposed = df.T
# Save transposed DataFrame to Excel file
with pd.ExcelWriter(filename, engine='openpyxl', mode='a' if os.path.exists(filename) else 'w') as writer:
df.to_excel(writer, sheet_name='Sheet1', index=True, if_sheet_exists='replace')
print("DataFrame (Transposed) saved to '{}'".format(filename))
except Exception as e:
print("An error occurred:", e)
def save_to_csv(file_path, data):
# Проверяем наличие файла
file_exists = False
if os.path.exists(file_path):
file_exists = True
with open(file_path, 'a', newline='', encoding='utf-8') as csvfile:
fieldnames = ['Название Компании', 'JetLend', 'Другие', 'Ставка', 'Выручка', 'Дата регистрации']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
# Если файл только что создан, записываем заголовки
if not file_exists:
writer.writeheader()
# Запишем заголовки
# writer.writerow(['', 'JetLend', 'Другие', 'Ставка', 'Выручка', 'Дата регистрации'])
# Запишем данные
for key, value in data.items():
writer.writerow({'Название Компании': key, 'JetLend': value.get('JetLend', ''),
'Другие': value.get('Другие', ''), 'Ставка': value.get('Ставка', ''),
'Выручка': value.get('Выручка', ''),
'Дата регистрации': value.get('Дата регистрации', '')})
if __name__ == '__main__':
# Пример использования функции
data = {
'НАКАТС-В02': {
'JetLend': '1 355 844 ₽ (11,2%)', 'Другие': '10 742 542 ₽ (88,8%)'
},
'НАКАТС-В01': {
'JetLend': '1 355 844 ₽ (11,2%)', 'Другие': '10 742 542 ₽ (88,8%)'
},
}
save_to_csv(data=data, file_path='output.csv')
Следующим шагом будет прописать функцию для запуска сбора всех существующих заемщиков. Сделаем это в файле get_links.py
get_links.py
# -*- coding: utf-8 -*-
import csv
import os
import time
from loguru import logger
from selenium.webdriver import Chrome
from selenium.webdriver.chrome.service import Service
from config.config import UserConfig
from config.driver_options import ChromeOptions
from src.pages.company_page import CompanyPage
from src.pages.exchange_page import ExchangePage
from src.pages.login_page import LoginPage
from src.utils.loggers import setup_logging
from src.utils.saving_to_file import save_dataframe_to_excel
def read_links_from_csv(filename) -> list:
links = []
if os.path.exists(filename):
with open(filename, 'r', newline='', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
links.append(row['Link'])
return links
def get_all_links():
setup_logging()
service = Service('drivers/chromedriver.exe')
proxy_list = []
username = UserConfig.INVEST_USERNAME
password = UserConfig.INVEST_PASSWORD
login_url = 'https://jetlend.ru/invest/login'
exchange_url = 'https://jetlend.ru/invest/v3/market/exchange'
chrome_options = ChromeOptions.get_options(
headless=False,
incognito=True,
disable_gpu=True,
disable_notifications=True,
disable_extensions=True,
disable_webdriver=True,
no_sandbox=True,
)
driver = Chrome(options=chrome_options, service=service)
ALL_LINKS = []
ALL_DATA = dict()
try:
login_page = LoginPage(driver=driver, url=login_url)
exchange_page = ExchangePage(driver=driver, url=exchange_url)
company_page = CompanyPage(driver=driver)
# Авторизация
login_page.navigate()
login_page.enter_username(username)
login_page.enter_password(password)
login_page.click_login_button()
# Проверка успешной авторизации
if login_page.is_login_successful():
logger.info("Успешная авторизация!")
else:
logger.error("Ошибка авторизации: не удалось войти в систему")
exchange_page.navigate_to_url(url=exchange_url)
exchange_page.show_100_element()
driver.implicitly_wait(2)
links = exchange_page.parse_links()
all_data = dict()
for link in links:
logger.info(f"Переход по ссылке: {link}")
company_page.url = link
company_page.navigate()
time.sleep(1) # Дайте странице время для загрузки
data = company_page.get_data_from_company()
all_data[data[0]] = data[1]
# time.sleep(1)
# company_page.click_about()
time.sleep(2)
logger.info(all_data)
save_dataframe_to_excel(all_data, 'output.xlsx')
finally:
driver.quit()
if __name__ == '__main__':
get_all_links()
Для того чтобы брать уже данные компаний я планировал использовать мультипроцессы парсинг для того чтобы собирать сразу несколькими процессами. Эту логику прописал в main_multiprocessing.py
main_multiprocessing.py
# -*- coding: utf-8 -*-
import csv
import datetime
import os
import time
from multiprocessing import Pool
from loguru import logger
from selenium.webdriver import Chrome
from selenium.webdriver.chrome.service import Service
from config.config import UserConfig
from config.driver_options import ChromeOptions
from src.pages.company_page import CompanyPage
from src.pages.exchange_page import ExchangePage
from src.pages.login_page import LoginPage
from src.utils.loggers import setup_logging
from src.utils.saving_to_file import save_to_csv
def read_links_from_csv(filename) -> list:
links = []
if os.path.exists(filename):
with open(filename, 'r', newline='', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
links.append(row['Link'])
return links
def process_links(args):
service = Service('drivers/chromedriver.exe')
links, cookies, proxy = args
data_batch = {}
chrome_options = ChromeOptions.get_options(
headless=False,
incognito=True,
disable_gpu=True,
disable_notifications=True,
disable_extensions=True,
disable_webdriver=True,
no_sandbox=True,
# proxy=proxy
)
driver = Chrome(options=chrome_options, service=service)
try:
# Добавление куки в новый драйвер
driver.get('https://jetlend.ru') # Открываем любую страницу для установки куки
for cookie in cookies:
driver.add_cookie(cookie)
page = CompanyPage(driver) # Создаем новый экземпляр класса CompanyPage
for link in links:
try:
logger.info(f"Обработка ссылки: {link}")
page.url = link
page.navigate()
time.sleep(1) # Даем странице время для загрузки
data = page.get_data_from_company()
data_batch[data[0]] = data[1] # добавляем данные в batch
# print(data_batch)
# time.sleep(2) # Задержка между запросами, чтобы избежать 429 ошибки
except Exception as ex:
logger.error(f"Ошибка при обработке ссылки {link}: {ex}")
return data_batch
except Exception as ex:
logger.error(f"Ошибка при обработке ссылок: {ex}")
return None
finally:
driver.quit()
def main(file_path, batch_size):
service = Service('drivers/chromedriver.exe')
log_file_name = "logs/app_log" + datetime.datetime.now().strftime("%Y-%m-%d_%H") + ".log"
setup_logging(log_file=log_file_name)
result_file_name = "result/result_" + datetime.datetime.now().strftime("%Y-%m-%d_%H") + ".csv"
proxy_list = [
'https://37.140.31.63:80',
'https://49.48.134.216:8080',
'https://14.177.235.17:8080',
'https://91.204.239.189:8080',
'https://122.160.30.99:80'
]
username = UserConfig.INVEST_USERNAME
password = UserConfig.INVEST_PASSWORD
login_url = 'https://jetlend.ru/invest/login'
exchange_url = 'https://jetlend.ru/invest/v3/market/exchange'
chrome_options = ChromeOptions.get_options(
headless=False,
incognito=True,
disable_gpu=True,
disable_notifications=True,
disable_extensions=True,
disable_webdriver=True,
no_sandbox=True,
)
driver = Chrome(options=chrome_options, service=service)
ALL_LINKS = []
ALL_DATA = dict()
try:
login_page = LoginPage(driver=driver, url=login_url)
exchange_page = ExchangePage(driver=driver, url=exchange_url)
# Авторизация
login_page.navigate()
login_page.enter_username(username)
login_page.enter_password(password)
login_page.click_login_button()
# Проверка успешной авторизации
if login_page.is_login_successful():
logger.info("Успешная авторизация!")
else:
logger.error("Ошибка авторизации: не удалось войти в систему")
# Сохранение куки
cookies = driver.get_cookies()
# Переход на страницу обмена
# exchange_page.navigate_to_url(url=exchange_url)
# exchange_page.show_100_element()
time.sleep(2)
# Получение всех ссылок
start_parsing_time = time.time()
# exchange_page.parse_all_links() # сохранение в CSV
end_parsing_time = time.time()
logger.info(f'Время парсинга ссылок: {end_parsing_time - start_parsing_time}')
ALL_LINKS = read_links_from_csv(f'links/{file_path}')
logger.info(f'Всего ссылок: {len(ALL_LINKS)}')
# Разделение всех ссылок на группы по 50
batch_size = int(batch_size)
batches = [ALL_LINKS[i:i + batch_size] for i in range(0, len(ALL_LINKS), batch_size)]
logger.info(f"Длина batches: {len(batches)}")
# Создание пула процессов
if len(batches) > 0:
with Pool(len(batches)) as pool:
args = [(batch, cookies, proxy_list[i % len(proxy_list)]) for i, batch in enumerate(batches)]
results = pool.map(process_links, args)
# Сбор результатов
for data_batch in results:
if data_batch:
ALL_DATA.update(data_batch)
# Сохранение данных в Excel
start_saving_to_excel = time.time()
save_to_csv(data=ALL_DATA, file_path=result_file_name)
end_saving_to_excel = time.time()
logger.info(f'Время сохранения данных в Excel: {end_saving_to_excel - start_saving_to_excel}')
finally:
driver.quit()
if __name__ == '__main__':
main()
Для того чтобы было удобно пользоваться программой написал runner.py
runner.py
# -*- coding: utf-8 -*-
import datetime
from loguru import logger
from undetected_chromedriver import Chrome
from config.config import UserConfig
from config.driver_options import ChromeOptions
from main_multiprocessing import main as MultiMain
from src.pages.exchange_page import ExchangePage
from src.pages.login_page import LoginPage
from src.utils.loggers import setup_logging
def parse_links(file_path, all_pages: bool = False, table_range: int = 2):
log_file_name = "logs/app_log" + datetime.datetime.now().strftime("%Y-%m-%d_%H") + ".log"
setup_logging(log_file=log_file_name)
username = UserConfig.INVEST_USERNAME
password = UserConfig.INVEST_PASSWORD
login_url = 'https://jetlend.ru/invest/login'
exchange_url = 'https://jetlend.ru/invest/v3/market/exchange'
chrome_options = ChromeOptions.get_options(
headless=False,
incognito=True,
disable_gpu=True,
disable_notifications=True,
disable_extensions=True,
disable_webdriver=True,
no_sandbox=True,
)
driver = Chrome(options=chrome_options)
ALL_LINKS = []
ALL_DATA = dict()
try:
login_page = LoginPage(driver=driver, url=login_url)
exchange_page = ExchangePage(driver=driver, url=exchange_url)
# Авторизация
login_page.navigate()
login_page.enter_username(username)
login_page.enter_password(password)
login_page.click_login_button()
# Проверка успешной авторизации
if login_page.is_login_successful():
logger.info("Успешная авторизация!")
else:
logger.error("Ошибка авторизации: не удалось войти в систему")
exchange_page.navigate_to_url(url=exchange_url)
exchange_page.show_100_element()
driver.implicitly_wait(2)
exchange_page.parse_all_links(
file_path=file_path,
all_pages=all_pages,
table_range=table_range
)
except Exception as er:
print(er)
finally:
driver.quit()
def main():
while True:
file_path = "links/links_" + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + ".csv"
action = input("""
Выберите действие:
1 - Запустить сбор ссылок
2 - Запустить multi parsing
3 - Завершить программу
> """)
if action == '1':
table_range: int = 2
all_pages: bool = False
question = input('Взять все ссылки?(Да/Нет) по умолчанию (нет)\n> ')
if question.lower() == 'да':
all_pages = True
elif question.lower() == 'нет':
table_range = int(input('Введите диапазон (сколько страниц по 100)?\n'))
else:
table_range = int(input('Введите диапазон (сколько страниц по 100)?\n'))
parse_links(file_path, all_pages, table_range=table_range)
elif action == '2':
file_name = input("Введите имя файла с ссылками\n> ")
batch_size = input("Введите размер партии для обработки одним браузером\n> ")
MultiMain(f"{file_name}", batch_size)
elif action == '3':
break
else:
print("Ошибка: Неверный выбор действия.")
if __name__ == "__main__":
main()

Проверяем работу
Все казалось бы просто и да все должно пройти нормально, но не тут то было
Проблемы и что я не учел
Оказывается я допустил одно упущение, у сайта была защита от большого кол-во запросов, и когда скрипт переходил по страницам таким образом делая множество запросов учитывая что заемщиков было больше 4 т. сайт блокировал большую часть запросов и мне лишь удалось собрать примерно 150 данных. И сколько бы я не пробовал мне не удалось обойти эту защиту с помощью Selenium. Я взял перерыв, и через недели 2 написал новую версию, совсем другой логикой сбора данных.
Спойлер: у сайта оказывается была api и при большем кол-во запросов блокировали запросы от акаунта на несколько часов.