· 5 months ago · Apr 21, 2025, 11:30 AM
1# clickhouse_connector.py
2
3"""
4Модуль для подключения и извлечения данных из ClickHouse
5с использованием sqldbclient.
6"""
7
8import logging
9from urllib.parse import quote_plus
10import pandas as pd # sqldbclient часто возвращает DataFrame
11
12# ... (импорты и настройка логгера без изменений) ...
13
14try:
15 from sqldbclient import SqlExecutor, SqlExecutorConf
16except ImportError:
17 # ... (обработка ImportError без изменений) ...
18 pass # Добавил pass для синтаксиса
19
20class ClickHouseConnector:
21 """
22 Класс для управления подключением к ClickHouse
23 с использованием sqldbclient.
24 """
25
26 def __init__(self, config):
27 # ... (метод __init__ без изменений) ...
28 self.config = config
29 self.executor = None
30 self._initialize_executor()
31
32 def _initialize_executor(self):
33 # ... (метод _initialize_executor без изменений) ...
34 try:
35 host = self.config['host']
36 port = self.config.get('port', 8123) # Порт по умолчанию для HTTP
37 username = self.config['username']
38 password = self.config['password']
39
40 # Экранируем пароль для URL
41 quoted_password = quote_plus(password)
42
43 # Формируем URL для SQLAlchemy-совместимого драйвера ClickHouse
44 # Используем clickhouse+http, как в примере
45 engine_url = f'clickhouse+http://{username}:{quoted_password}@{host}:{port}'
46
47 logger.info(f"Инициализация SqlExecutor для ClickHouse: clickhouse+http://{username}:***@{host}:{port}")
48
49 # Создаем конфигурацию для SqlExecutor
50 # history_db_name можно сделать настраиваемым или убрать, если не требуется
51 conf = SqlExecutorConf() \
52 .set('engine_options', engine_url) \
53 .set('history_db_name', 'ch_executor_history.db')
54 # ### ИЗМЕНЕНИЕ НАЧАЛО ###
55 # Убираем .set('max_rows_read', 10002), так как мы явно управляем LIMIT в пагинации
56 # и эта настройка только мешает и создает лишние warning'и в логах.
57 # .set('max_rows_read', 10002)
58 # ### ИЗМЕНЕНИЕ КОНЕЦ ###
59
60 # Создаем экземпляр SqlExecutor
61 self.executor = SqlExecutor.builder.config(conf).get_or_create()
62 logger.info("SqlExecutor для ClickHouse успешно инициализирован.")
63
64 except KeyError as e:
65 logger.error(f"Ошибка конфигурации ClickHouse: отсутствует обязательный ключ {e}")
66 self.executor = None
67 except Exception as e:
68 logger.error(f"Ошибка при инициализации SqlExecutor для ClickHouse: {e}")
69 self.executor = None
70
71 def connect(self):
72 # ... (метод connect без изменений) ...
73 if self.executor is None:
74 logger.error("SqlExecutor не был инициализирован из-за ошибки конфигурации.")
75 return False
76 try:
77 # Выполняем простой запрос для проверки соединения
78 result_df = self.executor.execute('SELECT 1')
79 # Проверяем, что результат не пустой и содержит ожидаемое значение
80 if not result_df.empty and result_df.iloc[0, 0] == 1:
81 logger.info("Успешное подключение к ClickHouse (проверено через SqlExecutor)!")
82 return True
83 else:
84 logger.error(f"Проверка подключения к ClickHouse не удалась. Результат: {result_df}")
85 return False
86 except Exception as e:
87 logger.error(f"Ошибка при проверке подключения к ClickHouse через SqlExecutor: {e}")
88 return False
89
90 def extract_data(self, table_name, execution_date, query=None):
91 # ... (метод extract_data без изменений) ...
92 # (Примечание: этот метод не используется в ошибочном сценарии, но оставляем его как есть)
93 if self.executor is None:
94 logger.error("Соединение с ClickHouse (SqlExecutor) не установлено.")
95 raise Exception("Соединение с ClickHouse (SqlExecutor) не установлено.")
96
97 try:
98 if query is None:
99 query = f"SELECT * FROM {table_name}"
100 # query = f"SELECT * FROM {table_name} where date = '{execution_date}'" # Пример использования execution_date
101 logger.info(f'Используется стандартный запрос: {query}')
102
103 logger.info(
104 f"Выполнение запроса к ClickHouse: {query[:200]}{'...' if len(query) > 200 else ''}") # Логгируем начало запроса
105 data_df = self.executor.execute(query) # SqlExecutor возвращает DataFrame
106 logger.info(f"Извлечено {len(data_df)} строк из ClickHouse.")
107
108 if not isinstance(data_df, pd.DataFrame):
109 logger.warning(f"SqlExecutor вернул не DataFrame, а {type(data_df)}. Преобразование...")
110 try:
111 data_df = pd.DataFrame(data_df)
112 except Exception:
113 logger.error("Не удалось преобразовать результат в DataFrame.")
114 return pd.DataFrame()
115
116 return data_df
117 except Exception as e:
118 logger.error(f"Ошибка при извлечении данных из ClickHouse: {e}")
119 return pd.DataFrame()
120
121 def get_table_schema(self, table_name):
122 # ... (метод get_table_schema без изменений) ...
123 if self.executor is None:
124 logger.error("Соединение с ClickHouse (SqlExecutor) не установлено.")
125 raise Exception("Соединение с ClickHouse (SqlExecutor) не установлено.")
126
127 try:
128 query = f"DESCRIBE TABLE {table_name}"
129 logger.info(f"Получение схемы таблицы: {query}")
130 schema_df = self.executor.execute(query)
131
132 if not isinstance(schema_df, pd.DataFrame):
133 logger.warning(f"SqlExecutor вернул не DataFrame для схемы, а {type(schema_df)}. Преобразование...")
134 try:
135 schema_df = pd.DataFrame(schema_df)
136 except Exception:
137 logger.error("Не удалось преобразовать схему в DataFrame.")
138 return pd.DataFrame()
139
140 logger.info(f"Схема для таблицы {table_name} получена.")
141 return schema_df
142 except Exception as e:
143 logger.error(f"Ошибка при получении схемы таблицы из ClickHouse: {e}")
144 return pd.DataFrame()
145
146 def extract_all_data_using_pagination(self, base_query, order_by_column, chunk_size=10000, settings=None):
147 """
148 Извлекает ВСЕ данные из ClickHouse, используя пагинацию ВНУТРИ,
149 и возвращает ОДИН большой DataFrame с КОРРЕКТНЫМИ заголовками.
150 (Остальная документация без изменений)
151 """
152 logger.info(f"Начало извлечения ВСЕХ данных с внутренней пагинацией (chunk_size={chunk_size}).")
153 logger.warning("ПРЕДУПРЕЖДЕНИЕ: Все результаты будут загружены в память!")
154
155 all_chunks = []
156 total_rows = 0
157
158 try:
159 # Вызываем наш же генератор пагинации, который теперь возвращает чанки с правильными заголовками
160 paginator = self.extract_data_paginated(
161 base_query=base_query,
162 order_by_column=order_by_column,
163 chunk_size=chunk_size,
164 settings=settings
165 )
166
167 # Собираем все чанки из генератора в список
168 for i, chunk_df in enumerate(paginator):
169 chunk_num = i + 1
170 logger.info(f"Сборка: получен чанк #{chunk_num} размером {len(chunk_df)} строк с корректными заголовками.")
171
172 # ### ИЗМЕНЕНИЕ НАЧАЛО ###
173 # Убираем проверку и исправление дублирующихся КОЛОНОК,
174 # так как `extract_data_paginated` теперь сам заботится о правильных заголовках.
175 # if not chunk_df.columns.is_unique:
176 # duplicate_cols = list(chunk_df.columns[chunk_df.columns.duplicated()])
177 # logger.warning(f"Чанк #{chunk_num} содержит дублирующиеся имена колонок: {duplicate_cols}. "
178 # f"Удаление дубликатов (оставляем первое вхождение)...")
179 # chunk_df = chunk_df.loc[:, ~chunk_df.columns.duplicated()]
180 # logger.info(f"Колонки чанка #{chunk_num} после дедупликации: {list(chunk_df.columns)}")
181 # ### ИЗМЕНЕНИЕ КОНЕЦ ###
182
183 if not chunk_df.empty:
184 all_chunks.append(chunk_df)
185 total_rows += len(chunk_df)
186
187 if all_chunks:
188 # logger.info(f'all_chunks={all_chunks}') # Этот лог может быть ОЧЕНЬ большим, лучше закомментировать или убрать
189 logger.info(f"Объединение {len(all_chunks)} чанков (собрано {total_rows} строк до concat)...")
190 try:
191 # pd.concat теперь должен работать корректно, так как все чанки имеют одинаковые и правильные колонки.
192 final_df = pd.concat(all_chunks, ignore_index=True)
193 logger.info(f"Итоговый DataFrame содержит {len(final_df)} строк.")
194 logger.info(f"Колонки итогового DataFrame: {list(final_df.columns)}") # Добавим лог для проверки
195
196 # Проверка на дубликаты СТРОК (остается актуальной, если ORDER BY не уникален)
197 if final_df.duplicated().any():
198 num_duplicates = final_df.duplicated().sum()
199 logger.warning(f"В итоговом DataFrame обнаружено {num_duplicates} дублирующихся СТРОК! "
200 f"Это обычно вызвано нестабильной сортировкой ('{order_by_column}'). "
201 f"Рассмотрите возможность добавления уникального ключа в ORDER BY.")
202 # Опциональное удаление дубликатов строк
203 # final_df = final_df.drop_duplicates(ignore_index=True)
204
205 return final_df
206 except Exception as concat_err:
207 logger.error(f"Неожиданная ошибка при выполнении pd.concat: {concat_err}", exc_info=True)
208 raise concat_err
209 else:
210 logger.info("Не было получено данных из ClickHouse.")
211 return pd.DataFrame()
212
213 except Exception as e:
214 logger.error(f"Ошибка во время сбора данных с внутренней пагинацией: {e}", exc_info=True)
215 raise e
216
217 def extract_data_paginated(self, base_query, order_by_column, chunk_size=10000, settings=None):
218 """
219 Извлекает данные из ClickHouse частями (пагинация) с использованием генератора,
220 ГАРАНТИРУЯ КОРРЕКТНЫЕ ЗАГОЛОВКИ для каждого чанка.
221 (Остальная документация без изменений)
222 """
223 if self.executor is None:
224 logger.error("Соединение с ClickHouse (SqlExecutor) не установлено.")
225 raise Exception("Соединение с ClickHouse (SqlExecutor) не установлено.")
226 # ... (проверки аргументов base_query, order_by_column, chunk_size без изменений) ...
227 if not base_query:
228 raise ValueError("Необходимо указать базовый SQL-запрос ('base_query').")
229 if not order_by_column:
230 logger.warning("УКАЗАНА КОЛОНКА(И) ДЛЯ СОРТИРОВКИ ('order_by_column'), НО ДЛЯ НАДЕЖНОЙ ПАГИНАЦИИ "
231 "ОНА ДОЛЖНА БЫТЬ УНИКАЛЬНОЙ ИЛИ ВКЛЮЧАТЬ ВСЕ КОЛОНКИ!")
232 raise ValueError("Необходимо указать колонку(и) для сортировки ('order_by_column') для пагинации.")
233 if chunk_size <= 0:
234 raise ValueError("'chunk_size' должен быть положительным числом.")
235
236 # ### ИЗМЕНЕНИЕ НАЧАЛО ###
237 # Убираем предупреждение и изменение chunk_size из-за max_rows_read, так как мы его убрали.
238 # effective_chunk_size = min(chunk_size, 10000)
239 # if chunk_size > 10000:
240 # logger.warning(f"Запрошенный chunk_size={chunk_size} превышает стандартный лимит sqldbclient (10000). "
241 # f"Будет использоваться эффективный лимит {effective_chunk_size}.")
242 # else:
243 # logger.info(f"Начало пагинации: chunk_size={chunk_size}, сортировка по '{order_by_column}'.")
244 effective_chunk_size = chunk_size # Используем заданный chunk_size напрямую
245 logger.info(f"Начало пагинации: chunk_size={effective_chunk_size}, сортировка по '{order_by_column}'.")
246 # ### ИЗМЕНЕНИЕ КОНЕЦ ###
247
248 offset = 0
249 total_fetched = 0
250 settings_str = "" # Не изменяем логику settings
251 # ... (код для settings_str остается прежним) ...
252
253 logger.debug(f"Базовый запрос: {base_query}")
254
255 # ### ИЗМЕНЕНИЕ НАЧАЛО ###
256 # Получаем корректные заголовки ОДИН РАЗ перед циклом
257 correct_headers = None
258 try:
259 # Используем LIMIT 1 (или LIMIT 0, если драйвер точно вернет заголовки)
260 header_query = f"{base_query} LIMIT 1"
261 logger.info(f"Получение заголовков запросом: {header_query[:200]}{'...' if len(header_query) > 200 else ''}")
262 header_df = self.executor.execute(header_query)
263
264 if isinstance(header_df, pd.DataFrame):
265 correct_headers = header_df.columns
266 logger.info(f"Корректные заголовки получены: {list(correct_headers)}")
267 else:
268 logger.error(f"Не удалось получить заголовки: execute вернул тип {type(header_df)}")
269 # Можно либо выбросить ошибку, либо попробовать продолжить, рискуя получить ту же проблему
270 raise Exception("Не удалось получить корректные заголовки столбцов из ClickHouse.")
271
272 except Exception as e:
273 logger.error(f"Ошибка при получении заголовков из ClickHouse: {e}", exc_info=True)
274 raise e # Перевыбрасываем ошибку, без заголовков продолжать бессмысленно
275
276 # Если correct_headers не None, продолжаем с пагинацией
277 # ### ИЗМЕНЕНИЕ КОНЕЦ ###
278
279 i = 0
280 while True:
281 logger.info(f"Итерация пагинации #{i + 1}, offset={offset}")
282
283 # Формируем запрос с LIMIT и OFFSET
284 paginated_query = f"""{base_query}
285 ORDER BY {order_by_column}
286 LIMIT {effective_chunk_size}
287 OFFSET {offset}
288 {settings_str}""" # Убрал лишние --ff комментарии
289
290 logger.debug(
291 f"Запрос чанка (offset={offset}): {paginated_query[:500]}{'...' if len(paginated_query) > 500 else ''}")
292
293 try:
294 # Выполняем запрос
295 chunk_df = self.executor.execute(paginated_query)
296
297 # Проверка типа (на всякий случай)
298 if not isinstance(chunk_df, pd.DataFrame):
299 logger.warning(f"SqlExecutor вернул не DataFrame для чанка #{i+1}, а {type(chunk_df)}. Попытка преобразования...")
300 try:
301 chunk_df = pd.DataFrame(chunk_df)
302 logger.info("Преобразование в DataFrame успешно.")
303 except Exception as convert_err:
304 logger.error(f"Не удалось преобразовать чанк #{i+1} в DataFrame: {convert_err}")
305 # Решаем, что делать: пропустить чанк или остановить ETL
306 raise Exception(f"Не удалось преобразовать чанк #{i+1} в DataFrame.") from convert_err
307
308 num_fetched_in_chunk = len(chunk_df)
309 logger.debug(f"Получено строк в чанке: {num_fetched_in_chunk}.")
310
311 if num_fetched_in_chunk == 0:
312 if i == 0:
313 logger.info("Получен пустой результат на первой итерации (0 строк). В источнике нет данных по запросу.")
314 else:
315 logger.info(f"Получен пустой чанк (0 строк) для offset={offset}. Завершение пагинации.")
316 break # Выход из цикла пагинации
317
318 # ### ИЗМЕНЕНИЕ НАЧАЛО ###
319 # Применяем КОРРЕКТНЫЕ заголовки к полученному чанку
320 if correct_headers is not None:
321 if len(chunk_df.columns) == len(correct_headers):
322 logger.debug(f"Применение корректных заголовков к чанку #{i+1}")
323 chunk_df.columns = correct_headers
324 else:
325 logger.error(f"Ошибка! Количество столбцов в чанке #{i+1} ({len(chunk_df.columns)}) не совпадает"
326 f" с количеством в заголовке ({len(correct_headers)})!")
327 logger.error(f"Столбцы чанка: {list(chunk_df.columns)}")
328 logger.error(f"Ожидаемые столбцы: {list(correct_headers)}")
329 # Это неожиданная ситуация, лучше остановить процесс
330 raise ValueError(f"Несоответствие количества столбцов в чанке #{i+1} и заголовке.")
331 else:
332 # Эта ветка не должна выполняться, если получение заголовков было успешным
333 logger.error("Критическая ошибка: отсутствуют корректные заголовки для применения.")
334 raise Exception("Критическая ошибка: отсутствуют корректные заголовки.")
335 # ### ИЗМЕНЕНИЕ КОНЕЦ ###
336
337 total_fetched += num_fetched_in_chunk
338 yield chunk_df # Возвращаем чанк с правильными заголовками
339 offset += num_fetched_in_chunk
340
341 except Exception as e:
342 logger.error(f"Ошибка при извлечении чанка данных (offset={offset}): {e}", exc_info=True)
343 raise e
344
345 i += 1
346
347 logger.info(f"Пагинация завершена. Всего извлечено строк через генератор: {total_fetched}.")