Как стать автором
Обновить

Мой опыт взаимодействия с SheetsAPI от Google

Время на прочтение23 мин
Количество просмотров9.8K

Доброго времени суток, хабронавты. Поделюсь в этой статье своим опытом взаимодействия с Google Drive API, конкретнее, с SheetsAPI - API для таблиц.

Ключевые слова: Google Sheets API, Google Drive API, Python, PostgreSQL, apscheduler, googleapiclient, oauth2client, psycopg2.

Сегодня разберем интересный кейс использования Google Sheets API и Google Drive API. Задача расписана подробно и кода приведено не мало (его Вы можете найти в этом репозитории GitLab).

Если Вас интересует регистрация сервисного аккаунта Google или процесс создания и кастомизация Google-таблиц, обратитесь к этой статье;

Если Вас интересует задача отслеживания изменений на Google-диске, проматывайте на заголовок "Этапы 2-4. Узнаем об изменениях в файле."

Если Вас интересует работа с SheetsAPI в комплексе, Вам нужна часть статьи под заголовком "Этапы 2-4. Работа с Sheets API.";

Тем, кто зашел сюда, желая поработать с PostreSQL через Python, пригодится раздел "Этап 1. Подключение к PostreSQL из Python.".

Приятного прочтения!

Содержание

Не будет преувеличением сказать, что Google Drive - всемирно известный продукт всемирно известной Alphabet. Они бесплатно предоставляют пользователям облачное хранилище данных с комплектом офисных редакторов, которые, как я считаю, способны заменить MS Office и особенно удобны для совместной работы над документами в небольших группах. Лично я использую Google Drive для бэкапа личных файлов и некоторых рабочих проектов. А как насчет обратной задачи - скачивания изменений, например, из Google-таблицы и сохранения их в базу данных? Этапы решения именно этой задачи мы проделаем вместе в этой публикации.

Пара слов про Google API

Не удивительно, что создав столь сложный продукт, компания-таки поделилась с разработчиками некоторыми функциями, которые можно вызывать с помощью API. Даже документация включает переводы на разные языки и включает примеры кода на Python, Java, NodeJS, Go. PHP, C#, Ruby. Но на проверку оказалось, что сложностей там не мало.

Сегодня нам понадобится зарегистрировать хранилище, получить учетные данные для использования в коде и написать скрипт на Python, который будет вызывать функции согласно документации API для Google-таблиц - SheetsAPI - с которой, в основном, будем работать.

Скрипты могут взаимодействовать с Google Drive посредством сервисных аккаунтов Google - Google Service Accounts, которые отличаются от привычных нам аккаунтов пользователя. Если кратко, то это отдельная сущность в системе Google Drive. У нее свой способ аутентификации и авторизации посредством credentials (учетных данных) - набору ключей шифрования, и свое дисковое пространство. Авторизация во всех сервисах производится посредством OAuth 2.0.

Сервисные аккаунты - унифицированный способ взаимодействия с сервисами Google. Они нужны не только для работы с Google Drive и Google Sheets, но и Google BigQuery, Google Cloud и прочими решениями для бизнеса. Про схему регистрации такого аккаунта прошу прочесть в этой статье, которая сильно мне помогла, и автору которой я благодарен.

Постановка задачи

Уточню цель: есть таблица с данными о поставках:

Рисунок 1. Вид исходной таблицы.
Рисунок 1. Вид исходной таблицы.

В ней: порядковый номер - число, индекс заказа - число, стоимость товаров в долларах США, дата поставки - дата в российском формате даты (всегда советую обращать внимание на форматы даты и времени, т. к. это поможет избежать трудностей с хранением и интерпретацией данных в базе). Нужно взять значение курса пары RUB-USD с сайта ЦБ РФ на сегодня, скопировать таблицу с облака и перенести в БД PostreSQL, добавив колонку "стоимость в рублях". После этого нужно отслеживать изменения и периодически обновлять таблицу.

Какие еще колонки стоит добавить в БД? Т.к. номеру строки в Google-таблице доверять не стоит, потому что его заносит человек, колонку с номером оставляем как отдельное поле. Строки могут быть переставлены, поэтому индекс строки тоже сохраним. Т.к. мы для чего-то сохраняем всё в свою БД, наверное, у нас будет свой клиент, и т.к. мы отслеживаем изменения, добавим колонку updated_at с датой и временем (!) обновления. PostreSQL автоматически добавляет колонку created_at, так что это будет хорошим дополнением. Оба стандартного для PostgreSQL типа даты-времени без учета временных поясов timestamp (PostreSQL работает со стандартом времени ISO 8601).

Исходную задачу копирования данных декомпозирую на следующие этапы:

  • Создать БД и подключиться к ней из скрипта Python.

  • Создать пустую таблицу в пространстве сервисного аккаунта;

  • Скопировать нужный лист из таблицы-источник данных в новую таблицу. Впрочем, можно было сразу работать с исходником, но так не интересно, верно? На самом деле, сервисному аккаунту еще нужно выдать права на чтение / редактирование, и прежде, чем работать с "боевым" источником, разумно наладить работу скрипта на копии;

  • Выдать права на чтение новой таблицы своему пользовательскому аккаунту;

  • Периодически опрашивать Google-диск на наличие изменений в файле.

  • Если изменения есть, запустить процедуру копирования таблицы в БД.

Отмечу, что задачу выявления того, что именно изменилось, не ставлю. И нет, SheetAPI не предоставляет настолько детальную информацию, хотя через браузерный клиент Google Sheets изменения видны, а значит Google их собирает и хранит. Таких деталей в работе с Sheets API будет еще много, и о каждой поговорим подробнее далее.

