· 7 years ago · Apr 05, 2018, 03:12 PM
1import collections
2import datetime
3import hashlib
4import hmac
5import json
6import logging
7import unittest
8import uuid
9
10import pytz
11import requests
12
13
14POSSIBLE_CURRENCIES = {'AUD', 'CAD', 'CHF', 'EUR', 'GBP', 'SEK', 'THB', 'USD'}
15MIN_STAY_VIOLATION = 'MIN_STAY_VIOLATION'
16TURNOVER_VIOLATION = 'TURNOVER_VIOLATION'
17DATE_RANGE_UNAVAILABLE = 'DATE_RANGE_UNAVAILABLE'
18VIOLATION_CODES = {MIN_STAY_VIOLATION, TURNOVER_VIOLATION, DATE_RANGE_UNAVAILABLE}
19TURNOVER_DAYS = {'MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY', 'SUNDAY'}
20ERROR_REASONS = {'PROPERTY_INACTIVE', 'DATE_RANGE_INVALID', 'PARTY_SIZE_INVALID', 'RATE_UNAVAILABLE', 'OTHER'}
21
22# TODO: Update the following variables to match your system
23SECRET_KEY = '1ec2540e72154448851639f2c9109d9d3c553b2cbf184846a0ff2954b18e0351fe4d6d9dc703405aaafbb0891f47c5d1f081e46b378747468554ca6e99cccc57'
24BASE_URL = 'https://rates.nextpax.com/'
25PATH = 'api/v1/tripadvisor/query'
26EXTERNAL_LISTING_REFERENCE = '61555999'
27EXTERNAL_ACCOUNT_REFERENCE = '2888742'
28
29CLIENT_NAME = 'tripadvisor-vr'
30TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
31SIGNATURE_FORMAT = "VRS-HMAC-SHA512 timestamp={timestamp}, client={client}, signature={signature}"
32QUERY_STRING_FORMAT = 'guests={guests}&externalListingReference={external_listing_reference}&externalAccountReference={external_account_reference}&arrival={arrival}&departure={departure}&requestId={request_id}'
33
34logging.basicConfig(
35 format='%(asctime)s %(levelname)s %(funcName)s %(message)s',
36 level=logging.INFO
37)
38
39QueryParameters = collections.namedtuple(
40 'QueryParameters',
41 [
42 'guests',
43 'external_listing_reference',
44 'external_account_reference',
45 'arrival',
46 'departure',
47 ]
48)
49
50# TODO: Update the following test inputs to match your system
51# Comment out a top-level key, value pair to skip that particular test
52TEST_CASES = {
53 'successful_response': QueryParameters(
54 guests=3,
55 external_listing_reference=EXTERNAL_LISTING_REFERENCE,
56 external_account_reference=EXTERNAL_ACCOUNT_REFERENCE,
57 arrival='2018-04-09',
58 departure='2018-04-21',
59 ),
60 # 'min_stay_violation': QueryParameters(
61 # guests=16,
62 # external_listing_reference=EXTERNAL_LISTING_REFERENCE,
63 # external_account_reference=EXTERNAL_ACCOUNT_REFERENCE,
64 # arrival='2018-08-01',
65 # departure='2018-08-05',
66 # ),
67 'date_range_unavailable_violation': QueryParameters(
68 guests=3,
69 external_listing_reference=EXTERNAL_LISTING_REFERENCE,
70 external_account_reference=EXTERNAL_ACCOUNT_REFERENCE,
71 arrival='2018-08-01',
72 departure='2018-08-31',
73 ),
74 # 'turnday_violation': QueryParameters(
75 # guests=18,
76 # external_listing_reference=EXTERNAL_LISTING_REFERENCE,
77 # external_account_reference=EXTERNAL_ACCOUNT_REFERENCE,
78 # arrival='2018-08-02',
79 # departure='2018-08-03',
80 # ),
81 # 'property_inactive_error': QueryParameters(
82 # guests=10,
83 # external_listing_reference='abc123',
84 # external_account_reference=EXTERNAL_ACCOUNT_REFERENCE,
85 # arrival='2018-08-03',
86 # departure='2018-08-04',
87 # ),
88 'date_range_invalid_error': QueryParameters(
89 guests=3,
90 external_listing_reference=EXTERNAL_LISTING_REFERENCE,
91 external_account_reference=EXTERNAL_ACCOUNT_REFERENCE,
92 arrival='2018-08-04',
93 departure='2018-08-02',
94 ),
95 'party_size_invalid_error': QueryParameters(
96 guests=7,
97 external_listing_reference=EXTERNAL_LISTING_REFERENCE,
98 external_account_reference=EXTERNAL_ACCOUNT_REFERENCE,
99 arrival='2018-08-03',
100 departure='2018-08-04',
101 ),
102 'other_error': QueryParameters(
103 guests=3,
104 external_listing_reference=EXTERNAL_LISTING_REFERENCE,
105 external_account_reference=EXTERNAL_ACCOUNT_REFERENCE,
106 arrival='2018-08-03',
107 departure='',
108 ),
109}
110
111
112class AssistedRateSpecTest(unittest.TestCase):
113 s = requests.Session()
114
115 def _send_request(self, request):
116 """
117
118 :type request: requests.PreparedRequest
119 :rtype: tuple[requests.Response, dict|None]
120 :return: tuple[response, response body as dict, if present]
121
122 """
123
124 response = self.s.send(request)
125
126 try:
127 body = response.json()
128 except ValueError:
129 body = None
130
131 if response.status_code == 200:
132 self.validate_200_response(body)
133 elif response.status_code == 400:
134 self.validate_400_response(body)
135 else:
136 raise RuntimeError('Unexpected HTTP response code' + ' ' + str(response.status_code) + str(response.content))
137
138 return response, body
139
140 def validate_200_response(self, body):
141 self.assertIn('details', body)
142
143 details = body['details']
144
145 self.assertIn('baseRate', details)
146 self.assertGreater(details['baseRate']['amount'], 0)
147 self.assertIn(details['baseRate']['currency'], POSSIBLE_CURRENCIES)
148
149 self.assertIn('tax', details)
150 self.assertGreaterEqual(details['tax']['amount'], 0)
151 self.assertIn(details['tax']['currency'], POSSIBLE_CURRENCIES)
152
153 if 'deposit' in details:
154 self.assertGreater(details['deposit']['amount'], 0)
155 self.assertIn(details['deposit']['currency'], POSSIBLE_CURRENCIES)
156
157 if 'customFees' in details:
158 self.assertGreaterEqual(len(details['customFees']), 1)
159
160 for custom_fee in details['customFees']:
161 self.assertGreaterEqual(len(custom_fee['name']), 1)
162 self.assertLessEqual(len(custom_fee['name']), 255)
163
164 self.assertGreater(custom_fee['rate']['amount'], 0)
165 self.assertIn(custom_fee['rate']['currency'], POSSIBLE_CURRENCIES)
166
167 self.assertEqual(
168 {'baseRate', 'tax', 'deposit', 'customFees'} | set(details.keys()),
169 {'baseRate', 'tax', 'deposit', 'customFees'}
170 )
171
172 if 'eligibility' in body:
173 self.assertIn('tripViolations', body['eligibility'])
174 self.assertEqual(set(body['eligibility'].keys()), {'tripViolations'})
175
176 trip_violations = body['eligibility']['tripViolations']
177
178 self.assertGreaterEqual(len(trip_violations), 1)
179 self.assertEqual(
180 len(trip_violations),
181 len(set([trip_violation['violationCode'] for trip_violation in trip_violations]))
182 )
183
184 for trip_violation in trip_violations:
185 self.assertIn(trip_violation['violationCode'], VIOLATION_CODES)
186
187 if trip_violation['violationCode'] == TURNOVER_VIOLATION:
188 self.assertEqual(set(trip_violation.keys()), {'violationCode', 'turnover'})
189 self.assertIn(trip_violation['turnover'], TURNOVER_DAYS)
190 elif trip_violation['violationCode'] == MIN_STAY_VIOLATION:
191 self.assertEqual(set(trip_violation.keys()), {'violationCode', 'minStay'})
192 self.assertIsInstance(trip_violation['minStay'], int)
193 self.assertGreater(trip_violation['minStay'], 1)
194 else:
195 self.assertEqual(set(trip_violation.keys()), {'violationCode'})
196
197 def validate_400_response(self, body):
198 self.assertIn('errors', body)
199
200 errors = body['errors']
201
202 self.assertGreaterEqual(len(errors), 1)
203
204 for error in errors:
205 self.assertEqual(
206 {'reason', 'description'} | set(error.keys()),
207 {'reason', 'description'}
208 )
209
210 self.assertIn('reason', error)
211 self.assertIn(error['reason'], ERROR_REASONS)
212
213 if 'description' in error:
214 self.assertGreaterEqual(len(error['description']), 1)
215 self.assertLessEqual(len(error['description']), 255)
216
217 self.assertEqual(
218 len(errors),
219 len(set([e['reason'] for e in errors]))
220 )
221
222 @unittest.skipIf('successful_response' not in TEST_CASES, 'Test case not implemented')
223 def test_successful_response(self):
224 response, body = self._send_request(_get_request(TEST_CASES['successful_response']))
225
226 self.assertEqual(response.status_code, 200)
227
228 @unittest.skipIf('min_stay_violation' not in TEST_CASES, 'Test case not implemented')
229 def test_min_stay_violation(self):
230 response, body = self._send_request(_get_request(TEST_CASES['min_stay_violation']))
231
232 self.assertEqual(response.status_code, 200)
233
234 min_stay_violations = [
235 v for v in body['eligibility']['tripViolations']
236 if v['violationCode'] == 'MIN_STAY_VIOLATION'
237 ]
238
239 self.assertEqual(len(min_stay_violations), 1)
240
241 @unittest.skipIf('date_range_unavailable_violation' not in TEST_CASES, 'Test case not implemented')
242 def test_date_range_unavailable(self):
243 response, body = self._send_request(_get_request(TEST_CASES['date_range_unavailable_violation']))
244
245 self.assertEqual(response.status_code, 200)
246
247 date_range_unavailable_violations = [
248 v for v in body['eligibility']['tripViolations']
249 if v['violationCode'] == 'DATE_RANGE_UNAVAILABLE'
250 ]
251
252 self.assertEqual(len(date_range_unavailable_violations), 1)
253
254 @unittest.skipIf('turnday_violation' not in TEST_CASES, 'Test case not implemented')
255 def test_turnday(self):
256 response, body = self._send_request(_get_request(TEST_CASES['turnday_violation']))
257
258 self.assertEqual(response.status_code, 200)
259
260 turnover_violations = [
261 v for v in body['eligibility']['tripViolations']
262 if v['violationCode'] == 'TURNOVER_VIOLATION'
263 ]
264
265 self.assertEqual(len(turnover_violations), 1)
266
267 @unittest.skipIf('property_inactive_error' not in TEST_CASES, 'Test case not implemented')
268 def test_property_inactive_error(self):
269 response, body = self._send_request(_get_request(TEST_CASES['property_inactive_error']))
270
271 self.assertEqual(response.status_code, 400)
272
273 self.assertIn('errors', body)
274
275 property_inactive_errors = [
276 v for v in body['errors']
277 if v['reason'] == 'PROPERTY_INACTIVE'
278 ]
279
280 self.assertEqual(len(property_inactive_errors), 1)
281
282 @unittest.skipIf('date_range_invalid_error' not in TEST_CASES, 'Test case not implemented')
283 def test_date_range_invalid_error(self):
284 response, body = self._send_request(_get_request(TEST_CASES['date_range_invalid_error']))
285
286 self.assertEqual(response.status_code, 400)
287
288 self.assertIn('errors', body)
289
290 property_inactive_errors = [
291 v for v in body['errors']
292 if v['reason'] == 'DATE_RANGE_INVALID'
293 ]
294
295 self.assertEqual(len(property_inactive_errors), 1)
296
297 @unittest.skipIf('party_size_invalid_error' not in TEST_CASES, 'Test case not implemented')
298 def test_party_size_invalid_error(self):
299 response, body = self._send_request(_get_request(TEST_CASES['party_size_invalid_error']))
300
301 self.assertEqual(response.status_code, 400)
302
303 self.assertIn('errors', body)
304
305 property_inactive_errors = [
306 v for v in body['errors']
307 if v['reason'] == 'PARTY_SIZE_INVALID'
308 ]
309
310 self.assertEqual(len(property_inactive_errors), 1)
311
312 @unittest.skipIf('other_error' not in TEST_CASES, 'Test case not implemented')
313 def test_other_error(self):
314 response, body = self._send_request(_get_request(TEST_CASES['other_error']))
315
316 self.assertEqual(response.status_code, 400)
317
318 self.assertIn('errors', body)
319
320 property_inactive_errors = [
321 v for v in body['errors']
322 if v['reason'] == 'OTHER'
323 ]
324
325 self.assertGreaterEqual(len(property_inactive_errors), 1)
326
327
328def _get_request(query_parameters):
329 now = datetime.datetime.now(tz=pytz.UTC)
330 body = ''
331
332 query_string = QUERY_STRING_FORMAT.format(
333 guests=query_parameters.guests,
334 external_listing_reference=query_parameters.external_listing_reference,
335 external_account_reference=query_parameters.external_account_reference,
336 arrival=query_parameters.arrival,
337 departure=query_parameters.departure,
338 request_id='TA_TEST_REQUEST'
339 )
340
341 url = "{}{}?{}".format(BASE_URL, PATH, query_string)
342 r = requests.Request(
343 'GET',
344 url,
345 )
346
347 signature = SIGNATURE_FORMAT.format(
348 timestamp=now.strftime(TIMESTAMP_FORMAT),
349 client=CLIENT_NAME,
350 signature=_get_signature(
351 r.method,
352 PATH,
353 query_string,
354 now,
355 body,
356 )
357 )
358
359 r.headers['Authorization'] = signature
360
361 logging.info(
362 "Request {}".format(json.dumps({
363 'url': r.url,
364 'method': r.method,
365 'path': PATH,
366 'query_string': query_string,
367 'body': body,
368 'timestamp': now.strftime(TIMESTAMP_FORMAT),
369 'client': CLIENT_NAME,
370 'secret': SECRET_KEY,
371 'signature': signature,
372 }))
373 )
374
375 return r.prepare()
376
377
378def _get_signature(
379 method,
380 path,
381 query_string,
382 timestamp,
383 body
384):
385 print(query_string)
386 canonical_request = '\n'.join([
387 method,
388 path,
389 query_string,
390 timestamp.strftime(TIMESTAMP_FORMAT),
391 hashlib.sha512(body.encode('utf-8')).hexdigest()
392 ])
393
394 canonical_request_hash = hashlib.sha512(canonical_request.encode('utf-8')).hexdigest()
395
396 return hmac.new(SECRET_KEY.encode('utf-8'), canonical_request_hash.encode('utf-8'), hashlib.sha512).hexdigest()
397
398
399if __name__ == '__main__':
400 unittest.main()