Уверен многие тру-программисты и без меня это знают, но я решился опубликовать пару реализаций циклов через while, которыми я активно пользуюсь, как автоматизатор, тестировщик и разработчик ETL.
while not all(check_list)
При переносе данных из одной базы в другую, часто требуется перенести не одну таблицу, а две и более. Перенести, значит удалить данные в таблице на приемнике и затем вставить данные, взятые с источника:
-- SQL
delete from my_shema.my_table
where my_column = 'my_variable';
commit;
insert into my_shema.my_table
select *
from my_shema.my_table@src_dblink
where my_column = 'my_variable';
commit;
Дополнительно, нужно полазить по схеме и выяснить зависимости между таблицами, которые могут вызывать исключения типа ORA-02291 и ORA-02292 (это когда мы не можем или удалить данные в таблице, не удалив предварительно данные из другой таблицы, или вставить, предварительно не вставив данные в другую таблицу).
Грубо говоря нам нужно обеспечить каскадное удаление данных, а затем каскадную вставку.
Чтобы этого не делать, можно при срабатывании исключения БД отправлять DML-операцию в конец очереди и для этого можно использовать цикл while not all(check_list):
# python3.8
import cx_Oracle
# составляем чек-лист переноса данных,
# если значение в списке 0, значит данные не перенесены, обратно, если 1
etl_objects = [('MY_SCHEMA', 'MY_TABLE1'), ('MY_SCHEMA', 'MY_TABLE2'), ...]
check_list = [0 for _ in range(len(etl_objects))]
i = 0
while not all(check_list):
i = i % len(check_list) # нужно для второго и последующих циклов
try:
if not check_list[i]:
with cx_Oracle.connect(user=user,
password=password,
tns=tns,
encoding='utf-8') as db_conn:
with db_conn.cursor() as cursor:
sql_text = (f"delete from {etl_objects[i][0]}.{etl_objects[i][1]} "
"where my_column = 'my_variable'")
cursor.execute(sql_text)
cursor.execute('commit')
sql_text = (f"insert into {etl_objects[i][0]}.{etl_objects[i][1]} "
"select * "
f"from {etl_objects[i][0]}.{etl_objects[i][1]}@src_dblink "
"where my_column = 'my_variable';")
cursor.execute(sql_text)
cursor.execute('commit')
check_list[i] = 1
except Exception as e:
print(f"Сработало исключение {str(e)}: объект загрузки отправлен в конец очереди")
i += 1
Понятное дело, что формально объект загрузки не отправляется в конец очереди, но цикл while будет повторяться до тех пор, пока не останется ни одного незагруженного объекта. Также понятное дело, что если сработали другие исключения, отличные от ORA-02291 и ORA-02292, то цикл рискует стать бесконечным, но этот пример подразумевает, что в остальном ограничения источника и приемника не будут мешать загрузке.
while i < len(table_trg) and j < len(table_src)
При тестировании DDL таблиц в разных базах данных может понадобиться сравнивать их наборы полей и типов данных, которые могут отличаться. То есть семантика таблиц одинаковая, а реализация отличается. Например, обе таблицы хранят один и тот же набор основных данных, но названия полей погут отличаться, также для полей с одинаковыми названием и назначением могут отличаться типы данных или ограничения типа данных.
В oracle вывести набор полей и типов данных из двух источников можно с помощью запросов:
-- SQL
select column_name, data_type, data_length ,...,
from all_tab_columns
where owner = 'MY_SHEMA' and table_name = 'MY_TABLE';
select column_name, data_type, data_length ,...,
from all_tab_columns@src_dblink
where owner = 'MY_SHEMA' and table_name = 'MY_TABLE';
В идеале наборы и значения полей должны быть идентичны, но не всегда бывает так, особенно, когда базы данных живут своими жизнями и одна жизнь отстает в развитии от другой. Так вот, чтобы узнать в чем она отстает или в чем они расходятся между собой можно с помощью кода:
# python3.8
import cx_Oracle
with cx_Oracle.connect(user=user,
password=password,
tns=tns,
encoding='utf-8') as db_conn:
with db_conn.cursor() as cursor:
sql_text = ("select column_name, data_type, data_length "
"from all_tab_columns "
"where owner = 'MY_SHEMA' and table_name = 'MY_TABLE' "
"order by column_name")
cursor.execute(sql_text)
table_trg = cursor.fetchall()
sql_text = ("select column_name, data_type, data_length "
"from all_tab_columns@src_dblink "
"where owner = 'MY_SHEMA' and table_name = 'MY_TABLE' "
"order by column_name")
cursor.execute(sql_text)
table_src = cursor.fetchall()
i, j = 0, 0
tmp_table_trg, tmp_table_src = [], []
while i < len(table_trg) and j < len(table_src):
if table_trg[i][0] == table_src[j][0]: # сравниваем названия полей
tmp_table_trg.append(table_trg[i])
tmp_table_src.append(table_src[j])
i += 1
j += 1
elif table_trg[i][0] < table_src[j][0]:
# если название поля меньше, то в правой таблице поля с таким же названием нет
tmp_table_trg.append(table_trg[i])
tmp_table_src.append((None, None, None)) # добавляем в правую таблицу "пустую" строку
i += 1
else:
tmp_table_trg.append((None, None, None)) # добавляем в левую таблицу "пустую" строку
tmp_table_src.append(table_src[j])
j += 1
diffs = list(filter(lambda x: x[0] != x[1], zip(tmp_table_trg, tmp_table_src)))
Вот такие две реализации циклов через while я использую в своей работе.
Первый пример ( while not all() ) помогает гонять циклы по чек-листам, а второй ( while i < len(table_trg) and j < len(table_src) ), помогает выравнивать списки кортежей.
Чтобы мой код выше можно было проверить, если у вас нет возможности делать запросы к БД, можно переписать мой код на работу с таблицами csv.