· 9 years ago · Dec 16, 2016, 12:06 PM
1##
2# application.py
3##
4from eve import Eve
5from hmac import HMACAuth
6
# Eve configuration used by this sample application: debug mode on, a local
# MongoDB instance, and a DOMAIN that declares one resource ('test') with an
# empty definition.
SETTINGS = dict(
    DEBUG=True,
    MONGO_HOST='127.0.0.1',
    MONGO_PORT=27017,
    MONGO_DBNAME='testing',
    DOMAIN=dict(test=dict()),
)
14
# Application object: Eve configured from SETTINGS, with every request
# authenticated through the HMACAuth class imported above.
app = Eve(settings=SETTINGS, auth=HMACAuth)


# Start the development server (with the auto-reloader) only when this file
# is executed directly, not when it is imported (e.g. by the test suite).
if __name__ == "__main__":
    app.run(use_reloader=True)
20
21##
22# hmac.py
23##
24import time
25import hmac
26from eve.auth import HMACAuth
27from flask import current_app as app
28from hashlib import sha256
29
30
class HMACAuth(HMACAuth):
    """HMAC-SHA256 request authentication backed by the 'accounts' collection.

    A request is accepted when all of the following hold:

    * an account with the given ``userid`` exists,
    * the request carries an integer ``TIMESTAMP`` header,
    * the client's signature equals HMAC-SHA256(secret_key, timestamp + body),
    * the timestamp is between 0 and ``MAX_REQUEST_AGE`` seconds in the past.

    NOTE(review): this module is named ``hmac.py`` and also does
    ``import hmac``. When the file's directory is first on ``sys.path`` that
    import can resolve to this file instead of the standard library module;
    consider renaming the file -- confirm against the deployment layout.
    """

    # Oldest request (in seconds) that is still accepted.
    MAX_REQUEST_AGE = 30

    def check_auth(self, userid, hmac_hash, headers, data, allowed_roles, resource, method):
        """Return True if the request is correctly signed and recent enough.

        :param userid: account identifier sent by the client.
        :param hmac_hash: hex signature sent by the client.
        :param headers: request headers; must provide ``TIMESTAMP``.
        :param data: raw request body (bytes) that was signed.
        :param allowed_roles: unused here.
        :param resource: unused here.
        :param method: unused here.
        :returns: True when the request is authenticated, False otherwise.
        """
        accounts = app.data.driver.db['accounts']
        user = accounts.find_one({'userid': userid})
        if not user:
            print('USER DOES NOT EXIST.')
            return False

        # Validate the timestamp up front so a missing or malformed header
        # rejects the request instead of raising.
        raw_timestamp = headers.get('TIMESTAMP')
        try:
            request_time = int(raw_timestamp)
        except (TypeError, ValueError):
            print('BAD TIMESTAMP.')
            return False

        # Re-create the signature from the account's secret key. The header
        # text is signed exactly as received, because that is what the client
        # hashed.
        check_sig = hmac.new(bytes(user['secret_key'], 'utf8'), b'', sha256)
        check_sig.update(bytes(raw_timestamp, 'utf-8'))
        check_sig.update(data or b'')
        expected = check_sig.hexdigest()

        # Compare as bytes: compare_digest() raises TypeError for str
        # arguments containing non-ASCII characters, which a client could
        # otherwise trigger with a crafted signature.
        if not hmac.compare_digest(expected.encode('ascii'), hmac_hash.encode('utf-8')):
            print('WRONG HASH!')
            return False

        # Negative ages (timestamps in the future) are rejected as well as
        # requests older than the allowed window.
        time_diff = int(time.time()) - request_time
        if not 0 <= time_diff <= self.MAX_REQUEST_AGE:
            print('TIME ERROR ({}).'.format(time_diff))
            return False

        return True
67
68##
69# tests.py
70##
71import json
72import unittest
73import arrow
74import base64
75import hmac
76from hashlib import sha256
77from application import app
78
79
def prepare_test_request(
    test_user='testuser',
    test_user_secret='xxwMXEqOGiY2TssVZ9hvOB4x6EVW3RW75hjAKEai4UBlxG0ts8Js8dsWOzDvAVq4',
    seconds=0
):
    """Build the body and headers of an HMAC-signed test request.

    The signature is HMAC-SHA256 keyed with ``test_user_secret`` over the
    timestamp string followed by the JSON payload bytes.

    :param test_user: user id placed in the ``Authorization`` header.
    :param test_user_secret: secret key used to sign the request.
    :param seconds: offset added to the current time; negative values produce
        a timestamp in the past, positive values one in the future.
    :returns: ``(payload_bytes, headers)`` where ``headers`` holds the
        ``Authorization`` (``user:signature``) and ``timestamp`` entries.
    """
    # Local import keeps this helper self-contained. Plain epoch arithmetic
    # replaces Arrow.replace(seconds=...) used as a relative shift and the
    # Arrow.timestamp property, neither of which works on newer arrow releases.
    import time

    timestamp = str(int(time.time()) + seconds)
    payload_bytes = json.dumps({'testing': 'data'}).encode('utf-8')

    sig = hmac.new(test_user_secret.encode('utf-8'), b'', sha256)
    sig.update(timestamp.encode('utf-8'))
    sig.update(payload_bytes)

    headers = {
        'Authorization': '{}:{}'.format(test_user, sig.hexdigest()),
        'timestamp': timestamp,
    }
    return payload_bytes, headers
104
105
class BaseTest(unittest.TestCase):
    """End-to-end checks of HMAC authentication through the Flask test client."""

    URL = 'http://127.0.0.1:5000/'

    def setUp(self):
        app.config['TESTING'] = True
        self.app = app.test_client()

    def _assert_status(self, expected_status, **request_kwargs):
        """Send a signed GET built by prepare_test_request and check the status.

        unittest assertions are used instead of bare ``assert`` so the checks
        still run under ``python -O`` and report the actual status on failure.
        """
        data, headers = prepare_test_request(**request_kwargs)
        response = self.app.get(self.URL, data=data, headers=headers)
        self.assertEqual(response.status_code, expected_status)

    def test_hmac(self):
        # A plain, freshly signed request is accepted.
        self._assert_status(200)

    def test_hmac_30(self):
        # A request signed 30 seconds ago is still accepted.
        # NOTE(review): this sits exactly on the boundary; if the clock ticks
        # over between signing and the server-side check the age becomes 31
        # and the test can fail intermittently.
        self._assert_status(200, seconds=-30)

    def test_hmac_31(self):
        # A request signed 31 seconds ago is rejected.
        self._assert_status(401, seconds=-31)

    def test_hmac_future(self):
        # A timestamp 10 seconds in the future is rejected.
        self._assert_status(401, seconds=10)

    def test_hmac_nonexisting_user(self):
        # An unknown user id is rejected.
        self._assert_status(401, test_user='nonexisting')

    def test_hmac_wrong_key(self):
        # A signature made with the wrong secret is rejected.
        self._assert_status(401, test_user_secret='wrong!')