Копировать буду чанками (chunks) - порциями по N строк. Так происходит работа с реальными боевыми БД, ведь количество оперативной памяти в компьютере не безгранично, а каждая строка может весить килобайты. Кроме того, такой подход помогает снизить количество коллизий при работе с БД в многопользовательском режиме, хотя на чтение, что имеет место в нашем случае, это не влияет. Однако, тема эта отдельная. Справедливой будет критика: "Да кто же хранит миллионы строк в Google-таблице!?" - это верно: по умолчанию, новая таблица содержит только 1000 строк, но ее можно расширять, а в ячейках строки может хранится, например, большое количество текста. В общем, оптимизация имеет место быть далее, а число строк, считываемых за операцию целесообразно подбирать исходя из данных, хранящихся в таблице, и если Вы намерены использовать код ниже в работе, то всегда можно протестировать скорость копирования и нагрузку на сеть, подобрать оптимальную, исходя из Ваших критериев.

Работа с ключами, паролями и т.п. требует особенного подхода. Буду хранить их в отдельной папке secret/ в формате .json. Работать с ними будет с помощью модулей os, json.

Подготовительный этап. Создание БД

Создадим БД и таблицу. Я пользуюсь бесплатным приложением JetBrains Datagrip. Конечно, сам PostgreSQL уже установлен.

Создаем пользователя от имени корневого пользователя:

psql -U postgres

Если Вы работаете на Windows, нужно запустить отдельную консоль psql. По умолчанию она ставится в C:\Program Files\PostgreSQL\14\scripts\runpsql.bat, но так же доступна через ярлык C:\ProgramData\Microsoft\Windows\Start Menu\Programs\PostgreSQL 14\(psql).lnk.

Вводим логин postres, пароль от Вашей учетной записи в системе, порт и хост по умолчанию: localhost:5432.

CREATE USER user_example WITH password '123456';
CREATE DATABASE example_db OWNER user_example;
# Возможно, потребуется выдать привелегии:
GRANT ALL PRIVILEGES ON DATABASE example_db TO <yourname>;

Выходим (exit;), заходим в DataGrip Файл -> новый проект, (+) -> добавить источник -> PostgreSQL -> заполняем поля, как показано ниже:

Рисунок 2. Проект Datagrip
Рисунок 2. Проект Datagrip

Далее вводим во встроенной консоли и выполняем:

-- Триггер на обновление

CREATE
    OR REPLACE FUNCTION trigger_set_timestamp()
    RETURNS TRIGGER AS
$$
BEGIN
    NEW.updated_at
        = NOW();
    RETURN NEW;
END;
$$
    LANGUAGE plpgsql;

-- создание таблицы. Дату из Google-таблицы храню в виде строки delivery_date: так проще
CREATE TABLE Orders
(
    id                        SERIAL                    NOT NULL PRIMARY KEY,
    table_row_index           INTEGER                   NOT NULL UNIQUE,     -- номер строки в адресе вида A1:A50
    table_row_number          INTEGER                   NOT NULL,
    order_number              INTEGER                   NOT NULL UNIQUE,
    cost_usd                  NUMERIC(12, 2)            NOT NULL,
    cost_rub                  NUMERIC(12, 2)            NOT NULL,
    delivery_date             CHAR(10)                  NOT NULL,
    created_at                TIMESTAMPTZ               NOT NULL DEFAULT NOW(),
    updated_at                TIMESTAMPTZ               NOT NULL DEFAULT NOW()
);

-- подключаем триггер
CREATE TRIGGER set_timestamp
    BEFORE UPDATE
    ON Orders
    FOR EACH ROW
EXECUTE PROCEDURE trigger_set_timestamp();

Обратите внимание, что delivery_date - дата из Google-таблицы - имеет тип строки, а денежные значения NUMERIC(12, 2). Со строкой работать намного проще, иначе приходилось бы каждый раз парсить строку с датой из облака в ISO 8601, а затем пихать ее в строку запроса. С денежными форматами тоже не просто, но я использую обычный числовой формат с двумя цифрами после запятой, однако чтобы вставить число, теперь нужно делать приведение типа:

INSERT INTO Orders(
  table_row_index,
  table_row_number,
  order_number,
  cost_rub,
  cost_usd,
  delivery_date
) VALUES (
  57, 1, 1342,
  300::NUMERIC(12, 2),   -- приведение типа
  18000::NUMERIC(12, 2), -- приведение типа
  '04.07.2022'::char(10) -- приведение типа
);

При обновлении данных из облака будет очень неудобно каждый раз проверять, существует ли данная строка уже в БД. Общеизвестным является подход, при котором создается функция, объединяющая функцию вставки insert и обновления update. Причем эта функция будет доступна при выполнении запросов из Python!

CREATE
    OR REPLACE FUNCTION upsert_orders(
        arg_table_row_index INTEGER,
        arg_table_row_number INTEGER,
        arg_order_number INTEGER,
        arg_cost_usd DOUBLE PRECISION,
        arg_cost_rub DOUBLE PRECISION,
        arg_delivery_date CHAR(10)
    )
    RETURNS VOID AS
$$
DECLARE
BEGIN
    UPDATE Orders as o SET
        table_row_number = arg_table_row_number,
        order_number = arg_order_number,
        cost_usd = arg_cost_usd,
        cost_rub = arg_cost_rub,
        delivery_date = arg_delivery_date
    WHERE table_row_index = arg_table_row_index;

    IF NOT FOUND THEN

    INSERT INTO Orders(table_row_index, table_row_number, order_number, cost_rub, cost_usd, delivery_date)
    VALUES (arg_table_row_index, arg_table_row_number, arg_order_number, arg_cost_rub, arg_cost_usd, arg_delivery_date);

    END IF;
