# Paste header: · 6 years ago · Nov 01, 2019, 04:52 PM
import argparse
import json
import os
import re
import sys
from datetime import datetime, timezone
from urllib.parse import urljoin

import requests
from colorama import init
from colorama import Fore, Back, Style
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# Silence urllib3's InsecureRequestWarning: the lookups in this file are issued with
# verify=False, which would otherwise emit a warning on every request.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
12
# USER API KEYS
# The SpyOnWeb key is taken from the SPY_ON_WEB_API_KEY environment variable so that no
# credential is stored in the source. When it is empty and no token is passed to Swamp,
# the SpyOnWeb lookup is disabled with a warning.
SPY_ON_WEB_API_KEY = os.environ.get("SPY_ON_WEB_API_KEY", "")
15
class Swamp(object):
    """Reverse lookup of Google Analytics tracking IDs via urlscan and/or SpyOnWeb.

    The class works both as a command-line front end (``cli=True``: progress is
    printed in colour and results are not returned from ``run``) and as a library
    (``cli=False``: results are returned and nothing is printed on the normal path).
    """

    # Seconds to wait on any single HTTP request before giving up.
    REQUEST_TIMEOUT = 30

    # NOTE(review): the pasted source had every run of whitespace collapsed, so the
    # column alignment of this banner was rebuilt from the glyph shapes; confirm
    # against the upstream file.
    _BANNER = """
         .d8888b. 888       888       d8888888b     d8888888888b.
        d88P  Y88b888   o   888      d888888888b   d8888888   Y88b
        Y88b.     888  d8b  888     d88P88888888b.d88888888    888
         "Y888b.  888 d888b 888    d88P 888888Y88888P888888   d88P
            "Y88b.888d88888b888   d88P  888888 Y888P 8888888888P"
              "88888888P Y88888  d88P   888888  Y8P  888888
        Y88b  d88P8888P   Y8888 d8888888888888   "   888888
         "Y8888P" 888P     Y888d88P     888888       888888
        """

    def __init__(self, cli=False, outfile=None, api="urlscan", token=None):
        """Configure which lookup APIs are used.

        Args:
            cli: True when driven from the command line (enables console output).
            outfile: Optional path of a text file that results are appended to.
            api: API names as a list or a comma-separated string; recognised
                names are "urlscan", "spyonweb" and "all".
            token: SpyOnWeb access token; overrides ``SPY_ON_WEB_API_KEY``.

        Raises:
            ValueError: if ``api`` is neither a string nor a list.
        """
        self.cli = cli
        self.outfile = outfile
        self.api_key = None

        if isinstance(api, list):
            api_list = api
        elif isinstance(api, str):
            api_list = [name.strip() for name in api.split(',')]
        else:
            raise ValueError('api must be either a string or list of strings')

        # Membership test instead of api_list[0]: an empty list must not raise.
        enable_all = "all" in api_list
        self.urlscan = enable_all or "urlscan" in api_list
        self.spyonweb = enable_all or "spyonweb" in api_list

        # ensure api_key is given if needed
        if self.spyonweb:
            if token is not None:
                # an explicitly passed token always wins
                self.api_key = token
            elif SPY_ON_WEB_API_KEY == "":
                # no key available: warn the user and disable SpyOnWeb
                print(Fore.RED + "SpyOnWeb API is enabled and an API Key has not been supplied. Set 'SPY_ON_WEB_API_KEY' at the top of swamp.py" + Style.RESET_ALL)
                self.spyonweb = False
            else:
                self.api_key = SPY_ON_WEB_API_KEY

    def run(self, id=None, url=None):
        """Run a lookup for one tracking ID, or for every ID found on a web page.

        Args:
            id: Google tracking ID to look up; takes precedence over ``url``.
            url: Page to scrape for tracking IDs, with or without a protocol.

        Returns:
            In library mode, the result of ``scan_gid`` (for ``id``) or
            ``scan_gids`` (for ``url``). In CLI mode nothing is returned.

        Raises:
            AssertionError: in library mode when neither ``id`` nor ``url`` is given.
            ValueError: when ``url`` cannot be validated.
        """
        if self.outfile is not None:
            # write date and time to file to initialize (this truncates the file)
            with open(self.outfile, 'w') as fObj:
                stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M")
                fObj.write("{}\n".format(stamp))

        if id is not None:
            urls = self.scan_gid(id)
        elif url is not None:
            gids = self.get_gids_from_url(self.handle_url_protocol(url))
            urls = self.scan_gids(gids)
        elif self.cli:
            print(Fore.RED + "You must pass in either '-url <webpage url>' or '-id <google tracking id>'")
            print(Style.RESET_ALL)
            return None
        else:
            # Raised explicitly (not via `assert`) so the check survives `python -O`
            # while callers still see the same exception type as before.
            raise AssertionError("You must pass in either url=<webpage url string> or id=<google tracking id string>")

        if not self.cli:
            return urls
        return None

    def show_banner(self):
        """Print the banner and credits; does nothing in library mode."""
        if not self.cli:
            return
        print()
        print(Fore.YELLOW + self._BANNER)

        print()
        print(Fore.GREEN + "An OSINT tool for Google Analytics ID Reverse lookup")
        print(Fore.RED + "By Jake Creps | With help from Francesco Poldi, WebBreacher and Mark Ditsworth")
        print(Fore.WHITE)

    def handle_url_protocol(self, url):
        """Return a validated URL, adding a protocol when the input has none.

        A URL that already starts with http:// or https:// is validated as is.
        Otherwise https:// is tried first and http:// second.

        Raises:
            ValueError: if no variant of ``url`` passes ``validate_url``.
        """
        if url.startswith(('http://', 'https://')):
            # input string is okay, but make sure it is a valid url
            validated_url = self.validate_url(url)
            if not validated_url:
                raise ValueError("{} is not a valid URL.".format(url))
            return validated_url

        if self.cli:
            print(Fore.YELLOW + "Protocol not given. Will try HTTPS and then HTTP.")

        validated_https_url = self.validate_url('https://' + url)
        if validated_https_url:
            return validated_https_url

        if self.cli:
            print(Fore.RED + "Failed.")
        validated_http_url = self.validate_url('http://' + url)
        if not validated_http_url:
            raise ValueError("{} is not a valid URL".format(url))
        return validated_http_url

    def validate_url(self, url):
        """Check with a HEAD request that ``url`` is reachable.

        Returns:
            ``False`` when the request fails or the status is 400 or above;
            the redirect target (resolved against ``url``) for a 3xx response
            carrying a Location header; otherwise ``url`` itself.
        """
        if self.cli:
            print(Fore.GREEN + "Validating {}".format(url) + Fore.WHITE)
        try:
            check = requests.head(url, timeout=self.REQUEST_TIMEOUT)
        except requests.exceptions.RequestException:
            # Covers connection errors as well as timeouts and malformed URLs.
            if self.cli:
                print(Fore.RED + "Unable to access {}".format(url) + Style.RESET_ALL)
            return False

        if check.status_code >= 400:
            return False

        if check.status_code // 100 == 3:
            # Not every 3xx carries a Location header, and it may be relative.
            location = check.headers.get('Location')
            if location:
                target = urljoin(url, location)
                if self.cli:
                    print(Fore.YELLOW + "Redirected to " + Fore.WHITE + "{}".format(target))
                return target
        return url

    def get_gids_from_url(self, url):
        """Fetch ``url`` and return the set of UA-<n>-<n> tracking IDs in its body."""
        if self.cli:
            print(Fore.GREEN + "Analyzing {}...".format(url) + Style.RESET_ALL)

        if self.outfile is not None:
            with open(self.outfile, 'a') as fObj:
                fObj.write("Anlaysis for {}\n".format(url))

        urlresponse = requests.get(url, verify=False, timeout=self.REQUEST_TIMEOUT)
        # a set drops duplicate ids
        gids = set(re.findall(r'UA-[0-9]+-[0-9]+', urlresponse.text))

        if self.cli:
            for gid in gids:
                print(Fore.GREEN + "Discovered " + Fore.YELLOW + "{}".format(gid) + Fore.GREEN + " Google Tracking ID in " + Fore.WHITE + "{}".format(url))
        return gids

    def scan_gids(self, ids):
        """Run ``scan_gid`` on every ID and return ``{id: scan_gid(id)}``."""
        return {_id: self.scan_gid(_id) for _id in ids}

    def query_api(self, url):
        """GET ``url`` without TLS certificate verification and return the response.

        Any request failure is printed and terminates the process with status 1.
        """
        try:
            # Make web request for that URL and don't verify SSL/TLS certs
            response = requests.get(url, verify=False, timeout=self.REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as e:
            print(Fore.RED + "[ !!! ] ERROR - {}".format(str(e)) + Style.RESET_ALL)
            sys.exit(1)

        if self.cli:
            print(Fore.YELLOW + "[+] " + Fore.RED + "Searching for associated URLs...")

        return response

    def query_urlscan(self, id):
        """Return the set of page URLs that urlscan's search lists for ``id``."""
        url = 'https://urlscan.io/api/v1/search/?q={}'.format(id)

        response = self.query_api(url)

        # Output is already JSON so we just need to load and parse it
        payload = json.loads(response.text)

        results = payload.get('results')
        if results is None:
            # The payload had no result list; report it instead of raising KeyError.
            if self.cli:
                print(Fore.RED + "[ !!! ] ERROR - urlscan returned no 'results' for {}".format(id) + Style.RESET_ALL)
            return set()

        # A set keeps only the unique URLs
        return {entry['page']['url'] for entry in results}

    # Returns a limit of 100 results
    # ToDo: Support setting the limit
    # ToDo: Support getting more results with iterative requests
    # ToDo: de-duplicate results (e.g. example.com and www.example.com will be returned)
    def query_spyonweb(self, id, api_key):
        """Return the set of domains SpyOnWeb associates with ``id``.

        Terminates the process with status 1 when the response status is not "found".
        """
        url = 'https://api.spyonweb.com/v1/analytics/{}?access_token={}'.format(id, api_key)

        # the id, less the last set of numbers, is used to get the results from the returned json
        id_key = '-'.join(id.split('-')[:2])

        response = self.query_api(url)

        payload = json.loads(response.text)
        if payload.get('status') != "found":
            print(Fore.RED + "Error accessing API." + Style.RESET_ALL)
            sys.exit(1)

        return set(payload['result']['analytics'][id_key]['items'])

    def output_api_results(self, id, urls):
        """Report the URLs found for ``id`` and return them as a sorted list.

        In CLI mode the URLs are printed; when an output file is configured they
        are appended to it.
        """
        ordered = sorted(urls)

        if self.cli:
            print(Fore.YELLOW + "[+] " + Fore.RED + "Outputting discovered URLs associate to {}...".format(id))
            for url in ordered:
                print(Fore.YELLOW + '[!]' + Fore.GREEN + " URL: " + Fore.WHITE + url)

        if self.outfile is not None:
            # One open for the header and all URL lines instead of one per URL.
            with open(self.outfile, 'a') as fObj:
                fObj.write("Outputting discovered URLs associate to {}\n".format(id))
                fObj.writelines("URL: {}\n".format(url) for url in ordered)

        if self.cli:
            print(Style.RESET_ALL)
        return ordered

    def scan_gid(self, id):
        """Query every enabled API for ``id``.

        Returns:
            A dict with a list of URLs under "spyonweb" and/or "urlscan",
            one key per enabled API.
        """
        if self.cli:
            print()
            print(Fore.GREEN + "Using {} for Reverse Lookup".format(id))

        found = {}
        if self.spyonweb:
            if self.cli:
                print(Fore.GREEN + "Querying SpyOnWeb")
            found['spyonweb'] = self.output_api_results(id, self.query_spyonweb(id, self.api_key))

        if self.urlscan:
            if self.cli:
                print(Fore.GREEN + "Querying urlscan")
            found['urlscan'] = self.output_api_results(id, self.query_urlscan(id))

        return found

    def url_to_domain(self, url):
        """Return the ``http(s)://host`` prefix of ``url``.

        Raises:
            ValueError: if ``url`` does not start with http:// or https://
                followed by a host.
        """
        match = re.match(r"^https?://[^/]+", url)
        if match is None:
            raise ValueError("{} is not a valid URL".format(url))
        return match.group(0)

    def urls_to_domains(self, url_iter):
        """Return the unique ``url_to_domain`` values of ``url_iter`` as a list."""
        return list({self.url_to_domain(url) for url in url_iter})
272
273if __name__ == '__main__':
274 ap = argparse.ArgumentParser(prog="swamp", usage="python %(prog)s [options]")
275 ap.add_argument('-id', help="Google Analytics ID", action="store")
276 ap.add_argument('-url', help="Website URL", action="store")
277 ap.add_argument('-o', help="Output file for results", action="store")
278 ap.add_argument('-urlscan',help="Use the urlscan API for reverse lookup", action="store_true")
279 ap.add_argument('-spyonweb',help="Use the SpyOnWeb API for reverse lookup", action="store_true")
280 ap.add_argument('-token',help="API key or token", action="store")
281 args = ap.parse_args()
282
283 # set api based on user input. defaults to urlscan
284 api_choice = []
285 if args.urlscan:
286 api_choice.append('urlscan')
287 if args.spyonweb:
288 api_choice.append('spyonweb')
289 if not args.spyonweb and not args.urlscan:
290 api_choice = "all"
291
292 SwampApp = Swamp(cli=True, outfile=args.o, api=api_choice, token=args.token)
293 SwampApp.show_banner()
294 SwampApp.run(id=args.id,url=args.url)