· 6 years ago · Nov 05, 2018, 08:46 AM
1import base64
2import urllib.parse
3from tlslite.utils import keyfactory
4import oauth2 as oauth
5
6class SignatureMethod_RSA_SHA1(oauth.SignatureMethod):
7 name = 'RSA-SHA1'
8
9 def signing_base(self, request, consumer, token):
10 if not hasattr(request, 'normalized_url') or request.normalized_url is None:
11 raise ValueError("Base URL for request is not set.")
12
13 sig = (
14 oauth.escape(request.method),
15 oauth.escape(request.normalized_url),
16 oauth.escape(request.get_normalized_parameters()),
17 )
18
19 key = '%s&' % oauth.escape(consumer.secret)
20 if token:
21 key += oauth.escape(token.secret)
22 raw = '&'.join(sig)
23 return key, raw
24
25 def sign(self, request, consumer, token):
26 """Builds the base signature string."""
27 key, raw = self.signing_base(request, consumer, token)
28
29 with open('MYLINK/rsa.pem', 'r') as f:
30 data = f.read()
31 privateKeyString = data.strip()
32
33 privatekey = keyfactory.parsePrivateKey(privateKeyString)
34 signature = privatekey.hashAndSign(raw)
35
36 return base64.b64encode(signature)
37
38
39
40
41consumer_key = 'python-oauth-key1'
42consumer_secret = 'Python OAuth'
43
44request_token_url = 'MYLINK/jira/plugins/servlet/oauth/request-token'
45access_token_url = 'MYLINK/jira/plugins/servlet/oauth/access-token'
46authorize_url = 'MYLINK/jira/plugins/servlet/oauth/authorize'
47
48data_url = 'MYLINK/rest/api/2/issue/NTS-27/'
49
50consumer = oauth.Consumer(consumer_key, consumer_secret)
51client = oauth.Client(consumer)
52
53# Lets try to access a JIRA issue (BULK-1). We should get a 401.
54resp, content = client.request(data_url, "GET")
55if resp['status'] != '401':
56 raise Exception("Should have no access!")
57
58consumer = oauth.Consumer(consumer_key, consumer_secret)
59client = oauth.Client(consumer)
60client.set_signature_method(SignatureMethod_RSA_SHA1())
61
62# Step 1: Get a request token. This is a temporary token that is used for
63# having the user authorize an access token and to sign the request to obtain
64# said access token.
65
66resp, content = client.request(request_token_url, "POST")
67if resp['status'] != '200':
68 raise Exception("Invalid response %s: %s" % (resp['status'], content))
69
70request_token = dict(urllib.parse.parse_qsl(content))
71
72print ("Request Token:")
73print (" - oauth_token = %s") % request_token['oauth_token']
74print (" - oauth_token_secret = %s") % request_token['oauth_token_secret']
75print
76
77# Step 2: Redirect to the provider. Since this is a CLI script we do not
78# redirect. In a web application you would redirect the user to the URL
79# below.
80
81 print ("Go to the following link in your browser:")
82 print ("%s?oauth_token=%s") % (authorize_url, request_token['oauth_token'])
83 print
84
85 # After the user has granted access to you, the consumer, the provider will
86 # redirect you to whatever URL you have told them to redirect to. You can
87 # usually define this in the oauth_callback argument as well.
88 accepted = 'n'
89 while accepted.lower() == 'n':
90 accepted = input('Have you authorized me? (y/n) ')
91 # oauth_verifier = raw_input('What is the PIN? ')
92
93 # Step 3: Once the consumer has redirected the user back to the oauth_callback
94 # URL you can request the access token the user has approved. You use the
95 # request token to sign this request. After this is done you throw away the
96 # request token and use the access token returned. You should store this
97 # access token somewhere safe, like a database, for future use.
98 token = oauth.Token(request_token['oauth_token'],
99 request_token['oauth_token_secret'])
100 #token.set_verifier(oauth_verifier)
101 client = oauth.Client(consumer, token)
102 client.set_signature_method(SignatureMethod_RSA_SHA1())
103
104 resp, content = client.request(access_token_url, "POST")
105 access_token = dict(urllib.parse.parse_qsl(content))
106
107 print ("Access Token:")
108 print (" - oauth_token = %s") % access_token['oauth_token']
109 print (" - oauth_token_secret = %s") % access_token['oauth_token_secret']
110 print
111 print ("You may now access protected resources using the access tokens above.")
112 print
113
114
115 # Now lets try to access the same issue again with the access token. We should get a 200!
116 accessToken = oauth.Token(access_token['oauth_token'], access_token['oauth_token_secret'])
117 client = oauth.Client(consumer, accessToken)
118 client.set_signature_method(SignatureMethod_RSA_SHA1())
119
120 resp, content = client.request(data_url, "GET")
121 if resp['status'] != '200':
122 raise Exception("Should have access!")