END;
$$
    LANGUAGE plpgsql;

Выделяем все -> выполнить (зеленый треугольник на панели сверху).

БД создана, приступим к написанию скрипта.

Подготовительный этап. Конфиги

Структура проекта:

root/
└─── secret/
│    credentials.json        		# файл с ключами для сервсиного аккаунта
│    db.json                 		# файл с паролями к СУБД
│    gsheets_cached_config.json # Информация о состоянии таблицы
│
└─── script/
    │   main.py           			# бесконечный цикл периодиечкого вызова
    │
    └───lib/
    		│   config.py         	# необходимые константы
        │   ConfigLoader.py   	# Работа с конфигами
        │   CRB.py            	# Забросы к API ЦБ РФ
        │   DBConnector.py    	# Работа с БД
        │   GSheets.py        	# Работа с Google Sheets API
        

Все конфиги - одноуровневые json'ы. Для их чтения создадим класс ConfigLoader:

# script/lib/ConfigLoader.py

import json
import os

class ConfigLoader:
  def __init__(self, dir, filenames):
    self.config = {}
    for filename in filenames:
      filename_path = os.path.join(dir, filename)
      # Если требуемого файла нет, он создается и выводится предупреждение
      if not os.path.exists(filename_path):
        try:
          print(filename, 'does not exist')
          with open(filename_path, 'w'): pass
          print(filename, 'created in', dir)
        except Exception as e:
          print('Config error:', e)
          continue

      with open(filename_path, 'r') as config_file:
        try:
          # в результате получаем self.config вида:
          # {
          # 	[имя файла без расширения]: [JSON в нём]
          # }
          self.config[os.path.splitext(os.path.basename(filename))[0]] = json.load(config_file)
        except Exception as e:
          print('Config error on loading config:' , e)

Например, данные для подключения к БД можно извлечь теперь вот так:

// secret/db.json

{
  "DB_NAME": "db_example",
  "USER": "user_example",
  "PSWD": "123456",
  "HOST": "localhost"
}
# script/lib/config.py

from pprint import pprint
from .ConfigLoader import ConfigLoader

# files not to share
SECRETS_DIRECTORY = 'secret/'
CREDENTIALS_FILE = 'credentials.json'           # Google service account credentials.
                                                # Relative to SECRETS_DIRECTORY
CONFIG_FILENAME = 'gsheets_cached_config.json'  # Script creates new table on launch and writes its
                                                # params here. If the file exists, new table.
                                                # Relative to SECRETS_DIRECTORY

config = ConfigLoader(SECRETS_DIRECTORY, [
  'db.json',
]).config

if not config.get('db'):
  print('there is no db.json in', SECRETS_DIRECTORY)
  exit(1)
# keys for DB connecting
try:
  DB_NAME = config['db']['DB_NAME']
  USER = config['db']['USER']
  PSWD = config['db']['PSWD']
  HOST = config['db']['HOST']
except KeyError as ke:
  print('DB: config error. missing field', ke)
except Exception as e:
  print('DB: config error', e)
  exit(1)

print('App started with config:')
pprint(config)

Этап 1. Подключение к PostreSQL из Python

Теперь, когда собрана база и скрипт знает, как к ней подключиться, можно осуществить подключение с помощью библиотеки-обертки для PostgreSQL в Python - psycopg2.

# script/lib/DBConnector.py

class DB:
  def __init__(self):
    self.connected = False
    self.conn = None
    self.cursor = None
    pass

  def _connect(self):
    self.connected = True
    self.conn = psycopg2.connect(dbname=DB_NAME, user=USER, 
                                 password=PSWD, host=HOST)
    self.cursor = self.conn.cursor()
    self.conn.autocommit = True

    self._safely_execute("select relname from pg_class where relkind='r' and relname !~ '^(pg_|sql_)';")
    tables = map(lambda x: x[0], self.cursor.fetchall())
    if not MAIN_TABLE_NAME.lower() in tables:
      print(f'DB: Table {MAIN_TABLE_NAME} is missing in {DB_NAME}!')
      self._close()
      
def _safely_execute(self, q):
    # https://www.psycopg.org/docs/errors.html
    try:
      self.cursor.execute(q)
    except Exception as e:
      print('DB: Postgre execution error:', e)

Добавим другие полезные базовые методы:

# script/lib/DBConnector.py

  def _close(self):
    self.connected = False

    self.cursor.close()
    self.conn.close()

    self.conn = None
    self.cursor = None
  
  
  def __enter__(self):
    self._connect()
    return self    


  def __exit__(self, exc_type, exc_val, exc_tb):
    self._close()


  def connect(self):
    self._connect()


  def close(self):
    self._close()

Далее требуются методы для основных запросов:

  • чтение чанка:

# script/lib/DBConnector.py

  def get_chunk(self, from_idx, to_idx):
    if not self.connected:
      self._connect()

    q = f'''
SELECT * FROM {MAIN_TABLE_NAME}
WHERE table_row_index >= {from_idx + TRACKED_SHEET_HEADER_OFFSET + 1} AND table_row_index <= {to_idx + TRACKED_SHEET_HEADER_OFFSET};};};
'''
    self._safely_execute(q)
    return self.cursor.fetchall()
  • Обновление рублевых цен при изменении курса доллара:

