· 5 months ago · Apr 17, 2025, 07:25 AM
1config.py:
2"""
3Файл для хранения конфигураций подключения к базам данных.
4"""
5
6import os
7
8# ClickHouse Configuration
9CLICKHOUSE_CONFIG = {
10 'host': os.getenv('CLICKHOUSE_HOST', 'localhost'),
11 'port': int(os.getenv('CLICKHOUSE_PORT', '8123')),
12 'username': os.getenv('CLICKHOUSE_USER', 'default'),
13 'password': os.getenv('CLICKHOUSE_PASSWORD', ''),
14 'database': os.getenv('CLICKHOUSE_DATABASE', 'default')
15}
16
17# PostgreSQL Configuration
18POSTGRES_CONFIG = {
19 'host': os.getenv('POSTGRES_HOST', 'localhost'),
20 'port': int(os.getenv('POSTGRES_PORT', '5432')),
21 'database': os.getenv('POSTGRES_DATABASE', 'your_database'),
22 'user': os.getenv('POSTGRES_USER', 'your_user'),
23 'password': os.getenv('POSTGRES_PASSWORD', 'your_password')
24}
25
26# General Configuration
27TABLE_NAME = os.getenv('TABLE_NAME', 'your_table')
28BATCH_SIZE = int(os.getenv('BATCH_SIZE', '1000'))
29
30clickhouse_connector.py:
31"""
32Модуль для подключения и извлечения данных из ClickHouse.
33"""
34
35import clickhouse_connect
36import logging
37
38class ClickHouseConnector:
39 """
40 Класс для управления подключением к ClickHouse.
41 """
42
43 def __init__(self, config):
44 """
45 Инициализация подключения к ClickHouse.
46
47 Args:
48 config (dict): Словарь с параметрами подключения.
49 """
50 self.config = config
51 self.client = None
52
53 def connect(self):
54 """
55 Установление соединения с ClickHouse.
56 """
57 try:
58 self.client = clickhouse_connect.get_client(**self.config)
59 logging.info(f"Успешно подключено к ClickHouse: {self.config['host']}")
60 except Exception as e:
61 logging.error(f"Ошибка подключения к ClickHouse: {e}")
62 raise
63
64 def extract_data(self, table_name, query=None):
65 """
66 Извлечение данных из ClickHouse.
67
68 Args:
69 table_name (str): Имя таблицы для извлечения.
70 query (str, optional): SQL запрос для извлечения. Defaults to None (использует SELECT * FROM table_name).
71
72 Returns:
73 list: Список кортежей с данными.
74 """
75 try:
76 if query is None:
77 query = f"SELECT * FROM {table_name}"
78 data = self.client.query(query).result
79 logging.info(f"Извлечено {len(data)} строк из ClickHouse.")
80 return data
81 except Exception as e:
82 logging.error(f"Ошибка при извлечении данных из ClickHouse: {e}")
83 raise
84
85 def get_table_schema(self, table_name):
86 """
87 Получает схему таблицы из ClickHouse.
88
89 Args:
90 table_name (str): Имя таблицы.
91
92 Returns:
93 list: Список кортежей с описанием столбцов.
94 """
95 try:
96 schema = self.client.query(f"DESCRIBE TABLE {table_name}").result
97 return schema
98 except Exception as e:
99 logging.error(f"Ошибка при получении схемы таблицы из ClickHouse: {e}")
100 raise
101 def close(self):
102 """
103 Закрывает соединение с ClickHouse. Хотя clickhouse-connect не имеет явного метода close(),
104 метод здесь для консистентности интерфейса.
105 """
106 logging.info("Соединение с ClickHouse закрыто.") # Для информации
107 self.client = None # Устанавливаем в None, чтобы сборщик мусора удалил клиент.
108postgres_connector.py:
109"""
110Модуль для подключения и загрузки данных в PostgreSQL.
111"""
112
113import psycopg2
114from psycopg2 import extras
115import logging
116
117
118class PostgreSQLConnector:
119 """
120 Класс для управления подключением к PostgreSQL.
121 """
122
123 def __init__(self, config):
124 """
125 Инициализация подключения к PostgreSQL.
126
127 Args:
128 config (dict): Словарь с параметрами подключения.
129 """
130 self.config = config
131 self.conn = None
132 self.cursor = None
133
134 def connect(self):
135 """
136 Установление соединения с PostgreSQL.
137 """
138 try:
139 self.conn = psycopg2.connect(**self.config)
140 self.cursor = self.conn.cursor()
141 logging.info(f"Успешно подключено к PostgreSQL: {self.config['host']}")
142 except Exception as e:
143 logging.error(f"Ошибка подключения к PostgreSQL: {e}")
144 raise
145
146 def load_data(self, table_name, data, batch_size):
147 """
148 Загрузка данных в PostgreSQL.
149
150 Args:
151 table_name (str): Имя таблицы для загрузки.
152 data (list): Список кортежей с данными.
153 batch_size (int): Размер пакета для вставки.
154 """
155 try:
156 if not data:
157 logging.warning("Нет данных для загрузки в PostgreSQL.")
158 return
159
160 # Получаем описание полей таблицы PostgreSQL
161 self.cursor.execute(f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table_name}' ORDER BY ordinal_position;")
162 column_names = [row[0] for row in self.cursor.fetchall()]
163 columns_str = ", ".join(column_names) # Строка с именами колонок через запятую
164
165 # Формируем строку для вставки значений
166 placeholders = ", ".join(["%s"] * len(column_names))
167 query = f"INSERT INTO {table_name} ({columns_str}) VALUES ({placeholders})"
168
169 # Используем execute many для пакетной вставки
170 extras.execute_batch(self.cursor, query, data, page_size=batch_size)
171 self.conn.commit()
172
173 logging.info(f"Успешно загружено {len(data)} строк в PostgreSQL.")
174 except Exception as e:
175 self.conn.rollback() # Откатываем транзакцию в случае ошибки
176 logging.error(f"Ошибка при загрузке данных в PostgreSQL: {e}")
177 raise
178
179 def create_table_if_not_exists(self, table_name, clickhouse_table_schema):
180 """
181 Создает таблицу в PostgreSQL, если ее не существует.
182
183 Args:
184 table_name (str): Имя таблицы.
185 clickhouse_table_schema (list): Схема таблицы из ClickHouse (список кортежей).
186 """
187 try:
188 # Проверяем, существует ли таблица в PostgreSQL
189 self.cursor.execute(f"SELECT to_regclass('{table_name}')")
190 table_exists = self.cursor.fetchone()[0] is not None
191
192 if not table_exists:
193 logging.info(f"Таблица '{table_name}' не существует в PostgreSQL. Создаем...")
194
195 # Конвертируем типы данных ClickHouse в PostgreSQL
196 column_definitions = []
197 for column in clickhouse_table_schema:
198 column_name = column[0]
199 clickhouse_type = column[1].lower() # Приводим к нижнему регистру для надежности
200 postgres_type = self.map_clickhouse_to_postgres_type(clickhouse_type)
201 column_definitions.append(f"{column_name} {postgres_type}")
202
203 # Формируем SQL запрос для создания таблицы
204 create_table_sql = f"""
205 CREATE TABLE {table_name} (
206 {', '.join(column_definitions)}
207 );
208 """
209 self.cursor.execute(create_table_sql)
210 self.conn.commit()
211 logging.info(f"Таблица '{table_name}' успешно создана в PostgreSQL.")
212 else:
213 logging.info(f"Таблица '{table_name}' уже существует в PostgreSQL.")
214
215 except Exception as e:
216 self.conn.rollback()
217 logging.error(f"Ошибка при создании таблицы в PostgreSQL: {e}")
218 raise
219
220 def map_clickhouse_to_postgres_type(self, clickhouse_type):
221 """
222 Отображает типы данных ClickHouse на типы данных PostgreSQL.
223
224 Args:
225 clickhouse_type (str): Тип данных в ClickHouse.
226
227 Returns:
228 str: Соответствующий тип данных в PostgreSQL.
229 """
230 type_mapping = {
231 "int8": "SMALLINT",
232 "int16": "SMALLINT",
233 "int32": "INTEGER",
234 "int64": "BIGINT",
235 "uint8": "SMALLINT",
236 "uint16": "INTEGER",
237 "uint32": "BIGINT",
238 "uint64": "NUMERIC", # Может потребоваться BIGINT в зависимости от диапазона
239 "float32": "REAL",
240 "float64": "DOUBLE PRECISION",
241 "decimal": "NUMERIC",
242 "string": "TEXT",
243 "date": "DATE",
244 "datetime": "TIMESTAMP",
245 "datetime64": "TIMESTAMP",
246 "uuid": "UUID",
247 "boolean": "BOOLEAN",
248 "array(int8)": "SMALLINT[]",
249 "array(int16)": "SMALLINT[]",
250 "array(int32)": "INTEGER[]",
251 "array(int64)": "BIGINT[]",
252 "array(uint8)": "SMALLINT[]",
253 "array(uint16)": "INTEGER[]",
254 "array(uint32)": "BIGINT[]",
255 "array(uint64)": "NUMERIC[]",
256 "array(float32)": "REAL[]",
257 "array(float64)": "DOUBLE PRECISION[]",
258 "array(string)": "TEXT[]",
259 "array(date)": "DATE[]",
260 "array(datetime)": "TIMESTAMP[]",
261 "array(datetime64)": "TIMESTAMP[]",
262 "array(uuid)": "UUID[]",
263 "array(boolean)": "BOOLEAN[]",
264 # Add other types as needed
265 }
266
267 # Обработка типов с указанием размера (например, DateTime64(9))
268 if clickhouse_type.startswith("datetime64"):
269 return "TIMESTAMP" # DateTime64 в PostgreSQL просто TIMESTAMP, точность теряется
270 if clickhouse_type.startswith("decimal"):
271 return "NUMERIC"
272 if clickhouse_type.startswith("fixedstring"):
273 return "TEXT" # FixedString in ClickHouse, TEXT in PostgreSQL
274
275 # Обработка Nullable типов
276 if clickhouse_type.startswith("nullable("):
277 inner_type = clickhouse_type[9:-1]
278 return self.map_clickhouse_to_postgres_type(inner_type)
279
280 mapped_type = type_mapping.get(clickhouse_type)
281 if mapped_type is None:
282 logging.warning(f"Неизвестный тип данных ClickHouse: {clickhouse_type}. Используется TEXT.")
283 return "TEXT"
284 return mapped_type
285
286
287 def close(self):
288 """
289 Закрытие соединения с PostgreSQL.
290 """
291 if self.cursor:
292 self.cursor.close()
293 if self.conn:
294 self.conn.close()
295 logging.info("Соединение с PostgreSQL закрыто.")
296
297
298data_transformer.py:
299"""
300Модуль для трансформации данных.
301"""
302
303import logging
304
305class DataTransformer:
306 """
307 Класс для трансформации данных.
308 """
309
310 def transform(self, data):
311 """
312 Трансформация данных. Реализуйте свою логику трансформации здесь.
313
314 Args:
315 data (list): Список кортежей с данными.
316
317 Returns:
318 list: Список кортежей с трансформированными данными.
319 """
320 # Здесь можно добавить логику трансформации данных, если необходимо.
321 # Например, приведение типов, обработка значений и т.д.
322 logging.info("Трансформация данных выполнена (без изменений).")
323 return data
324
325etl_pipeline.py:
326"""
327Основной модуль, определяющий ETL пайплайн.
328"""
329
330import logging
331from datetime import datetime
332from etl.clickhouse_connector import ClickHouseConnector
333from etl.postgres_connector import PostgreSQLConnector
334from etl.data_transformer import DataTransformer
335
336
337class ETLPipeline:
338 """
339 Класс, реализующий ETL пайплайн.
340 """
341
342 def __init__(self, config):
343 """
344 Инициализация ETL пайплайна.
345
346 Args:
347 config (dict): Словарь с конфигурациями (ClickHouse, PostgreSQL, общие).
348 """
349 self.config = config
350 self.clickhouse_connector = ClickHouseConnector(config['clickhouse'])
351 self.postgres_connector = PostgreSQLConnector(config['postgres'])
352 self.data_transformer = DataTransformer()
353 self.table_name = config['general']['table_name']
354 self.batch_size = config['general']['batch_size']
355
356 def run(self):
357 """
358 Запуск ETL пайплайна.
359 """
360 start_time = datetime.now()
361 logging.info("Запуск ETL пайплайна...")
362
363 try:
364 self.clickhouse_connector.connect()
365 self.postgres_connector.connect()
366
367 # Получаем схему таблицы из ClickHouse
368 clickhouse_table_schema = self.clickhouse_connector.get_table_schema(self.table_name)
369 self.postgres_connector.create_table_if_not_exists(self.table_name, clickhouse_table_schema)
370
371 # Извлекаем данные из ClickHouse
372 data = self.clickhouse_connector.extract_data(self.table_name)
373
374 # Трансформируем данные
375 transformed_data = self.data_transformer.transform(data)
376
377 # Загружаем данные в PostgreSQL
378 self.postgres_connector.load_data(self.table_name, transformed_data, self.batch_size)
379
380
381 except Exception as e:
382 logging.error(f"ETL пайплайн завершился с ошибкой: {e}")
383
384 finally:
385 # Закрываем соединения
386 self.clickhouse_connector.close()
387 self.postgres_connector.close()
388
389 end_time = datetime.now()
390 duration = end_time - start_time
391 logging.info(f"ETL пайплайн успешно завершен за {duration}.")
392
393# main.py
394```python
395"""
396Основной файл для запуска ETL пайплайна.
397"""
398
399import logging
400from etl.etl_pipeline import ETLPipeline
401from etl import config
402
403if __name__ == '__main__':
404 # Настройка логирования
405 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
406
407 # Создаем объединенную конфигурацию
408 etl_config = {
409 'clickhouse': config.CLICKHOUSE_CONFIG,
410 'postgres': config.POSTGRES_CONFIG,
411 'general': {
412 'table_name': config.TABLE_NAME,
413 'batch_size': config.BATCH_SIZE
414 }
415 }
416
417 # Создаем экземпляр ETL пайплайна и запускаем его
418 etl = ETLPipeline(etl_config)
419 etl.run()