· 5 months ago · Apr 18, 2025, 12:55 PM
1import psycopg2
2from psycopg2 import extras
3import logging
4import pandas as pd
5# Assuming 'logger' is configured elsewhere, e.g.:
6# logging.basicConfig(level=logging.INFO)
7# logger = logging.getLogger(__name__)
8# Or replace logger.info/error etc. with logging.info/error
9logger = logging.getLogger(__name__) # Use standard logging if logger setup is elsewhere
10
11
12class PostgreSQLConnector:
13 """
14 Класс для управления подключением к PostgreSQL.
15 """
16
17 def __init__(self, config):
18 """
19 Инициализация подключения к PostgreSQL.
20
21 Args:
22 config (dict): Словарь с параметрами подключения.
23 """
24 self.config = config
25 self.conn = None
26 self.cursor = None
27
28 def connect(self):
29 """
30 Установление соединения с PostgreSQL.
31 """
32 try:
33 self.conn = psycopg2.connect(**self.config)
34 self.cursor = self.conn.cursor()
35 logger.info(f"Успешно подключено к PostgreSQL: {self.config.get('host', 'N/A')} (Database: {self.config.get('dbname', 'N/A')})")
36 except Exception as e:
37 logger.error(f"Ошибка подключения к PostgreSQL: {e}")
38 raise
39
40 def load_data(self, table_name, data, batch_size):
41 """
42 Загрузка данных из DataFrame в PostgreSQL, используя только столбцы,
43 существующие в целевой таблице.
44
45 Args:
46 table_name (str): Имя таблицы для загрузки (может включать схему, напр. 'schema.table').
47 data (pd.DataFrame): DataFrame с данными для загрузки.
48 batch_size (int): Размер пакета для вставки.
49 """
50 # --- Input Validation ---
51 if not isinstance(data, pd.DataFrame):
52 logger.error(f"Ошибка: Ожидался pandas DataFrame, получен {type(data)}")
53 raise TypeError("Данные для загрузки должны быть pandas DataFrame.")
54
55 if not self.conn or not self.cursor:
56 logger.error("Нет активного подключения к PostgreSQL. Вызовите connect() перед загрузкой данных.")
57 raise ConnectionError("Нет активного подключения к PostgreSQL.")
58
59 # --- Check if DataFrame is empty ---
60 if data.empty:
61 logger.warning(f"Нет данных для загрузки в таблицу '{table_name}' (DataFrame пуст).")
62 return
63
64 try:
65 # --- Get column names from the target table ---
66 # Safely handle schema.table format
67 if '.' in table_name:
68 schema_name, table_only_name = table_name.split('.', 1)
69 # Quote identifiers to handle potential special characters or keywords
70 schema_name = f'"{schema_name}"'
71 table_only_name = f'"{table_only_name}"'
72 # Use standard string formatting here as psycopg2 doesn't parameterize schema/table names directly in info schema
73 query_cols = f"""
74 SELECT column_name
75 FROM information_schema.columns
76 WHERE table_schema = {schema_name.replace('"', "'")} -- Use single quotes for SQL string literal
77 AND table_name = {table_only_name.replace('"', "'")}
78 ORDER BY ordinal_position;
79 """
80 else:
81 # Assume public schema if not specified
82 table_only_name = f'"{table_name}"'
83 query_cols = f"""
84 SELECT column_name
85 FROM information_schema.columns
86 WHERE table_schema = 'public' -- Or adjust if default schema differs
87 AND table_name = {table_only_name.replace('"', "'")}
88 ORDER BY ordinal_position;
89 """
90
91 logger.info(f"Получение столбцов для таблицы: {table_name}")
92 # logger.debug(f"Выполнение запроса: {query_cols}") # Debug log if needed
93 # No parameters needed here as table/schema names are embedded (carefully)
94 self.cursor.execute(query_cols)
95 db_column_names = [row[0] for row in self.cursor.fetchall()]
96
97 if not db_column_names:
98 logger.error(f"Не удалось получить столбцы для таблицы '{table_name}'. Существует ли таблица и есть ли у нее столбцы?")
99 raise ValueError(f"Таблица '{table_name}' не найдена или не имеет столбцов.")
100
101 logger.info(f"Ожидаемые столбцы в таблице '{table_name}': {db_column_names}")
102
103 # --- Prepare data for insertion ---
104 df_column_names = list(data.columns)
105 logger.info(f"Столбцы в предоставленном DataFrame: {df_column_names}")
106
107 # *** NEW LOGIC: Filter DataFrame columns ***
108 # Identify columns present in both the DataFrame and the DB table
109 columns_to_use = [col for col in db_column_names if col in df_column_names]
110 # Identify columns required by the DB but missing in the DataFrame
111 missing_columns = [col for col in db_column_names if col not in df_column_names]
112 # Identify extra columns in the DataFrame that are not in the DB table
113 extra_columns = [col for col in df_column_names if col not in db_column_names]
114
115 if missing_columns:
116 logger.error(f"Ошибка: Отсутствуют необходимые столбцы в DataFrame для таблицы '{table_name}'.")
117 logger.error(f"Не найдены столбцы: {missing_columns}")
118 logger.error(f"Ожидаемые столбцы (из БД): {db_column_names}")
119 logger.error(f"Столбцы в DataFrame: {df_column_names}")
120 raise ValueError(f"Несоответствие столбцов: В DataFrame отсутствуют обязательные столбцы таблицы {table_name}.")
121
122 if extra_columns:
123 logger.warning(f"Следующие столбцы из DataFrame будут проигнорированы, так как их нет в таблице '{table_name}': {extra_columns}")
124
125 if not columns_to_use:
126 logger.error(f"Ошибка: Ни один из столбцов DataFrame не соответствует столбцам таблицы '{table_name}'. Нечего вставлять.")
127 raise ValueError(f"Нет совпадающих столбцов между DataFrame и таблицей {table_name}.")
128
129 # Select only the necessary columns in the correct order
130 data_filtered = data[columns_to_use]
131 logger.info(f"Будут использованы следующие столбцы для вставки: {columns_to_use}")
132
133 # Convert filtered DataFrame to list of tuples/lists for execute_batch
134 data_list = data_filtered.values.tolist()
135
136 # --- Prepare SQL query ---
137 # Quote column names being inserted
138 columns_str = ", ".join(f'"{col}"' for col in columns_to_use)
139 placeholders = ", ".join(["%s"] * len(columns_to_use))
140 # Quote table name (already handled if schema was present)
141 # Ensure table_name is appropriately quoted if it wasn't split
142 if '.' not in table_name:
143 quoted_table_name = f'"public"."{table_name}"' # Assuming public, adjust if needed
144 else:
145 # Reuse split and quoted names
146 quoted_table_name = f"{schema_name}.{table_only_name}"
147
148 query = f"INSERT INTO {quoted_table_name} ({columns_str}) VALUES ({placeholders})"
149 logger.debug(f"Сформированный SQL запрос для execute_batch: {query}") # Debug log
150
151 # --- Execute Batch Insert ---
152 logger.info(f"Начало загрузки {len(data_list)} строк в таблицу '{table_name}' (Столбцы: {columns_to_use})...")
153 extras.execute_batch(self.cursor, query, data_list, page_size=batch_size)
154 self.conn.commit()
155
156 logger.info(f"Успешно загружено {len(data_list)} строк в PostgreSQL таблицу '{table_name}'.")
157
158 # Catch psycopg2 errors specifically for better context
159 except (Exception, psycopg2.DatabaseError) as e:
160 if self.conn: # Check connection exists before rollback
161 try:
162 self.conn.rollback() # Rollback transaction on error
163 logger.warning(f"Транзакция для таблицы '{table_name}' отменена из-за ошибки.")
164 except Exception as rb_ex:
165 logger.error(f"Ошибка при попытке отката транзакции для '{table_name}': {rb_ex}")
166
167 logger.error(f"Ошибка при загрузке данных в PostgreSQL таблицу '{table_name}': {e}")
168 # Optional: include traceback for detailed debugging
169 # import traceback
170 # logger.error(traceback.format_exc())
171 raise # Re-raise the exception after logging and rollback
172
173 def map_clickhouse_to_postgres_type(self, clickhouse_type):
174 """
175 Отображает типы данных ClickHouse на типы данных PostgreSQL.
176 (Code remains the same as in the question)
177 """
178 # ... (mapping logic as provided - no changes needed here) ...
179 type_mapping = {
180 "int8": "SMALLINT",
181 "int16": "SMALLINT",
182 "int32": "INTEGER",
183 "int64": "BIGINT",
184 "uint8": "SMALLINT", # Be careful with unsigned -> signed mapping range
185 "uint16": "INTEGER",
186 "uint32": "BIGINT",
187 "uint64": "NUMERIC", # PostgreSQL BIGINT is signed, NUMERIC is safer for full range
188 "float32": "REAL",
189 "float64": "DOUBLE PRECISION",
190 "decimal": "NUMERIC", # Need to handle precision/scale potentially
191 "string": "TEXT",
192 "fixedstring": "TEXT", # Map FixedString to TEXT
193 "date": "DATE",
194 "datetime": "TIMESTAMP",
195 "datetime64": "TIMESTAMP", # Precision might be lost
196 "uuid": "UUID",
197 "boolean": "BOOLEAN",
198 "array(int8)": "SMALLINT[]",
199 "array(int16)": "SMALLINT[]",
200 "array(int32)": "INTEGER[]",
201 "array(int64)": "BIGINT[]",
202 "array(uint8)": "SMALLINT[]",
203 "array(uint16)": "INTEGER[]",
204 "array(uint32)": "BIGINT[]",
205 "array(uint64)": "NUMERIC[]",
206 "array(float32)": "REAL[]",
207 "array(float64)": "DOUBLE PRECISION[]",
208 "array(string)": "TEXT[]",
209 "array(date)": "DATE[]",
210 "array(datetime)": "TIMESTAMP[]",
211 "array(datetime64)": "TIMESTAMP[]",
212 "array(uuid)": "UUID[]",
213 "array(boolean)": "BOOLEAN[]",
214 # Add other types as needed
215 }
216
217 clickhouse_type_lower = clickhouse_type.lower()
218
219 # Handle complex types first (Nullable, LowCardinality, Decimal(P,S), DateTime64(P), FixedString(N))
220 if clickhouse_type_lower.startswith("nullable("):
221 inner_type = clickhouse_type[9:-1]
222 return self.map_clickhouse_to_postgres_type(inner_type) # Nullability is handled by column definition, not type itself in PG
223 if clickhouse_type_lower.startswith("lowcardinality("):
224 inner_type = clickhouse_type[15:-1]
225 return self.map_clickhouse_to_postgres_type(inner_type) # Map underlying type
226 if clickhouse_type_lower.startswith("datetime64"):
227 return "TIMESTAMP"
228 if clickhouse_type_lower.startswith("decimal"):
229 # Potentially parse P, S and map to NUMERIC(P, S) if needed, otherwise default NUMERIC
230 return "NUMERIC"
231 if clickhouse_type_lower.startswith("fixedstring"):
232 return "TEXT" # Or VARCHAR(N) if you parse N and want a limit
233
234 # Handle basic types
235 mapped_type = type_mapping.get(clickhouse_type_lower)
236
237 if mapped_type is None:
238 logger.warning(f"Неизвестный или необработанный тип данных ClickHouse: {clickhouse_type}. Используется TEXT.")
239 return "TEXT" # Default fallback
240 return mapped_type
241
242
243 def close(self):
244 """
245 Закрытие соединения с PostgreSQL.
246 """
247 if self.cursor:
248 try:
249 self.cursor.close()
250 except Exception as e:
251 logger.warning(f"Ошибка при закрытии курсора PostgreSQL: {e}")
252 if self.conn:
253 try:
254 self.conn.close()
255 logger.info("Соединение с PostgreSQL закрыто.")
256 except Exception as e:
257 logger.warning(f"Ошибка при закрытии соединения PostgreSQL: {e}")
258 self.cursor = None
259 self.conn = None
260
261# Example Usage (assuming config and logger are set up)
262# if __name__ == '__main__':
263# # Configure logging
264# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
265# logger = logging.getLogger(__name__)
266#
267# # Sample Config
268# db_config = {
269# 'dbname': 'your_db',
270# 'user': 'your_user',
271# 'password': 'your_password',
272# 'host': 'localhost',
273# 'port': '5432'
274# }
275#
276# # Sample DataFrame (Assume target table 'public.my_table' has columns 'id', 'name', 'value')
277# data_to_load = pd.DataFrame({
278# 'id': [1, 2, 3],
279# 'name': ['apple', 'banana', 'cherry'],
280# 'value': [10.5, 22.1, 7.0],
281# 'extra_col': ['foo', 'bar', 'baz'], # This column should be ignored
282# 'another_extra': [True, False, True] # This too
283# })
284#
285# # DataFrame missing a required column (will cause error)
286# # data_missing_col = pd.DataFrame({
287# # 'id': [4, 5],
288# # 'name': ['date', 'elderberry'],
289# # 'extra_col': ['x', 'y'] # Missing 'value'
290# #})
291#
292# connector = PostgreSQLConnector(db_config)
293# try:
294# connector.connect()
295#
296# # Create a dummy table for testing
297# try:
298# connector.cursor.execute("""
299# DROP TABLE IF EXISTS public.my_table;
300# CREATE TABLE public.my_table (
301# id INTEGER PRIMARY KEY,
302# name TEXT,
303# value NUMERIC(10, 2)
304# );
305# """)
306# connector.conn.commit()
307# logger.info("Тестовая таблица 'public.my_table' создана.")
308# except Exception as create_err:
309# logger.error(f"Не удалось создать тестовую таблицу: {create_err}")
310# if connector.conn: connector.conn.rollback()
311# raise
312#
313# logger.info("--- Тест 1: Загрузка данных с лишними столбцами ---")
314# connector.load_data('public.my_table', data_to_load, batch_size=100)
315#
316# # Verify insertion
317# connector.cursor.execute("SELECT * FROM public.my_table ORDER BY id;")
318# results = connector.cursor.fetchall()
319# logger.info(f"Данные в таблице после вставки: {results}")
320# # Expected output should only contain id, name, value
321#
322# # logger.info("--- Тест 2: Попытка загрузки данных с отсутствующим столбцом ---")
323# # try:
324# # connector.load_data('public.my_table', data_missing_col, batch_size=100)
325# # except ValueError as e:
326# # logger.info(f"Ожидаемая ошибка перехвачена: {e}")
327#
328# except Exception as e:
329# logger.error(f"Произошла ошибка во время выполнения примера: {e}")
330# finally:
331# connector.close()