# script/lib/DBConnector.py

  def update_cost_rub(self, price):
    if not self.connected:
      self._connect()

    q = f'''
UPDATE {MAIN_TABLE_NAME}
SET cost_rub = cost_usd * {price};
    '''

    self._safely_execute(q)
  • Вставка или изменение строк при обновлении данных с облака:

# script/lib/DBConnector.py

  def upsert(self, values):
    if not self.connected:
      self._connect()

    q = ''
    for value in values:
      if len(value) < FIELDS_NUM:
        if len(value) > 0:
          print('\tDB.upsert:\tmissing values:', value, '\trequired length is', FIELDS_NUM)
        continue
      q += f'SELECT upsert_orders{_gen_value_string(value)};\n'
    
    self._safely_execute(q)

В последнем методе добавим небольшую проверку формата. Для подстановки данных я не использую шаблонные строки, а генерирую строку значений из массива values с помощью функции _gen_value_string (о ней ниже). Нужно иметь ввиду, что вместо значений может быть переданы строки SQL-запросов, которые потенциально несут угрозу базе данных. В данной статье уделять внимание этому не будем.

Тут есть несколько констант, связанных с таблицей. Они описаны в script/lib/config.py

# script/lib/config.py

# размер чанка
CHUNK_SIZE = 1000
MAIN_TABLE_NAME = 'Orders' # Имя таблицы 

# Сопоставление имени нужного поля в таблице Postgres с индексом соответствующего
# значения в строках Google-таблицы.

MAIN_TABLE_FIELD_NAME__GOOGLE_SPREADSHEET_COLUMN_IDX__MAP = {
  "table_row_index": 4,
  "table_row_number": 0,
  "order_number": 1,
  "cost_usd": 2,
  "cost_rub": 5,
  "delivery_date": 3,
}

# Указываем тут, нужно ли приведение типа для данного поля
MAIN_TABLE_FIELD_NAME__MAIN_TABLE_FIELD_TYPE = {
  "table_row_index": '',
  "table_row_number": '',
  "order_number": '',
  "cost_usd": '::NUMERIC(12, 2)',
  "cost_rub": '::NUMERIC(12, 2)',
  "delivery_date": '::CHAR(10)',
}

# Порядок, в котором поля требуется вставить в SQL-запрос
DEFAULT_ORDER = (
  'table_row_index',
  'table_row_number',
  'order_number',
  'cost_usd',
  'cost_rub',
  'delivery_date',
)

TRACKED_SHEET_HEADER_OFFSET = 1                 # Столько строк от начала будет пропущено.
																								# Предполагается, что там заголовок

Словари нужны по той причине, что в функцию _gen_value_string передается iterable: кортеж или список., порядок полей в котором определяется как порядком столбцов в Google-таблице, так и нами, т.к. поля table_row_index и cost_rub отсутствуют в облаке. Функция _gen_value_string сопоставит индекс в списке значений с именем поля, чтобы вставить имя и значение поля так, чтобы они соответствовали друг другу, в строку SQL-запроса. Таким образом, становится возможным привести форматы Google-таблицы и таблицы в БД и добавить новые поля.

Возможно, для нашей задачи это излишнее усложнение, но вот пример, почему я считаю, что эта настройка нужна:

INSERT INTO Orders(
  table_row_index, table_row_number, order_number, cost_usd, cost_rub, delivery_date
) VALUES(3, 2, 1338, 250, 15000, '05.07.2022');

INSERT INTO Orders(
  table_row_index, order_number, cost_usd, cost_rub, delivery_date, table_row_number
) VALUES(3, 2, 1338, 250, 15000, '05.07.2022');

Второй запрос выдаст ошибку, потому что поля перепутаны. В определенной выше функции upsert порядок полей вообще жестко фиксирован. Забегая вперед скажу, что SheetsAPI вернет массив из 4-х элементов - это поля table_row_number, order_number, cost_usd, delivery_date - именно в таком порядке. С точки зрения оптимизации кода целесообразно вставить дополнительные вычисляемые значения в конец массива, что при такой реализации легко сделать. В функцию _gen_value_string теперь можно передать список значений [<table_row_number>, <order_number>, <cost_usd>, <delivery_date>, <table_row_index>, <cost_rub>] - индексы полей соответствуют тем, что описаны в словаре MAIN_TABLE_FIELD_NAME__GOOGLE_SPREADSHEET_COLUMN_IDX__MAP.

Теперь функция _gen_value_string сгенерирует SQL-запрос с правильной расстановкой полей, соответствующей аргументу order:

# script/lib/DBConnector.py

def _gen_value_string(row, order = DEFAULT_ORDER):
  result = '('
  last_field = order[-1]
  for field in order:
    if field == 'delivery_date':
      result += '\''
    result += str(row[MAIN_TABLE_FIELD_NAME__GOOGLE_SPREADSHEET_COLUMN_IDX__MAP[field]])
    if field == 'delivery_date':
      result += '\''
    result += MAIN_TABLE_FIELD_NAME__MAIN_TABLE_FIELD_TYPE[field]
    if field != last_field:
      result += ', '
  return result + ')'

Этап 2. Обновление курса

Интересного тут мало. Идем в XML-API ЦБ РФ и изымаем нужные цифры. Тэги, по которым ищем нужные значения, вынесены в конфиг. Тут парсинг XML простой, поэтому пользуемся встроенными в python методами работы со строками. Для сложных XML документов рекомендую модуль BeautifulSoup. API отдает данные в таком виде (посмотреть пример):

<CharCode>USD</CharCode>
<Name>Доллар США</Name>
<Value>30,9436</Value>
# script/lib/CRB.py

