· 4 years ago · Jun 12, 2021, 10:12 PM
1import pandas as pd
2import numpy as np
3import datetime
4import requests
5import pandahouse as ph
6
7# путь к большой таблице на 6 млн строк
8fPath = "C:\\GA_R\\csv\\"
9
10# грузим таблицу в пандафрейм
11GAaud_hist = pd.read_csv(fPath+'GAaud_hist'+'.csv',sep=';', decimal=',')
12# добавляем столбец с индексом
13GAaud_hist['index'] = np.arange(1, len(GAaud_hist) + 1)
14
15print(f'Сформированы датафреймы для заливы в КХ \n {len(GAaud_hist)} ')
16
17
18# настройки коннекта к КХ на Hetzner
19db = 'GA'
20connection_Hetzner = {'host': 'http://xx.xx.xxx.154:8123',
21 'database': db,
22 'user': 'default',
23 'password': 'hDmTNbXqtC0LY'
24 }
25
26connection = connection_Hetzner
27
28
29# формуруем запрос на создание таблицы в КликХаузе
30# вытягиваем все названия и типы столбцов из панды
31GAaud = pd.DataFrame(GAaud_hist.dtypes)
32# переводим индекс в столбец под названием Index
33GAaud.reset_index(level=0, inplace=True)
34# в столбце Index каждому значениею добавляем двойную кавычку, так как названия столбцов в КликХаузе должны быть в двойных кавычках
35GAaud['index'] = GAaud['index'].apply(lambda x: f'"{x}"')
36# словарь соответствия типов данных панды и типов данных КликХауза
37types = {'object': 'String', 'int64': 'UInt64', 'float64': 'Float64', 'int32': 'UInt32'}
38# переводим названия типов из нотации Питона в нотацию Кликхауза
39GAaud = GAaud.replace({0: types})
40# составляем строку запроса для КХ из таблицы cnt где теперь содержатся названия столбцов в двойных кавычках и типы данных в этих столбцах для КХ
41GAaud['clickhouse'] = GAaud['index'].str.cat(GAaud[0], sep=" ") # сначала получаем столбец состоящий из двух столбцов чтобы получить название столбца и название типа данных Например: "created" Strina
42ch_columns = GAaud['clickhouse'].tolist() # переводим полученный столбец в список, чтобы потом перевести в строку
43ch_columns = "\n,".join(str(x) for x in ch_columns) # переводим полученный список в строку где элементы списка разделены переносом строки и запятой
44
45# Формируем окончательную строку запроса в КХ
46comand = 'CREATE TABLE IF NOT EXISTS' # команда для Кликхауза.
47table_name = 'GAaud_hist' # название таблицы в Кликхаузе
48
49ch_query = f'{comand} "{db}"."{table_name}"\n({ch_columns}) \n ENGINE = MergeTree \n ORDER BY ("index")'
50
51
52
53# -- удаляем старую таблицу из Кликхауза с данными из прошлой заливки Датасета
54ph.execute(query=f'DROP TABLE IF EXISTS "{db}"."{table_name}"', connection=connection)
55# выполняем запрос qq
56ph.execute(query=ch_query, connection=connection)
57# записываем данные из pandas в clickhouse
58ph.to_clickhouse(GAaud_hist, table_name, index=False, connection=connection, chunksize=100000)
59
60print(f'Конец заливки {db}.{table_name}')
61