· 7 months ago · Apr 17, 2025, 04:15 PM
1# clickhouse_connector.py
2
3"""
4Модуль для подключения и извлечения данных из ClickHouse
5с использованием sqldbclient.
6"""
7
8import logging
9from urllib.parse import quote_plus
10import pandas as pd # sqldbclient часто возвращает DataFrame
11
12# Предполагаем, что logger уже настроен где-то в приложении
13# from logger import logger
14# Если нет, используем стандартный logging
15try:
16 from logger import logger
17except ImportError:
18 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
19 logger = logging.getLogger(__name__)
20
21
22# Убедитесь, что sqldbclient установлен: pip install sqldbclient SQLAlchemy clickhouse-sqlalchemy
23# Также может потребоваться драйвер http: pip install clickhouse-http-client
24try:
25 from sqldbclient import SqlExecutor, SqlExecutorConf
26except ImportError:
27 logger.error("Библиотека sqldbclient не найдена. Установите ее: pip install sqldbclient")
28 # Можно либо выбросить исключение, либо определить заглушки, чтобы код не падал при импорте
29 # raise ImportError("Библиотека sqldbclient не найдена.")
30 # Заглушки (нефункциональные, только для предотвращения ошибок импорта)
31 class SqlExecutorConf:
32 def set(self, key, value): return self
33 class Builder:
34 def config(self, conf): return self
35 def get_or_create(self): return None
36 class SqlExecutor:
37 builder = Builder()
38 def execute(self, query): return None
39
40
41class ClickHouseConnector:
42 """
43 Класс для управления подключением к ClickHouse
44 с использованием sqldbclient.
45 """
46
47 def __init__(self, config):
48 """
49 Инициализация подключения к ClickHouse через SqlExecutor.
50
51 Args:
52 config (dict): Словарь с параметрами подключения
53 (ожидаются ключи 'host', 'port', 'username', 'password', 'database').
54 """
55 self.config = config
56 self.executor = None
57 self._initialize_executor()
58
59 def _initialize_executor(self):
60 """Инициализирует SqlExecutor на основе конфигурации."""
61 try:
62 host = self.config['host']
63 port = self.config.get('port', 8123) # Порт по умолчанию для HTTP
64 username = self.config['username']
65 password = self.config['password']
66 database = self.config.get('database', 'default') # База данных по умолчанию
67
68 # Экранируем пароль для URL
69 quoted_password = quote_plus(password)
70
71 # Формируем URL для SQLAlchemy-совместимого драйвера ClickHouse
72 # Используем clickhouse+http, как в примере
73 engine_url = f'clickhouse+http://{username}:{quoted_password}@{host}:{port}/{database}'
74
75 logger.info(f"Инициализация SqlExecutor для ClickHouse: clickhouse+http://{username}:***@{host}:{port}/{database}")
76
77 # Создаем конфигурацию для SqlExecutor
78 # history_db_name можно сделать настраиваемым или убрать, если не требуется
79 conf = SqlExecutorConf() \
80 .set('engine_options', engine_url) \
81 .set('history_db_name', 'ch_executor_history.db')
82
83 # Создаем экземпляр SqlExecutor
84 self.executor = SqlExecutor.builder.config(conf).get_or_create()
85 logger.info("SqlExecutor для ClickHouse успешно инициализирован.")
86
87 except KeyError as e:
88 logger.error(f"Ошибка конфигурации ClickHouse: отсутствует обязательный ключ {e}")
89 self.executor = None
90 except Exception as e:
91 logger.error(f"Ошибка при инициализации SqlExecutor для ClickHouse: {e}")
92 self.executor = None
93
94 def connect(self):
95 """
96 Проверка соединения с ClickHouse путем выполнения простого запроса.
97 """
98 if self.executor is None:
99 logger.error("SqlExecutor не был инициализирован из-за ошибки конфигурации.")
100 return False
101 try:
102 # Выполняем простой запрос для проверки соединения
103 result_df = self.executor.execute('SELECT 1')
104 # Проверяем, что результат не пустой и содержит ожидаемое значение
105 if not result_df.empty and result_df.iloc[0, 0] == 1:
106 logger.info("Успешное подключение к ClickHouse (проверено через SqlExecutor)!")
107 return True
108 else:
109 logger.error(f"Проверка подключения к ClickHouse не удалась. Результат: {result_df}")
110 return False
111 except Exception as e:
112 logger.error(f"Ошибка при проверке подключения к ClickHouse через SqlExecutor: {e}")
113 return False
114
115 def extract_data(self, table_name, query=None):
116 """
117 Извлечение данных из ClickHouse с использованием SqlExecutor.
118
119 Args:
120 table_name (str): Имя таблицы для извлечения (используется, если query=None).
121 query (str, optional): SQL запрос для извлечения. Defaults to None.
122
123 Returns:
124 pd.DataFrame: DataFrame с извлеченными данными.
125 Возвращает пустой DataFrame в случае ошибки или отсутствия данных.
126
127 Raises:
128 Exception: Если SqlExecutor не инициализирован.
129 """
130 if self.executor is None:
131 logger.error("Соединение с ClickHouse (SqlExecutor) не установлено.")
132 raise Exception("Соединение с ClickHouse (SqlExecutor) не установлено.")
133
134 try:
135 if query is None:
136 query = f"SELECT * FROM {table_name}"
137 logger.info(f'Используется стандартный запрос: {query}')
138
139 logger.info(f"Выполнение запроса к ClickHouse: {query[:200]}{'...' if len(query) > 200 else ''}") # Логгируем начало запроса
140 data_df = self.executor.execute(query) # SqlExecutor возвращает DataFrame
141 logger.info(f"Извлечено {len(data_df)} строк из ClickHouse.")
142
143 if not isinstance(data_df, pd.DataFrame):
144 logger.warning(f"SqlExecutor вернул не DataFrame, а {type(data_df)}. Преобразование...")
145 # Попытка преобразовать, если возможно, или вернуть пустой DataFrame
146 try:
147 data_df = pd.DataFrame(data_df)
148 except Exception:
149 logger.error("Не удалось преобразовать результат в DataFrame.")
150 return pd.DataFrame() # Возвращаем пустой DataFrame
151
152 return data_df
153 except Exception as e:
154 logger.error(f"Ошибка при извлечении данных из ClickHouse: {e}")
155 # Можно либо перевыбросить исключение, либо вернуть пустой DataFrame
156 # raise e
157 return pd.DataFrame() # Возвращаем пустой DataFrame при ошибке
158
159 def get_table_schema(self, table_name):
160 """
161 Получает схему таблицы из ClickHouse с использованием SqlExecutor.
162
163 Args:
164 table_name (str): Имя таблицы.
165
166 Returns:
167 pd.DataFrame: DataFrame с описанием схемы таблицы (колонки: name, type, default_type, default_expression, comment, codec_expression, ttl_expression).
168 Возвращает пустой DataFrame в случае ошибки.
169
170 Raises:
171 Exception: Если SqlExecutor не инициализирован.
172 """
173 if self.executor is None:
174 logger.error("Соединение с ClickHouse (SqlExecutor) не установлено.")
175 raise Exception("Соединение с ClickHouse (SqlExecutor) не установлено.")
176
177 try:
178 query = f"DESCRIBE TABLE {table_name}"
179 logger.info(f"Получение схемы таблицы: {query}")
180 schema_df = self.executor.execute(query) # DESCRIBE TABLE возвращает таблицу
181
182 if not isinstance(schema_df, pd.DataFrame):
183 logger.warning(f"SqlExecutor вернул не DataFrame для схемы, а {type(schema_df)}. Преобразование...")
184 try:
185 schema_df = pd.DataFrame(schema_df)
186 except Exception:
187 logger.error("Не удалось преобразовать схему в DataFrame.")
188 return pd.DataFrame()
189
190 logger.info(f"Схема для таблицы {table_name} получена.")
191 return schema_df
192 except Exception as e:
193 logger.error(f"Ошибка при получении схемы таблицы из ClickHouse: {e}")
194 # raise e
195 return pd.DataFrame() # Возвращаем пустой DataFrame при ошибке
196
197 # Метод close() убран, так как SqlExecutor обычно управляет жизненным циклом соединений сам.
198 # def close(self):
199 # """
200 # Закрывает соединение с ClickHouse (не требуется для SqlExecutor).
201 # """
202 # # Обычно не требуется для SqlExecutor, он может управлять пулом соединений
203 # logger.info("Метод close() не требуется для SqlExecutor.")
204 # pass
205
206
207
208
209
210
211
212
213# etl_pipeline.py (возможные изменения)
214
215"""
216Основной модуль, определяющий ETL пайплайн.
217"""
218
219from datetime import datetime
220from clickhouse_connector import ClickHouseConnector # Используем обновленный коннектор
221from postgres_connector import PostgreSQLConnector
222from data_transformer import DataTransformer
223from logger import logger
224import pandas as pd # Импортируем pandas, так как данные теперь - DataFrame
225
226class ETLPipeline:
227 """
228 Класс, реализующий ETL пайплайн.
229 """
230
231 def __init__(self, config):
232 """
233 Инициализация ETL пайплайна.
234
235 Args:
236 config (dict): Словарь с конфигурациями (ClickHouse, PostgreSQL, общие).
237 """
238 self.config = config
239 # Инициализация коннектора ClickHouse (теперь использует sqldbclient)
240 self.clickhouse_connector = ClickHouseConnector(config['clickhouse'])
241 self.postgres_connector = PostgreSQLConnector(config['postgres'])
242 self.data_transformer = DataTransformer()
243 self.table_name_clickhouse = config['general']['table_name_click']
244 self.table_name_postgress = config['general']['table_name_postgress']
245 self.batch_size = config['general']['batch_size']
246
247 def run(self):
248 """
249 Запуск ETL пайплайна.
250 """
251 start_time = datetime.now()
252 logger.info("Запуск ETL пайплайна...")
253
254 try:
255 # Проверяем подключение к ClickHouse через новый метод connect
256 clickhouse_connected = self.clickhouse_connector.connect()
257 if not clickhouse_connected:
258 logger.error("Не удалось подключиться к ClickHouse (проверка через SqlExecutor). ETL pipeline остановлен.")
259 return # Останавливаем пайплайн
260
261 self.postgres_connector.connect() # Предполагаем, что с PostgreSQL все по-старому
262
263 # Получаем схему таблицы из ClickHouse (если нужно)
264 # Обратите внимание: get_table_schema теперь возвращает DataFrame
265 # clickhouse_table_schema_df = self.clickhouse_connector.get_table_schema(self.table_name_clickhouse)
266 # Возможно, потребуется адаптация метода create_table_if_not_exists в postgres_connector
267 # self.postgres_connector.create_table_if_not_exists(self.table_name_postgress, clickhouse_table_schema_df) # Передаем DataFrame
268
269 # 1. Извлекаем данные из ClickHouse
270 # extract_data теперь возвращает Pandas DataFrame
271 data_df = self.clickhouse_connector.extract_data(self.table_name_clickhouse)
272
273 # Проверяем, не пустой ли DataFrame
274 if data_df.empty:
275 logger.info("Из ClickHouse не извлечено данных. Завершение пайплайна.")
276 return # Завершаем, если данных нет
277
278 # 2. Трансформируем данные
279 # !!! ВАЖНО: DataTransformer.transform должен теперь принимать DataFrame !!!
280 # transformed_data = self.data_transformer.transform(data) # СТАРЫЙ ВАРИАНТ
281 transformed_data = self.data_transformer.transform(data_df) # НОВЫЙ ВАРИАНТ (ожидает DataFrame)
282 # Убедитесь, что DataTransformer.transform адаптирован!
283 # И проверьте, что возвращает transform - возможно, тоже DataFrame или список словарей/кортежей
284
285 # 3. Загружаем данные в PostgreSQL
286 # !!! ВАЖНО: PostgreSQLConnector.load_data должен уметь работать с форматом transformed_data !!!
287 # Возможно, потребуется адаптация load_data для приема DataFrame или другого формата от transform
288 self.postgres_connector.load_data(self.table_name_postgress, transformed_data, self.batch_size)
289
290
291 except Exception as e:
292 logger.error(f"ETL пайплайн завершился с ошибкой: {e}", exc_info=True) # Добавлено exc_info для трейсбека
293
294 finally:
295 # Закрываем соединения
296 # self.clickhouse_connector.close() # Метод close() больше не нужен/не используется
297 self.postgres_connector.close() # Закрываем соединение с PostgreSQL
298
299 end_time = datetime.now()
300 duration = end_time - start_time
301 logger.info(f"ETL пайплайн завершен за {duration}.")
302