import httplib2
from locale import atof
from datetime import datetime

from .config import VALUTE_CHAR_CODE, VALUTE_INFO_OPEN_TAG, VALUTE_INFO_VALUE_OPEN_TAG, \
  VALUTE_INFO_VALUE_CLOSE_TAG, VALUTE_INFO_NOMINAL_OPEN_TAG, VALUTE_INFO_NOMINAL_CLOSE_TAG, \
  REQUEST_TIMEOUT

class CRB:
  def __init__(self):
    self.client = httplib2.Http(timeout=REQUEST_TIMEOUT)
    self.currency_rate = 0
    self.fetch_currency_rate()

  def fetch_currency_rate(self):
    try:
      cbr_response, cbr_response_s = self.client.request(
        f'http://www.cbr.ru/scripts/XML_daily.asp?date_req={datetime.now().strftime("%d/%m/%Y")}', 'GET',
      )
      cbr_response_s = str(cbr_response_s)

      if cbr_response.get('status') != '200':
        print('Error fetching currency from http://www.cbr.ru/ :\n\t', cbr_response_s)

      try:
        start_charcode_idx = cbr_response_s.index(VALUTE_INFO_OPEN_TAG + VALUTE_CHAR_CODE) + len(VALUTE_INFO_OPEN_TAG + VALUTE_CHAR_CODE)
        start_value_idx = cbr_response_s.index(VALUTE_INFO_VALUE_OPEN_TAG, start_charcode_idx) +  len(VALUTE_INFO_VALUE_OPEN_TAG)
        end_value_idx = cbr_response_s.index(VALUTE_INFO_VALUE_CLOSE_TAG, start_value_idx)
        start_nominal_idx = cbr_response_s.index(VALUTE_INFO_NOMINAL_OPEN_TAG) + len(VALUTE_INFO_NOMINAL_OPEN_TAG)
        end_nominal_idx = cbr_response_s.index(VALUTE_INFO_NOMINAL_CLOSE_TAG, start_nominal_idx)
        currency_rate = cbr_response_s[start_value_idx: end_value_idx]
        nominal = cbr_response_s[start_nominal_idx: end_nominal_idx]
        self.currency_rate = atof(currency_rate.replace(',','.')) / atof(nominal.replace(',','.'))
      except ValueError:
        print(f'Error: unable to fetch {VALUTE_CHAR_CODE} currency rate. The CBR API has changed')


    except Exception as e:
      print(f'Error: unable to fetch {VALUTE_CHAR_CODE} currency rate:', e)
    finally:
      return self.currency_rate
    

Приступим же скорее к самому интересному!

Этапы 2-4. Работа с Sheets API

Понадобятся следующие данные об аккаунтах и таблицах:

# script/lib/config.py

CREDENTIALS_FILE = 'secret/credentials.json'    # Google service account credentials
CONFIG_FILENAME = 'gsheets_cached_config.json'  # Информация о состоянии таблицы на диске
                                                # Если этого файла нет, скрипт создаст новую таблицу
EMAIL_ADDRS = ['example1@gmail.com', 'example2@gmail.com']
                                                # gmail'ы, которым будет предоставлен доступ к таблице-копии
SOURCE_SPREADSHEET = '...'                      # ID исходной таблицы
TRACKED_SHEET_NAME = 'SourceSheet'              # Имя листа с копией
TRACKED_SHEET_HEADER_OFFSET = 1                 # Столько строк от начала будет пропущено.
																								# Предполагается, что там заголовок
START_COLUMN = 'A'                              # Диапазон интересующих значений [от включая]:[до включая]
END_COLUMN = 'D'

Если Вы ранее переходили по ссылке на статью, посвященной регистрации сервисного аккаунта и заметкам по работе с SheetsAPI, то уже знаете, откуда брать SpreadSheetID, если же нет, то его можно посмотреть в адресной строке браузера:

def gen_ss_link(ssID):
  return 'https://docs.google.com/spreadsheets/d/' + ssID

Все вызовы Google Drive API и Sheets API возможны лишь при условии, что Вы авторизованы. Для этого мы ранее получили файл с ключами - credentials.json.

Авторизация через credentials и чтение конфига созданной скриптом Google-таблицы, если он запустился впервые:

# script/lib/GSheets.py

import httplib2 
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
import os
import json

from .ConfigLoader import ConfigLoader

credentials = ServiceAccountCredentials.from_json_keyfile_name(
  os.path.join(SECRETS_DIRECTORY, CREDENTIALS_FILE), [
    'https://www.googleapis.com/auth/spreadsheets',
    'https://www.googleapis.com/auth/drive',
])

class GSheets:
  def __init__(self):
    httpAuth = credentials.authorize(httplib2.Http(timeout=REQUEST_TIMEOUT)) # Авторизуемся в системе

    self.drive_service = build('drive', 'v3', http = httpAuth)
    self.sheets_service = build('sheets', 'v4', http = httpAuth) # Выбираем работу с таблицами и 4 версию API
    
    config = ConfigLoader(SECRETS_DIRECTORY, [CONFIG_FILENAME]).config.get(
      os.path.splitext(CONFIG_FILENAME)[0]
    )
    if config == None:
      config = {}

    self.spreadsheet_id = config.get('spreadsheetId')
    self.main_sheet_id = config.get('mainSheetId')
    self.start_page_token = config.get('startPageToken')
    self.page_token = config.get('startPageToken')
    self.last_update = config.get('lastUpdate')

    if not self.spreadsheet_id or not self.main_sheet_id or not self.start_page_token:
      print(f'GSheets: A config {CONFIG_FILENAME} doesn\'t match a following format requirement')
      pprint({
        "spreadsheetId": 'spreadsheet id numbers as a string',
        "mainSheetId":   'main table sheet id numbers as a string',
        "start_page_token":   'changes tracking token',
      })
      self._create()

