· 4 years ago · Jun 12, 2021, 10:14 PM
1import pandas as pd
2import numpy as np
3import requests
4import pandahouse as ph
5
6# путь к большой таблице на 6 млн строк
7fPath = "C:\\GA_R\\csv\\"
8
9# грузим таблицу в пандафрейм
10GAaud_hist = pd.read_csv(fPath+'GAaud_hist'+'.csv',sep=';', decimal=',')
11# добавляем столбец с индексом
12GAaud_hist['index'] = np.arange(1, len(GAaud_hist) + 1)
13
14print(f'Сформированы датафреймы для заливы в КХ \n {len(GAaud_hist)} ')
15
16
17# настройки коннекта к КХ на Hetzner
18db = 'GA'
19connection_Hetzner = {'host': 'http://xx.xx.xxx.154:8123',
20 'database': db,
21 'user': 'default',
22 'password': 'hDmTNbXqtC0LY'
23 }
24
25connection = connection_Hetzner
26
27
28# формуруем запрос на создание таблицы в КликХаузе
29# вытягиваем все названия и типы столбцов из панды
30GAaud = pd.DataFrame(GAaud_hist.dtypes)
31# переводим индекс в столбец под названием Index
32GAaud.reset_index(level=0, inplace=True)
33# в столбце Index каждому значениею добавляем двойную кавычку, так как названия столбцов в КликХаузе должны быть в двойных кавычках
34GAaud['index'] = GAaud['index'].apply(lambda x: f'"{x}"')
35# словарь соответствия типов данных панды и типов данных КликХауза
36types = {'object': 'String', 'int64': 'UInt64', 'float64': 'Float64', 'int32': 'UInt32'}
37# переводим названия типов из нотации Питона в нотацию Кликхауза
38GAaud = GAaud.replace({0: types})
39# составляем строку запроса для КХ из таблицы cnt где теперь содержатся названия столбцов в двойных кавычках и типы данных в этих столбцах для КХ
40GAaud['clickhouse'] = GAaud['index'].str.cat(GAaud[0], sep=" ") # сначала получаем столбец состоящий из двух столбцов чтобы получить название столбца и название типа данных Например: "created" Strina
41ch_columns = GAaud['clickhouse'].tolist() # переводим полученный столбец в список, чтобы потом перевести в строку
42ch_columns = "\n,".join(str(x) for x in ch_columns) # переводим полученный список в строку где элементы списка разделены переносом строки и запятой
43
44# Формируем окончательную строку запроса в КХ
45comand = 'CREATE TABLE IF NOT EXISTS' # команда для Кликхауза.
46table_name = 'GAaud_hist' # название таблицы в Кликхаузе
47
48ch_query = f'{comand} "{db}"."{table_name}"\n({ch_columns}) \n ENGINE = MergeTree \n ORDER BY ("index")'
49
50
51
52# -- удаляем старую таблицу из Кликхауза с данными из прошлой заливки Датасета
53ph.execute(query=f'DROP TABLE IF EXISTS "{db}"."{table_name}"', connection=connection)
54# выполняем запрос qq
55ph.execute(query=ch_query, connection=connection)
56# записываем данные из pandas в clickhouse
57ph.to_clickhouse(GAaud_hist, table_name, index=False, connection=connection, chunksize=100000)
58
59print(f'Конец заливки {db}.{table_name}')
60