· 5 years ago · Aug 11, 2020, 11:54 AM
1import json
2import os
3import re
4from elasticsearch6 import Elasticsearch
5import logging
6import requests
7
8# import psycopg2
9
10log = logging.getLogger(__name__)
11
12
13class EsClient:
14 def __init__(self, *args, **kwargs):
15 self.fixture_match = re.compile(r'^create[^\d]+(\d+).json$')
16 self.client = self._get_client(os.environ.get("ES_HOST", "localhost"))
17
18 def _get_fixtures(self):
19 for f in os.scandir("tests/fixtures/enable"):
20 find = self.fixture_match.findall(f.name)
21 if not find:
22 # no matching
23 continue
24
25 yield find[0], f.path
26
27 def _get_client(self, host: str):
28 return Elasticsearch([{"host": host, "port": ****}])
29
30 def create_index(self):
31 if self.client.indices.exists(index="index_v2"):
32 log.debug("skip: index already exists")
33 log.debug("wait green status")
34 self.client.cluster.health(wait_for_status='green')
35 return
36
37 with open('tests/fixtures/mapping.json') as file:
38 mapping = json.load(file)
39 self.client.indices.create(index="index_v2", body=mapping)
40 log.debug("index was created")
41
42 log.debug("wait green status")
43 self.client.cluster.health(wait_for_status='green')
44
45 for location_id, file_path in self._get_fixtures():
46 with open(file_path) as json_file:
47 location = json.load(json_file)
48
49 self.client.index(
50 index='index_v2',
51 id=location_id,
52 doc_type='type_location',
53 body=location,
54 refresh='wait_for'
55 )
56 log.info(f"add new record in index {location_id}")
57
58 self.client.indices.refresh(index="location_index_v2")
59
60 log.info("--- create index done ---")
61
62
63"""
64class DbClient:
65 def __init__(self):
66 self.dbname = 'name'
67 self.user = 'user'
68 self.password = 'password'
69 self.host = 'host'
70
71 def _get_db_connect(self):
72 conn = psycopg2.connect(dbname=self.dbname, user=self.user,
73 password=self.password, host=self.host)
74
75 return conn.cursor()
76
77 def clear_after_tests(self):
78 cursor = self._get_db_connect()
79 cursor.execute('TRUNCATE TABLE ONLY table_name1, table_name2')
80 cursor.execute('ALTER SEQUENCE seq1 RESTART WITH 1')
81 cursor.execute('ALTER SEQUENCE seq2 RESTART WITH 1')
82"""
83
84
85def get_app_host():
86 host = os.environ.get("APP_HOST", "host")
87 port = os.environ.get("APP_PORT", "****")
88
89 return f'{host}:{port}'
90
91
92def get_auth_headers():
93 url = get_app_host() + "путь/до/логина"
94 response = requests.post(url,
95 json={'login': 'login',
96 'password': 'password'})
97 return dict(Authorization=f"Bearer {response.json().get('token')}")
98
99
100def not_strict_comparison(a, b):
101 if a == b:
102 return True
103
104 if type(a) != type(b):
105 return False
106
107 if isinstance(a, dict):
108 for k in a:
109 if k not in b:
110 return False
111
112 if a[k] == b[k]:
113 continue
114
115 return not_strict_comparison(a[k], b[k])
116
117 if isinstance(a, list):
118 for i, v in enumerate(a):
119 if type(v) in (int, str):
120 if v in b:
121 continue
122 return False
123 return not_strict_comparison(v, b[i])
124
125 if type(a) in (int, str):
126 return a == b
127
128 return True