· 5 months ago · Apr 22, 2025, 07:05 AM
1# clickhouse_connector.py
2
3"""
4Модуль для подключения и извлечения данных из ClickHouse
5с использованием mega_clickhouse_connector.
6"""
7
8import logging
9import pandas as pd
10from requests.exceptions import RequestException # Импортируем для обработки ошибок сети
11
12# Логгер (как и раньше)
13try:
14 from logger import logger
15except ImportError:
16 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
17 logger = logging.getLogger(__name__)
18
19# Импортируем ваш новый коннектор
20try:
21 from mega_clickhouse_connector import Connector as MegaConnector
22except ImportError:
23 logger.error("Класс Connector из mega_clickhouse_connector не найден. "
24 "Убедитесь, что файл mega_clickhouse_connector.py находится в правильном месте.")
25 # Заглушки (нефункциональные, только для предотвращения ошибок импорта)
26 class MegaConnector:
27 def __init__(self, *args, **kwargs): pass
28 def select_tsv_to_typed_df(self, query): return pd.DataFrame()
29 def _execute(self, query): pass # Добавим заглушку для connect
30
31
32class ClickHouseConnector:
33 """
34 Класс для управления подключением к ClickHouse
35 с использованием mega_clickhouse_connector.
36 """
37
38 def __init__(self, config):
39 """
40 Инициализация подключения к ClickHouse через MegaConnector.
41
42 Args:
43 config (dict): Словарь с параметрами подключения
44 (ожидаются ключи 'host', 'port', 'username', 'password').
45 Ключ 'database' игнорируется этим коннектором.
46 """
47 self.config = config
48 self.connector = None
49 self._initialize_connector()
50
51 def _initialize_connector(self):
52 """Инициализирует MegaConnector на основе конфигурации."""
53 try:
54 # Адаптируем имена параметров из config к ожидаемым MegaConnector
55 dbhost = self.config['host']
56 dbport = self.config.get('port', 8123) # Используем порт из конфига или дефолтный
57 dbuser = self.config['username']
58 dbpass = self.config['password']
59
60 logger.info(f"Инициализация MegaConnector для ClickHouse: http://{dbuser}:***@{dbhost}:{dbport}")
61
62 # Создаем экземпляр MegaConnector
63 self.connector = MegaConnector(
64 dbuser=dbuser,
65 dbpass=dbpass,
66 dbhost=dbhost,
67 dbport=dbport
68 )
69 logger.info("MegaConnector для ClickHouse успешно инициализирован.")
70
71 except KeyError as e:
72 logger.error(f"Ошибка конфигурации ClickHouse: отсутствует обязательный ключ {e}")
73 self.connector = None
74 except Exception as e:
75 logger.error(f"Ошибка при инициализации MegaConnector для ClickHouse: {e}")
76 self.connector = None
77
78 def connect(self):
79 """
80 Проверка соединения с ClickHouse путем выполнения простого запроса 'SELECT 1'.
81 """
82 if self.connector is None:
83 logger.error("MegaConnector не был инициализирован из-за ошибки конфигурации.")
84 return False
85 try:
86 # Выполняем простой запрос для проверки соединения
87 # Используем _execute, так как он проще для SELECT 1,
88 # но можно и select_tsv_to_typed_df
89 # response = self.connector.select_tsv_to_typed_df('SELECT 1')
90 # Используем низкоуровневый _execute для простой проверки
91 response = self.connector._execute('SELECT 1')
92
93 # Проверяем статус ответа
94 if response.status_code == 200 and response.text.strip() == '1':
95 logger.info("Успешное подключение к ClickHouse (проверено через MegaConnector 'SELECT 1')!")
96 return True
97 else:
98 logger.error(f"Проверка подключения к ClickHouse не удалась. "
99 f"Статус: {response.status_code}, Ответ: {response.text[:100]}")
100 return False
101 except RequestException as e:
102 logger.error(f"Ошибка сети при проверке подключения к ClickHouse через MegaConnector: {e}")
103 return False
104 except Exception as e:
105 logger.error(f"Неожиданная ошибка при проверке подключения к ClickHouse через MegaConnector: {e}")
106 return False
107
108 def extract_data(self, table_name, start_date, end_date, query=None):
109 """
110 Извлечение данных из ClickHouse с использованием MegaConnector.
111 Игнорирует переданный 'query' и всегда использует фильтр по дате.
112
113 Args:
114 table_name (str): Имя таблицы для извлечения.
115 start_date (str): Начальная дата (YYYY-MM-DD).
116 end_date (str): Конечная дата (YYYY-MM-DD, не включительно).
117 query (str, optional): Этот аргумент игнорируется. Оставлен для совместимости интерфейса.
118
119 Returns:
120 pd.DataFrame: DataFrame с извлеченными данными.
121 Возвращает пустой DataFrame в случае ошибки или отсутствия данных.
122
123 Raises:
124 Exception: Если MegaConnector не инициализирован.
125 """
126 if self.connector is None:
127 logger.error("Соединение с ClickHouse (MegaConnector) не установлено.")
128 # Можно выбрасывать исключение или возвращать пустой DF, как раньше
129 raise Exception("Соединение с ClickHouse (MegaConnector) не установлено.")
130 # return pd.DataFrame()
131
132 # Формируем ЗАДАННЫЙ запрос, игнорируя параметр query
133 # Убедитесь, что колонка 'date' действительно существует и имеет подходящий тип
134 final_query = f"SELECT * FROM {table_name} WHERE date >= '{start_date}' AND date < '{end_date}'"
135
136 logger.info(f"Используется фиксированный запрос для извлечения данных: {final_query[:200]}{'...' if len(final_query) > 200 else ''}")
137
138 try:
139 logger.info(f"Выполнение запроса к ClickHouse через MegaConnector...")
140 # Используем select_tsv_to_typed_df для получения DataFrame с типами
141 data_df = self.connector.select_tsv_to_typed_df(final_query)
142 logger.info(f"Извлечено {len(data_df)} строк из ClickHouse с помощью MegaConnector.")
143
144 # Проверка типа не обязательна, т.к. select_tsv_to_typed_df должен возвращать DataFrame
145 if not isinstance(data_df, pd.DataFrame):
146 logger.warning(f"MegaConnector.select_tsv_to_typed_df вернул не DataFrame, а {type(data_df)}. "
147 f"Попытка преобразования...")
148 try:
149 data_df = pd.DataFrame(data_df)
150 except Exception:
151 logger.error("Не удалось преобразовать результат в DataFrame.")
152 return pd.DataFrame()
153
154 return data_df
155 except RequestException as e:
156 logger.error(f"Ошибка сети при извлечении данных из ClickHouse через MegaConnector: {e}")
157 return pd.DataFrame()
158 except RuntimeError as e: # Перехватываем RuntimeError из select_tsv_to_df для не 200 статусов
159 logger.error(f"Ошибка при извлечении данных из ClickHouse (MegaConnector): {e}")
160 return pd.DataFrame()
161 except Exception as e:
162 logger.error(f"Неожиданная ошибка при извлечении данных из ClickHouse: {e}", exc_info=True)
163 return pd.DataFrame()
164
165 def get_table_schema(self, table_name):
166 """
167 Получает схему таблицы из ClickHouse с использованием MegaConnector.
168
169 Args:
170 table_name (str): Имя таблицы.
171
172 Returns:
173 pd.DataFrame: DataFrame с описанием схемы таблицы.
174 Возвращает пустой DataFrame в случае ошибки.
175
176 Raises:
177 Exception: Если MegaConnector не инициализирован.
178 """
179 if self.connector is None:
180 logger.error("Соединение с ClickHouse (MegaConnector) не установлено.")
181 raise Exception("Соединение с ClickHouse (MegaConnector) не установлено.")
182
183 try:
184 query = f"DESCRIBE TABLE {table_name}"
185 logger.info(f"Получение схемы таблицы: {query}")
186 # Используем тот же метод для получения DataFrame
187 schema_df = self.connector.select_tsv_to_typed_df(query)
188
189 # Проверка типа (на всякий случай)
190 if not isinstance(schema_df, pd.DataFrame):
191 logger.warning(f"MegaConnector.select_tsv_to_typed_df для схемы вернул не DataFrame, а {type(schema_df)}.")
192 return pd.DataFrame() # Возвращаем пустой DataFrame
193
194 logger.info(f"Схема для таблицы {table_name} получена.")
195 return schema_df
196 except RequestException as e:
197 logger.error(f"Ошибка сети при получении схемы таблицы из ClickHouse: {e}")
198 return pd.DataFrame()
199 except RuntimeError as e:
200 logger.error(f"Ошибка при получении схемы таблицы из ClickHouse (MegaConnector): {e}")
201 return pd.DataFrame()
202 except Exception as e:
203 logger.error(f"Неожиданная ошибка при получении схемы таблицы из ClickHouse: {e}")
204 return pd.DataFrame()
205
206 # --- Методы пагинации ---
207 # Оставляем их, но они не реализованы для MegaConnector
208 def extract_all_data_using_pagination(self, base_query, order_by_column, chunk_size=10000, settings=None):
209 """
210 Извлекает ВСЕ данные из ClickHouse, используя пагинацию.
211 НЕ РЕАЛИЗОВАНО для MegaConnector.
212 """
213 logger.error("Метод extract_all_data_using_pagination не реализован для MegaConnector.")
214 raise NotImplementedError("Пагинация не реализована для MegaConnector в этой версии.")
215 # Или можно вернуть пустой DataFrame:
216 # return pd.DataFrame()
217
218 def extract_data_paginated(self, base_query, order_by_column, chunk_size=10000, settings=None):
219 """
220 Извлекает данные из ClickHouse частями (пагинация) с использованием генератора.
221 НЕ РЕАЛИЗОВАНО для MegaConnector.
222 """
223 logger.error("Метод extract_data_paginated не реализован для MegaConnector.")
224 raise NotImplementedError("Пагинация не реализована для MegaConnector в этой версии.")
225 # Или можно вернуть пустой генератор:
226 # yield pd.DataFrame() # Возвращаем пустой DataFrame и выходим
227 # return
228
229
230 # Метод close() не нужен, так как MegaConnector не управляет постоянным соединением.
231 # def close(self):
232 # logger.info("Метод close() не требуется для MegaConnector.")
233 # pass