Если чтение не удалось, экземпляр класса вызовет метод _create(). В этом случае нужно создать таблицу и выдать права нам, пользователям, на чтение и редактирование.

Но сначала расскажу о другом методе нашего класса. Для некоторых функций API в библиотеке googleapiclient, на которой строится весь код, есть отдельные методы, однако иные действия в API выполняются только через метод batch, который использует некий универсальный формат обращения к серверу Google и может выполнять все функции. Оберну его для удобства:

# script/lib/GSheets.py

  def batch(self, requests: list):
    try:
      return self.sheets_service.spreadsheets().batchUpdate(
        spreadsheetId=self.spreadsheet_id,
        body={
          'requests': requests
        }).execute()
    except Exception as e:
      print(f'GSheets error: unable to fetch {self.spreadsheet_id} table with Google Sheets API:', e)
      return {}

Вернемся к созданию таблицы и правам доступа. Для этих задач воспользуемся воспользуемся методами sheets_service.spreadsheets().create(...) и drive_service.permissions().create(...). Первый из них вернет SpreadsheetID. Сохраним его в поле self.spreadsheet_id.

# script/lib/GSheets.py
  
  def _create(self):
    # init
    spreadsheet = self.sheets_service.spreadsheets().create(body = {
      'properties': {'title': 'Копия источника', 'locale': 'ru_RU'},
    }).execute()

    self.spreadsheet_id = spreadsheet['spreadsheetId']
    print('GSheets: A spreadsheet created!\n>', gen_ss_link(self.spreadsheet_id))

    # об этом позже. понадобится для отслеживания изменений.
    response = self.drive_service.changes().getStartPageToken().execute()
    self.start_page_token = response.get("startPageToken")
    self.page_token = response.get("startPageToken")
    
    for email in EMAIL_ADDRS:
      self.drive_service.permissions().create(
        fileId = self.spreadsheet_id,
        body = {
          'type': 'user',
          'role': 'writer',
          'emailAddress': email,
        },
        fields = 'id'
      ).execute()

    self._init_source_spreadsheet()
    self._dump_config()

Метод _init_source_spreadsheet() создаст новую таблицу и даст ей имя с помощью двух обращений к API: sheets_service.spreadsheets().sheets().copyTo(...) и ..., а вот тут понадобится batch():

# script/lib/GSheets.py

  def _init_source_spreadsheet(self):
    response = self.sheets_service.spreadsheets().sheets().copyTo(
      spreadsheetId = SOURCE_SPREADSHEET,
      sheetId = 0,
      body = {
        'destinationSpreadsheetId': self.spreadsheet_id,
    }).execute()

    self.main_sheet_id = response.get("sheetId")
    self.renameSheet(response.get("sheetId"), TRACKED_SHEET_NAME)
    
  def renameSheet(self, sheetId, newName):
    return self.batch({
      "updateSheetProperties": {
        "properties": {
          "sheetId": sheetId,
          "title": newName,
        },
        "fields": "title",
      }
    })
 

После создания экземпляра класса на почту придет уведомление о получении доступа к некой таблице, автором которой является ...@... .iam.gserviceaccount.com. Перейдем по ссылке в письме и увидим:

Рисунок 3. Копия исходной таблице в пространстве сервисного аккаунт
Рисунок 3. Копия исходной таблице в пространстве сервисного аккаунт

Остается сохранить данные о таблице на диск, чтобы не пересоздавать таблицу каждый раз:

# script/lib/GSheets.py
  
  def _dump_config(self):
    with open(os.path.join(SECRETS_DIRECTORY, CONFIG_FILENAME), 'w') as config_file:
      json.dump({
        "spreadsheetId": self.spreadsheet_id,
        "mainSheetId":   self.main_sheet_id,
        "startPageToken": self.start_page_token,
        "lastUpdate": self.last_update,
      }, config_file)

Научим скрипт читать строки из таблицы Google. В этом нам поможет метод sheets_service.spreadsheets().values().get(...). Новая для нас абстракция values, использующаяся в SheetsAPI для выполнения нескольких действий за один запрос. В качестве одного из аргументов тела запроса требуется указать экземпляр другой концепции - range (диапазон) вида A1:D4 - мы используем его для указания на фрагмент таблицы. Для чтения нескольких диапазонов используйте values.batchGet.

# script/lib/GSheets.py

  def get_chunk(self, start_idx):
    '''
    returns tuple:
    0: idx to continue iteration
    1: start row_idx in a google spreadsheet
    2: end row_idx in a google spreadsheets
    3: array of rows with values from START_COLUMN up to END_COLUMN
    or NONE, if chunk is empty
    '''
    range_from_to = (start_idx + TRACKED_SHEET_HEADER_OFFSET + 1,
                     start_idx + CHUNK_SIZE + TRACKED_SHEET_HEADER_OFFSET)
    try:
      response = self.sheets_service.spreadsheets().values().get(
        spreadsheetId=self.spreadsheet_id,
        range=f'{TRACKED_SHEET_NAME}!{START_COLUMN}{range_from_to[0]}:{END_COLUMN}{range_from_to[1]}'
      ).execute()
      return (start_idx + CHUNK_SIZE, *range_from_to, response.get("values"))

    except Exception as e:
      print(f'GSheets error: unable to fetch {self.spreadsheet_id} table with Google Sheets API:', e)
      return (start_idx + CHUNK_SIZE, *range_from_to, {})

