· 4 years ago · May 17, 2021, 04:24 PM
1# -*- coding: utf-8 -*-
2
3import json
4import logging
5import msal
6import requests
7
8from typing import Any
9
10from prefect import Task
11from prefect.utilities.tasks import defaults_from_attrs
12
13
14"""
15TODO:
16 - Refresh tokens instead of grabbing a new one and manage its lifecycle correctly
17 - Allow for more flexible connection parameters
18 (user/password and client_id/client_secret flows)
19"""
20
21
class SharePointFetch(Task):
    """
    Task for fetching results from a SharePoint (Microsoft Graph API) query

    Information on how to setup Azure AD App-Only access
    https://docs.microsoft.com/en-us/sharepoint/dev/solution-guidance/security-apponly-azuread

    Parameters:

        tennat_id (str): The ID of the tenant account we are fetching from
            (the misspelled parameter name is kept for backward compatibility)
        client_id (str): The application's client_id
        thumbprint (str): Hex encoded thumbprint of the certificate
        private_key (str): A PEM encoded certificate private key
        uri (str, optional): The URI which to query; it is appended to
            "https://graph.microsoft.com/v1.0"
        method (str, optional): The request method to use, one of "GET" or "POST"
            defaults to "GET"
        data (dict, optional): The data to include in a "POST" request;
            defaults to an empty dict
        **kwargs (optional): additional keyword arguments to pass to the Task
            constructor

    Returns:

        results (dict): request response
    """

    def __init__(
        self,
        tennat_id: str,
        client_id: str,
        thumbprint: str,
        private_key: str,
        uri: str = None,
        method: str = "GET",
        data: dict = None,
        **kwargs: Any,
    ):
        self.tennat_id = tennat_id
        self.client_id = client_id
        self.thumbprint = thumbprint
        self.private_key = private_key
        self.uri = uri
        self.method = method
        # Build a fresh dict per instance: a `{}` default in the signature
        # would be one shared, mutable object across all tasks.
        self.data = {} if data is None else data
        super().__init__(**kwargs)

    def obtain_access_token(self) -> str:
        """
        A method which returns an access token used to
        authenticate against the Microsoft Graph API

        The token is requested with the certificate credentials
        (thumbprint + private key) for the ".default" Graph scope.

        Returns:

            token (str): access token

        Raises:

            KeyError: if the token response contains no "access_token";
                the error details from the response are logged first
        """
        scope = ["https://graph.microsoft.com/.default"]
        private_key_details = {
            "thumbprint": self.thumbprint,
            "private_key": self.private_key,
        }
        app = msal.ConfidentialClientApplication(
            authority=f"https://login.microsoftonline.com/{self.tennat_id}",
            client_id=self.client_id,
            client_credential=private_key_details,
        )

        # Look for a cached token first, then fall back to requesting a new one.
        result = app.acquire_token_silent(scopes=scope, account=None)

        if not result:
            result = app.acquire_token_for_client(scopes=scope)

        if "access_token" not in result:
            # Surface the reason for the failure; otherwise the only symptom
            # would be a bare KeyError('access_token') from the lookup below.
            logging.critical(
                "Failed to obtain access token: %s - %s",
                result.get("error"),
                result.get("error_description"),
            )

        return result["access_token"]

    @defaults_from_attrs("uri", "method", "data")
    def run(self, uri: str, method: str = "GET", data: dict = None) -> Any:
        """
        Task run method. Executes a request against provided URI and fetches results

        Arguments not supplied at call time fall back to the values given to
        the constructor (see the `defaults_from_attrs` decorator).

        Parameters:

            uri (str, optional): The URI which to query; it is appended to
                "https://graph.microsoft.com/v1.0"
            method (str, optional): The request method to use, one of "GET" or "POST"
                defaults to "GET"
            data (dict, optional): The data to include in a "POST" request;
                None is treated as an empty payload

        Returns:
            results (dict): request response, decoded from JSON. When the server
                answers with an HTTP error status the error is logged and the
                decoded error body is returned instead of raising.

        Raises:
            ValueError: if no URI is given, if the method is not "GET" or "POST",
                or if a data payload is combined with a "GET" request
        """

        # Validate all inputs before doing any work (token request, HTTP call).
        if not uri:
            raise ValueError("A URI string must be provided")

        if method not in ("GET", "POST"):
            raise ValueError(
                'Unsupported request method {!r}, use "GET" or "POST"'.format(method)
            )

        if method == "GET" and data:
            raise ValueError("Can not add data payload to GET request")

        site_url = "https://graph.microsoft.com/v1.0"
        full_url = "{}{}".format(site_url, uri)

        access_token = self.obtain_access_token()

        # The context manager closes the session (and its pooled connections)
        # on every exit path.
        with requests.Session() as session:
            session.headers.update(
                {"Authorization": "Bearer {}".format(access_token)}
            )

            logging.debug("Querying: %s", full_url)

            try:
                if method == "GET":
                    resp = session.get(full_url)
                else:
                    payload = json.dumps({} if data is None else data)
                    # The body is JSON, so declare it as such; passing a plain
                    # string via `data=` sends no Content-Type on its own.
                    resp = session.post(
                        full_url,
                        data=payload,
                        headers={"Content-Type": "application/json"},
                    )

                resp.raise_for_status()

            except requests.exceptions.HTTPError as e:
                # NOTE(review): HTTP errors are logged but not re-raised, so the
                # caller receives the decoded error body. Kept as-is because
                # existing flows may rely on it - confirm whether the task
                # should fail instead.
                logging.critical(e.response.text)

            return resp.json()
140