· 7 months ago · Apr 20, 2025, 03:35 PM
1# clickhouse_connector.py
2
3import logging
4from urllib.parse import quote_plus
5import pandas as pd
6from math import ceil # Может понадобиться для отладки или логов
7
8try:
9 from logger import logger
10except ImportError:
11 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
12 logger = logging.getLogger(__name__)
13
14try:
15 from sqldbclient import SqlExecutor, SqlExecutorConf
16except ImportError:
17 logger.error("Библиотека sqldbclient не найдена. Установите ее: pip install sqldbclient")
18 # Заглушки...
19 class SqlExecutorConf:
20 def set(self, key, value): return self
21 class Builder:
22 def config(self, conf): return self
23 def get_or_create(self): return None
24 class SqlExecutor:
25 builder = Builder()
26 def execute(self, query, limit=None): return None # Добавим limit для совместимости заглушки
27
28
29class ClickHouseConnector:
30 """
31 Класс для управления подключением к ClickHouse
32 с использованием sqldbclient. Включает пагинацию.
33 """
34
35 def __init__(self, config):
36 """
37 Инициализация подключения к ClickHouse через SqlExecutor.
38 Args:
39 config (dict): Словарь с параметрами подключения
40 (ожидаются ключи 'host', 'port', 'username', 'password', 'database').
41 """
42 self.config = config
43 self.executor = None
44 self._initialize_executor()
45
46 def _initialize_executor(self):
47 """Инициализирует SqlExecutor на основе конфигурации."""
48 # ... (код инициализации остается без изменений) ...
49 try:
50 host = self.config['host']
51 port = self.config.get('port', 8123) # Порт по умолчанию для HTTP
52 username = self.config['username']
53 password = self.config['password']
54 database = self.config.get('database') # Получаем базу данных из конфига
55
56 quoted_password = quote_plus(password)
57
58 # Формируем URL. Включаем базу данных, если она указана.
59 db_path = f"/{database}" if database else ""
60 engine_url = f'clickhouse+http://{username}:{quoted_password}@{host}:{port}{db_path}'
61
62 logger.info(f"Инициализация SqlExecutor для ClickHouse: clickhouse+http://{username}:***@{host}:{port}{db_path}")
63
64 conf = SqlExecutorConf() \
65 .set('engine_options', engine_url) \
66 .set('history_db_name', 'ch_executor_history.db')
67
68 self.executor = SqlExecutor.builder.config(conf).get_or_create()
69 logger.info("SqlExecutor для ClickHouse успешно инициализирован.")
70
71 except KeyError as e:
72 logger.error(f"Ошибка конфигурации ClickHouse: отсутствует обязательный ключ {e}")
73 self.executor = None
74 except Exception as e:
75 logger.error(f"Ошибка при инициализации SqlExecutor для ClickHouse: {e}", exc_info=True)
76 self.executor = None
77
78
79 def connect(self):
80 """ Проверка соединения с ClickHouse. """
81 # ... (код connect остается без изменений) ...
82 if self.executor is None:
83 logger.error("SqlExecutor не был инициализирован из-за ошибки конфигурации.")
84 return False
85 try:
86 result_df = self.executor.execute('SELECT 1')
87 if not result_df.empty and result_df.iloc[0, 0] == 1:
88 logger.info("Успешное подключение к ClickHouse (проверено через SqlExecutor)!")
89 return True
90 else:
91 logger.error(f"Проверка подключения к ClickHouse не удалась. Результат: {result_df}")
92 return False
93 except Exception as e:
94 logger.error(f"Ошибка при проверке подключения к ClickHouse через SqlExecutor: {e}")
95 return False
96
97 # --- НОВЫЙ МЕТОД С ПАГИНАЦИЕЙ ---
98 def extract_data_paginated(self, base_query, order_by_column, chunk_size=10000, settings=None):
99 """
100 Извлекает данные из ClickHouse частями (пагинация) с использованием генератора.
101
102 Args:
103 base_query (str): Базовый SQL-запрос БЕЗ 'ORDER BY', 'LIMIT', 'OFFSET'.
104 Например: "SELECT * FROM my_table WHERE date = today()"
105 order_by_column (str): Имя колонки или выражения для 'ORDER BY'.
106 Крайне важно для стабильной пагинации.
107 Например: 'event_date, unique_id' или просто 'your_date_column'.
108 chunk_size (int): Количество строк для извлечения за один запрос (размер чанка).
109 Должно быть <= лимита sqldbclient (обычно 10000).
110 settings (dict, optional): Дополнительные настройки ClickHouse ('SETTINGS ...').
111
112 Yields:
113 pd.DataFrame: DataFrame с очередной порцией данных (чанком).
114
115 Raises:
116 Exception: Если SqlExecutor не инициализирован или произошла ошибка при выполнении запроса.
117 ValueError: Если не указан order_by_column или base_query.
118 """
119 if self.executor is None:
120 logger.error("Соединение с ClickHouse (SqlExecutor) не установлено.")
121 raise Exception("Соединение с ClickHouse (SqlExecutor) не установлено.")
122 if not base_query:
123 raise ValueError("Необходимо указать базовый SQL-запрос ('base_query').")
124 if not order_by_column:
125 raise ValueError("Необходимо указать колонку для сортировки ('order_by_column') для пагинации.")
126 if chunk_size <= 0:
127 raise ValueError("'chunk_size' должен быть положительным числом.")
128 # Добавим предупреждение, если chunk_size подозрительно большой
129 if chunk_size > 10000:
130 logger.warning(f"Задан chunk_size={chunk_size}, что больше стандартного лимита sqldbclient (10000). "
131 f"Убедитесь, что лимит sqldbclient был изменен, иначе запросы могут обрезаться.")
132
133
134 offset = 0
135 total_fetched = 0
136
137 # Формируем часть запроса с SETTINGS
138 settings_str = ""
139 if settings:
140 settings_parts = []
141 for key, value in settings.items():
142 if isinstance(value, str):
143 settings_parts.append(f"{key} = '{value}'")
144 else:
145 settings_parts.append(f"{key} = {value}")
146 if settings_parts:
147 settings_str = " SETTINGS " + ", ".join(settings_parts)
148
149 logger.info(f"Начало пагинации: chunk_size={chunk_size}, сортировка по '{order_by_column}'.")
150 logger.debug(f"Базовый запрос: {base_query}")
151
152 while True:
153 # Формируем полный SQL-запрос для текущей страницы
154 paginated_query = f"{base_query} ORDER BY {order_by_column} LIMIT {chunk_size} OFFSET {offset}{settings_str}"
155 logger.debug(f"Запрос чанка (offset={offset}): {paginated_query[:500]}{'...' if len(paginated_query) > 500 else ''}")
156
157 try:
158 # Выполняем запрос. НЕ передаем limit в execute, т.к. он уже в SQL!
159 chunk_df = self.executor.execute(paginated_query)
160
161 # Проверка и конвертация типа (на всякий случай)
162 if not isinstance(chunk_df, pd.DataFrame):
163 logger.warning(f"Executor вернул не DataFrame ({type(chunk_df)}) для offset={offset}. Конвертация...")
164 try:
165 chunk_df = pd.DataFrame(chunk_df)
166 except Exception as conv_err:
167 logger.error(f"Не удалось преобразовать чанк в DataFrame: {conv_err}. Прерывание.")
168 break # Прерываем пагинацию
169
170 num_fetched_in_chunk = len(chunk_df)
171 logger.debug(f"Получено строк в чанке (offset={offset}): {num_fetched_in_chunk}")
172
173 if num_fetched_in_chunk == 0:
174 logger.info("Получен пустой чанк. Данные закончились.")
175 break # Больше данных нет, выходим из цикла while
176
177 # Возвращаем (yield) полученный чанк данных
178 yield chunk_df
179
180 # Готовимся к следующей итерации
181 total_fetched += num_fetched_in_chunk
182 offset += chunk_size # Смещаемся на размер запрошенного чанка
183
184 # Проверяем, был ли это последний чанк
185 if num_fetched_in_chunk < chunk_size:
186 logger.info(f"Получено {num_fetched_in_chunk} < {chunk_size} строк. Достигнут конец данных.")
187 break # Это был последний чанк
188
189 except Exception as e:
190 logger.error(f"Ошибка при извлечении чанка данных (offset={offset}): {e}", exc_info=True)
191 # Перевыбрасываем ошибку, чтобы ETL пайплайн мог ее обработать
192 raise e
193
194 logger.info(f"Пагинация завершена. Всего извлечено строк: {total_fetched}.")
195
196
197 # --- СТАРЫЙ МЕТОД (можно оставить для совместимости или малых запросов) ---
198 def extract_data(self, table_name=None, query=None, limit=10000):
199 """
200 Извлечение данных из ClickHouse ОДНИМ ЗАПРОСОМ с явным ЛИМИТОМ.
201 ПРЕДУПРЕЖДЕНИЕ: Может быть обрезан лимитом sqldbclient, если limit > 10000.
202 Рекомендуется использовать extract_data_paginated для больших объемов.
203
204 Args:
205 table_name (str): Имя таблицы (если query=None).
206 query (str, optional): SQL запрос.
207 limit (int): SQL LIMIT для добавления к запросу. По умолчанию 10000.
208
209 Returns:
210 pd.DataFrame: DataFrame с данными (или пустой).
211 """
212 if self.executor is None:
213 logger.error("Соединение с ClickHouse (SqlExecutor) не установлено.")
214 raise Exception("Соединение с ClickHouse (SqlExecutor) не установлено.")
215
216 if query is None and table_name:
217 base_query = f"SELECT * FROM {table_name}"
218 logger.info(f'Используется стандартный запрос для таблицы: {table_name}')
219 elif query:
220 base_query = query
221 else:
222 raise ValueError("Необходимо указать 'table_name' или 'query'.")
223
224 # Добавляем LIMIT к запросу
225 final_query = f"{base_query} LIMIT {int(limit)}"
226 logger.info(f"Выполнение ОДИНОЧНОГО запроса (с LIMIT {limit}): {final_query[:200]}{'...' if len(final_query) > 200 else ''}")
227
228 try:
229 # Попытка выполнить запрос. sqldbclient МОЖЕТ все еще ругаться или обрезать,
230 # если его внутренний лимит меньше указанного limit.
231 data_df = self.executor.execute(final_query) # НЕ передаем limit как аргумент execute
232
233 logger.info(f"Извлечено {len(data_df)} строк (одиночный запрос).")
234
235 if not isinstance(data_df, pd.DataFrame):
236 logger.warning(f"SqlExecutor вернул не DataFrame, а {type(data_df)}. Преобразование...")
237 try:
238 data_df = pd.DataFrame(data_df)
239 except Exception:
240 logger.error("Не удалось преобразовать результат в DataFrame.")
241 return pd.DataFrame()
242
243 return data_df
244 except Exception as e:
245 logger.error(f"Ошибка при извлечении данных (одиночный запрос): {e}", exc_info=True)
246 return pd.DataFrame()
247
248 def get_table_schema(self, table_name):
249 """ Получает схему таблицы. """
250 # ... (код get_table_schema остается без изменений) ...
251 if self.executor is None:
252 logger.error("Соединение с ClickHouse (SqlExecutor) не установлено.")
253 raise Exception("Соединение с ClickHouse (SqlExecutor) не установлено.")
254 try:
255 query = f"DESCRIBE TABLE {table_name}"
256 logger.info(f"Получение схемы таблицы: {query}")
257 schema_df = self.executor.execute(query)
258 if not isinstance(schema_df, pd.DataFrame):
259 logger.warning(f"SqlExecutor вернул не DataFrame для схемы, а {type(schema_df)}. Преобразование...")
260 try: schema_df = pd.DataFrame(schema_df)
261 except Exception:
262 logger.error("Не удалось преобразовать схему в DataFrame.")
263 return pd.DataFrame()
264 logger.info(f"Схема для таблицы {table_name} получена.")
265 return schema_df
266 except Exception as e:
267 logger.error(f"Ошибка при получении схемы таблицы из ClickHouse: {e}")
268 return pd.DataFrame()
269
270
271
272# etl_pipeline.py
273
274from datetime import datetime
275from clickhouse_connector import ClickHouseConnector # Импортируем обновленный коннектор
276from postgres_connector import PostgreSQLConnector
277from data_transformer import DataTransformer
278from logger import logger
279import pandas as pd
280
281class ETLPipeline:
282 """
283 Класс, реализующий ETL пайплайн с использованием пагинации для ClickHouse.
284 """
285
286 def __init__(self, config):
287 """
288 Инициализация ETL пайплайна.
289
290 Args:
291 config (dict): Словарь с конфигурациями. Ожидается:
292 config['clickhouse'] - параметры ClickHouse
293 config['postgres'] - параметры PostgreSQL
294 config['general']:
295 'table_name_click' - имя таблицы в ClickHouse
296 'table_name_postgress' - имя таблицы в PostgreSQL
297 'order_by_column_click' - колонка для сортировки в ClickHouse (ВАЖНО!)
298 'chunk_size_click' - размер чанка для ClickHouse (<= 10000)
299 'batch_size_postgress' - размер батча для загрузки в PostgreSQL
300 'base_query_click' (optional) - базовый SQL запрос для ClickHouse
301 (если не указан, используется SELECT *)
302 """
303 self.config = config
304 self.clickhouse_connector = ClickHouseConnector(config['clickhouse'])
305 self.postgres_connector = PostgreSQLConnector(config['postgres'])
306 self.data_transformer = DataTransformer()
307
308 # Параметры из секции [general]
309 self.table_name_clickhouse = config['general']['table_name_click']
310 self.table_name_postgress = config['general']['table_name_postgress']
311 self.order_by_column = config['general']['order_by_column_click'] # Колонка для ORDER BY
312 self.chunk_size = int(config['general']['chunk_size_click']) # Размер чанка CH
313 self.pg_batch_size = int(config['general']['batch_size_postgress'])# Размер батча PG
314 self.base_query = config['general'].get('base_query_click') # Опциональный базовый запрос
315
316 # Валидация chunk_size
317 if self.chunk_size <= 0:
318 raise ValueError("chunk_size_click должен быть больше 0")
319 if self.chunk_size > 10000:
320 logger.warning(f"chunk_size_click={self.chunk_size} > 10000. Убедитесь, что лимит sqldbclient позволяет это.")
321
322 # Формируем базовый запрос, если он не задан
323 if not self.base_query:
324 self.base_query = f"SELECT * FROM {self.table_name_clickhouse}"
325 logger.info(f"Параметр 'base_query_click' не задан, используется: {self.base_query}")
326
327
328 def run(self):
329 """
330 Запуск ETL пайплайна с пагинацией.
331 """
332 start_time = datetime.now()
333 logger.info("Запуск ETL пайплайна с пагинацией...")
334 total_rows_processed = 0
335 processed_chunks = 0
336
337 try:
338 # 1. Проверяем соединения
339 if not self.clickhouse_connector.connect():
340 logger.error("Не удалось подключиться к ClickHouse. ETL pipeline остановлен.")
341 return
342 self.postgres_connector.connect()
343
344 # (Опционально) Создание таблицы в PostgreSQL, если нужно
345 # Это нужно делать до начала загрузки данных
346 # try:
347 # logger.info("Получение схемы из ClickHouse для создания таблицы в PostgreSQL...")
348 # ch_schema = self.clickhouse_connector.get_table_schema(self.table_name_clickhouse)
349 # if not ch_schema.empty:
350 # # Адаптируйте create_table_if_not_exists, чтобы он работал со схемой из CH
351 # # Возможно, понадобится преобразование типов данных
352 # self.postgres_connector.create_table_if_not_exists(self.table_name_postgress, ch_schema)
353 # else:
354 # logger.warning("Не удалось получить схему из ClickHouse, таблица в PostgreSQL не создается.")
355 # except Exception as schema_err:
356 # logger.error(f"Ошибка при получении схемы или создании таблицы в PostgreSQL: {schema_err}")
357 # # Решите, останавливать ли пайплайн при ошибке схемы
358
359 # --- Основной цикл ETL с пагинацией ---
360 logger.info("Начало извлечения, трансформации и загрузки данных по частям...")
361
362 # Используем генератор пагинации
363 data_chunk_generator = self.clickhouse_connector.extract_data_paginated(
364 base_query=self.base_query,
365 order_by_column=self.order_by_column,
366 chunk_size=self.chunk_size
367 )
368
369 for i, data_chunk_df in enumerate(data_chunk_generator):
370 chunk_start_time = datetime.now()
371 processed_chunks += 1
372 current_chunk_rows = len(data_chunk_df)
373 logger.info(f"--- Обработка чанка #{processed_chunks} ({current_chunk_rows} строк) ---")
374
375 if data_chunk_df.empty:
376 logger.info("Получен пустой чанк (возможно, конец данных), пропуск.")
377 continue # Пропускаем пустые чанки (хотя генератор не должен их выдавать)
378
379 # 2. Трансформируем данные текущего чанка
380 logger.debug(f"Трансформация чанка #{processed_chunks}...")
381 # Убедитесь, что transform принимает и возвращает DataFrame (или адаптируйте)
382 try:
383 transformed_chunk_df = self.data_transformer.transform(data_chunk_df)
384 logger.debug(f"Чанк #{processed_chunks} трансформирован.")
385 if not isinstance(transformed_chunk_df, pd.DataFrame):
386 logger.error("DataTransformer.transform не вернул DataFrame! Остановка.")
387 raise TypeError("DataTransformer.transform должен возвращать pandas.DataFrame")
388 if transformed_chunk_df.empty and not data_chunk_df.empty :
389 logger.warning(f"Трансформация чанка #{processed_chunks} вернула пустой DataFrame.")
390 # Решите, нужно ли загружать пустой результат
391 # continue # Например, пропустить загрузку
392 except Exception as transform_err:
393 logger.error(f"Ошибка трансформации чанка #{processed_chunks}: {transform_err}", exc_info=True)
394 # Решите, продолжать ли с другими чанками или остановить пайплайн
395 raise transform_err # Останавливаем пайплайн
396
397 # 3. Загружаем трансформированный чанк в PostgreSQL
398 logger.debug(f"Загрузка трансформированного чанка #{processed_chunks} в PostgreSQL...")
399 try:
400 # Адаптируйте load_data, чтобы он принимал DataFrame и использовал self.pg_batch_size для внутренней логики
401 self.postgres_connector.load_data(
402 table_name=self.table_name_postgress,
403 data=transformed_chunk_df, # Передаем трансформированный DataFrame
404 batch_size=self.pg_batch_size # Передаем размер батча для PG
405 )
406 total_rows_processed += current_chunk_rows # Считаем строки исходного чанка
407 logger.debug(f"Чанк #{processed_chunks} загружен в PostgreSQL.")
408 except Exception as load_err:
409 logger.error(f"Ошибка загрузки чанка #{processed_chunks} в PostgreSQL: {load_err}", exc_info=True)
410 # Решите, продолжать ли или остановить пайплайн
411 raise load_err # Останавливаем пайплайн
412
413 chunk_end_time = datetime.now()
414 logger.info(f"--- Чанк #{processed_chunks} обработан за {chunk_end_time - chunk_start_time} ---")
415
416
417 if processed_chunks == 0:
418 logger.info("Не было извлечено ни одного чанка данных из ClickHouse.")
419 else:
420 logger.info(f"Всего обработано чанков: {processed_chunks}")
421
422
423 except Exception as e:
424 # Ловим ошибки, возникшие при пагинации или на других этапах
425 logger.error(f"ETL пайплайн завершился с ошибкой: {e}", exc_info=True)
426
427 finally:
428 # Закрываем соединение с PostgreSQL в любом случае
429 self.postgres_connector.close()
430 # ClickHouseConnector.close() не нужен
431
432 end_time = datetime.now()
433 duration = end_time - start_time
434 logger.info(f"ETL пайплайн завершен за {duration}.")
435 logger.info(f"Итого обработано строк (приблизительно): {total_rows_processed}")
436