Этапы 2-4. Узнаем об изменениях в файле

Последняя нерешенная задача - отслеживание изменений.

Казус состоит в том, что SheetsAPI не предоставляет возможностей узнать об изменениях в документе, несмотря на то, что функция истории изменений, причем детальной и с возможностью восстановления, в Google-таблицах есть.

Рисунок 4. История версий в веб-клиенте GoogleSheets.
Рисунок 4. История версий в веб-клиенте GoogleSheets.

После изучения форумов и документации, стало ясно, что узнать, было изменение или и нет, а так же кто вносил изменения, можно только через Google Drive API. Какие именно строки были затронуты, узнать нельзя. Тут поможет метод drive_service.changes().list(...). Особенность состоит в том, что изменения файла на Google диске представлены для нас, пользователей API, как временной ряд, где вместо временной метки, мы должны передать page_token. Эта система напоминает классическую пагинацию. Однако, при обращении к API мы можем получить сообщения двух видов: либо с полем newStartPageToken, либо с полем nextPageToken.

# script/lib/GSheets.py

  def check_changes(self):
    self.page_token = self.start_page_token
    were_changes_in_a_file = False
    start_page_token = self.start_page_token

    try:
      while self.page_token is not None:

        response = self.drive_service.changes().list(
          pageToken=self.page_token,
          spaces='drive',
        ).execute()
        
        for change in response.get('changes'):
          file_id =  change.get("fileId")
          # проверка, что данный документ - интересующая нас
          # таблица, а не другой файл 
          if file_id == self.spreadsheet_id:
            were_changes_in_a_file = True
        if 'newStartPageToken' in response:
          self.start_page_token = response.get('newStartPageToken')
        self.page_token = response.get('nextPageToken')

      # Сохраняем факт изменения 
      if start_page_token != self.start_page_token:
        self.last_update = f'{datetime.now():%X %d.%m.%Y}'
        self._dump_config()
    
    except Exception as e:
      print(f'GSheets error: unable to fetch {self.spreadsheet_id} table with Google Sheets API:', e)
    finally:
      return were_changes_in_a_file

Чтобы прояснить как работает этот код, обратимся к документации метода drive_service.changes().list(...).

Request > parameters

pageToken - string - The token for continuing a previous list request on the next page. This should be set to the value of 'nextPageToken' from the previous response or to the response from the getStartPageToken method.

Response

nextPageToken - string - The page token for the next page of changes. This will be absent if the end of the changes list has been reached. If the token is rejected for any reason, it should be discarded, and pagination should be restarted from the first page of results.

newStartPageToken - string - The starting page token for future changes. This will be present only if the end of the current changes list has been reached

Итак, тут сказано, что сущность изменения подразделяется на страницы. Каждое изменение может содержать несколько страниц. Мы отправляем некий pageToken и получаем nextPageToken, при отправке которого получим следующую страницу. В ответе на запрос о последней странице текущего изменения поле nextPageToken будет отсутствовать, и эта строчка кода

self.page_token = response.get('nextPageToken')

сохранит в self.page_token значение None - завершаем итерацию. Эта итерация происходит лишь по одной сущности изменения. Как узнать, что появилось еще одно изменение? Если появится новое изменение, очередной запрос вернет нам ответ с полем newStartPageToken. Чтобы не итерировать всю многостраничную книгу изменений по нескольку раз, необходимо сохранять последний newStartPageToken на диск. Сейчас мы храним следующую информацию:

{
  "spreadsheetId": "2Ds5A7z9AGs17B0xXBNCn_nSbE6zUitLvtLzrW3B05-w",
  "mainSheetId": 1886489919,
  "startPageToken": "129",
  "lastUpdate": "02:25:10 12.07.2022"
}

Кстати, откуда брать pageToken для самого первого запроса? Надо предварительно получить его с помощью метода drive_service.changes().getStartPageToken():

drive_service.changes().getStartPageToken().execute()

Этот вызов мы делали в методе _create() класса GSheets, если не обнаруживали токен в файле на диске.

Нас абсолютно не интересует, кто внес изменения. Для нас Google-таблица - единственный достоверный источник. Возможно, было бы полезно знать, какие именно строки были затронуты - это помогло бы оптимизировать чтение через сеть. Но так как API не предоставляет нам такой информации, мы просто пролистываем все страницы в изменении. Давайте взглянем на структуру ответа, несущую непосредственную информацию.

Согласно документации эта структура называется ChangesResource и выглядит так (nextPageToken, newStartPageToken опущены):

{
  "kind": "drive#change",
  "type": string,                    deprecated
  "changeType": string,              file или drive
  "time": datetime,
  "removed": boolean,
  "fileId": string,
  "file": files Resource,
  "teamDriveId": string,             deprecated
  "driveId": string,
  "teamDrive": teamdrives Resource,  deprecated
  "drive": drives Resource
}

На деле мы обычное получаем лишь некоторые поля. Часть из них являются вложенными JSON'ами. Выполним сначала это:

self.drive_service.changes().list(pageToken=self.page_token,
                                                   spaces='drive').execute()

результат - структура ChangesResource:

{'changes': [{'changeType': 'file',
              'file': {'id': '1LuOz28wtbP5okLJ6nNwzuS03zynGYvwVw_9uaLF6n_U',
                       'kind': 'drive#file',
                       'mimeType': 'application/vnd.google-apps.spreadsheet',
                       'name': 'Копия источника'},
              'fileId': '1LuOz28wtbP5okLJ6nNwzuS03zynGYvwVw_9uaLF6n_U',
              'kind': 'drive#change',
              'removed': False,
              'time': '2022-07-06T17:26:23.907Z',
              'type': 'file'}],
 'kind': 'drive#changeList',
 'newStartPageToken': '54'}

Не будем ничего менять в таблице, и выполним то де обращение к API еще раз:

{
  'changes': [],
  'kind': 'drive#changeList',
  'newStartPageToken': '54'
}

Поменяем значение какой-нибудь ячейки:

{'changes': [{'changeType': 'file',
              'file': {'id': '1LuOz28wtbP5okLJ6nNwzuS03zynGYvwVw_9uaLF6n_U',
                       'kind': 'drive#file',
                       'mimeType': 'application/vnd.google-apps.spreadsheet',
                       'name': 'Копия источника'},
              'fileId': '1LuOz28wtbP5okLJ6nNwzuS03zynGYvwVw_9uaLF6n_U',
              'kind': 'drive#change',
              'removed': False,
              'time': '2022-07-06T17:34:19.641Z',
              'type': 'file'}],
 'kind': 'drive#changeList',
 'newStartPageToken': '56'}

newStartPageToken говорит нам о том, что изменение включало в себя лишь одно действие. При следующем запросе надо указать {... "pageToken": "56" }.

Этап 5. Конец Пути

Вот, в принципе, и всё, что нужно нам для решения задачи.

Напишем main.py скрипт, который создаст экземпляр GSheets, который, в свою очередь, подключится к SheetsAPI и Google Drive API, экземпляр класса CRB, который заберет текущее значение курса доллара и экземпляр класса DBConnector, который будет отвечать за взаимодействие с СУБД.

Далее с помощью библиотеки apscheduler настроим периодический вызов корутин. Добавим пару новых значений в конфиг:

# script/lib/config.py

REQUEST_TIMEOUT = 200      # в секундах
CRB_POLLING_INTERVAL = 1   # в часах
GS_POLLING_INTERVAL =  15  # Интервал опроса Google spreadsheet, в минутах
# script/main.py

from lib.DBConnector import DB
from lib.CRB import CRB
from lib.GSheets import GSheets
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler

from lib.config import CRB_POLLING_INTERVAL, GS_POLLING_INTERVAL


if __name__ == '__main__':
  scheduler = AsyncIOScheduler()
  sheet = GSheets()
  print(sheet, '\n')
  crb = CRB()
  with DB() as db:
    db.update_cost_rub(crb.currency_rate)
    print('USD currency rate:', crb.currency_rate, '\n')


  async def sheet_check_job():
    print('[sheet_check_job] checking the spreadsheet')
    if not sheet.check_changes():
      print('[sheet_check_job] no changes')
      return

    with DB() as db:
      print('[sheet_check_job] committing changes')
      rows = ['']
      idx = 0
      while True:
        idx, ss_start_row_idx, _, rows = sheet.get_chunk(idx)
        if not rows:
          break
        for i, row in enumerate(rows):
          # rows like [] or rows with spaces
          if (len(row) < 4):
            continue
          row.extend((ss_start_row_idx + i, 0))
        db.upsert(rows)
      print('[sheet_check_job] done!')

  async def currency_rate_check_job():
    with DB() as db:
      db.update_cost_rub(crb.fetch_currency_rate())
      print('[currency_rate_check_job] the USD-RUB currency rate has been updated:', crb.currency_rate)
      

  scheduler.add_job(currency_rate_check_job, "interval", hours=CRB_POLLING_INTERVAL)
  scheduler.add_job(sheet_check_job, "interval", minutes=GS_POLLING_INTERVAL)
  scheduler.start()

  print('Press Ctrl+C to exit\n')

  try:
    asyncio.get_event_loop().run_forever()
  except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()
    asyncio.get_event_loop().stop()

Как видно, какой-то особой магии тут нет, особенно если вы знакомы с корутинами из Python 3.5. Единственное, нужно настроить, завершение скрипта. Для полного освобождения ресурсов, надо остановить планировщик и завершить eventloop. Я делаю это при нажатии Ctrl + C. Между Ctrl + C на Windows и Linux есть различие, поэтому обработчик настроен на 2 исключения, вызванные как сигналом SIGINT, так и сигналом SIGBREAK. В данной реализации можно столкнуться еще с тем, что планировщик одновременно может выполнять только одну задачу несмотря на наличие eventloop, поэтому, если временные интервалы currency_rate_check_job и sheet_check_job пересекутся, будет выполнена только одна задача, а вторая будет отложена на еще один свой интервал.

Заключение

Респект всем, кто прочел до конца.

Google Drive - очень классный продукт, а его API позволяет автоматизировать множество рутинных задач, причем бесплатно, что немаловажно. Можно настроить синхронизацию файлов на разных компьютерах через облако, запустить сбор данных с разных источников в одну таблицу, использовать облако как ядро ETL-системы, настроить рассылку важной информации через Телеграм. Уверен, Вы найдете еще массу применений Google-диску для автоматизации самых разнообразных офисных задач.

Напомню, что код доступен всем желающим в этом репозитории на GitLab.

Обязательно оставляйте отзывы! Если будет много положительных, выпущу продолжение, где будем писать веб-клиент под это безобразие с бэкендом на Python Django и фронтендом на JS React.

До связи!

Теги:
Хабы:
Всего голосов 5: ↑3 и ↓2+1
Комментарии0

Публикации

Истории

Работа

Python разработчик
136 вакансий
Data Scientist
60 вакансий

Ближайшие события