· 6 years ago · Nov 15, 2019, 09:20 AM
1'''
2Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
4Licensed under the Amazon Software License (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
5
6 http://aws.amazon.com/asl/
7
8or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and limitations under the License.
9'''
10
11import json
12import urllib
13import boto3
14import gzip
15import datetime
16import time
17import math
18from os import environ
19
20from urllib2 import Request
21from urllib2 import urlopen
22
print("Loading function")

#======================================================================================================================
# Constants
#======================================================================================================================
# Number of attempts made for each WAF API call before giving up.
API_CALL_NUM_RETRIES = 3
# HTTP status codes (compared as the raw log text) that count as errors for a requester.
BLOCK_ERROR_CODES = ['400', '403', '404', '405']
# S3 key, inside OUTPUT_BUCKET, of the published list of currently blocked IPs.
OUTPUT_FILE_NAME = 'aws-waf-security-automations-current-blocked-ips.json'

# Field positions in a CloudFront web distribution access log line (tab separated).
# http://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#BasicDistributionFileFormat
LINE_FORMAT_CLOUD_FRONT = dict(
    delimiter='\t',
    date=0,
    time=1,
    source_ip=4,
    code=8,
)

# Field positions in an Application Load Balancer access log line (space separated).
# http://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-access-logs.html
LINE_FORMAT_ALB = dict(
    delimiter=' ',
    timestamp=1,
    source_ip=3,
    code=8,
)

# Slots of the per-minute [request count, error count] counter pairs.
REQUEST_COUNTER_INDEX, ERROR_COUNTER_INDEX = 0, 1

# WAF client; assigned in lambda_handler ('waf' or 'waf-regional' depending on LOG_TYPE).
waf = None
55
56#======================================================================================================================
57# Auxiliary Functions
58#======================================================================================================================
59def get_outstanding_requesters(bucket_name, key_name):
60 print("[get_outstanding_requesters] Start")
61
62 outstanding_requesters = {}
63 outstanding_requesters['block'] = {}
64 result = {}
65 num_requests = 0
66 try:
67 if int(environ['ERROR_PER_MINUTE_LIMIT']) < 0:
68 return outstanding_requesters, num_requests
69
70 #--------------------------------------------------------------------------------------------------------------
71 print("[get_outstanding_requesters] \tDownload file from S3")
72 #--------------------------------------------------------------------------------------------------------------
73 local_file_path = '/tmp/' + key_name.split('/')[-1]
74 s3 = boto3.client('s3')
75 s3.download_file(bucket_name, key_name, local_file_path)
76
77 #--------------------------------------------------------------------------------------------------------------
78 print("[get_outstanding_requesters] \tRead file content")
79 #--------------------------------------------------------------------------------------------------------------
80 with gzip.open(local_file_path,'r') as content:
81 for line in content:
82 try:
83 if line.startswith('#'):
84 continue
85
86 return_code_index = None
87 if environ['LOG_TYPE'] == 'cloudfront':
88 line_data = line.split(LINE_FORMAT_CLOUD_FRONT['delimiter'])
89 request_key = line_data[LINE_FORMAT_CLOUD_FRONT['date']]
90 request_key += '-' + line_data[LINE_FORMAT_CLOUD_FRONT['time']][:-3]
91 request_key += '-' + line_data[LINE_FORMAT_CLOUD_FRONT['source_ip']]
92 return_code_index = LINE_FORMAT_CLOUD_FRONT['code']
93 elif environ['LOG_TYPE'] == 'alb':
94 line_data = line.split(LINE_FORMAT_ALB['delimiter'])
95 #print("Line Data:::::::::")
96 #print(line_data)
97 request_key = line_data[LINE_FORMAT_ALB['timestamp']].rsplit(':', 1)[0]
98 request_key += '-' + line_data[LINE_FORMAT_ALB['source_ip']].split(':')[0]
99 #print(request_key)
100 return_code_index = LINE_FORMAT_ALB['code']
101 else:
102 return outstanding_requesters, num_requests
103
104 if request_key in result.keys():
105 result[request_key][REQUEST_COUNTER_INDEX] += 1
106 else:
107 result[request_key] = [1,0]
108
109 if line_data[return_code_index] in BLOCK_ERROR_CODES:
110 result[request_key][ERROR_COUNTER_INDEX] += 1
111
112 num_requests += 1
113
114 except Exception, e:
115 print ("[get_outstanding_requesters] \t\tError to process line: %s"%line)
116
117 #--------------------------------------------------------------------------------------------------------------
118 print("[get_outstanding_requesters] \tKeep only outstanding requesters")
119 #--------------------------------------------------------------------------------------------------------------
120 print(":::::::::::::::::::::::::::")
121 print(result)
122 now_timestamp_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
123 for k, v in result.iteritems():
124 k = k.split('-')[-1]
125 if int(environ['REQUEST_PER_MINUTE_LIMIT']) >= 0 and v[REQUEST_COUNTER_INDEX] >= int(environ['REQUEST_PER_MINUTE_LIMIT']):
126 if k not in outstanding_requesters['block'].keys() or (
127 outstanding_requesters['block'][k]['max_req_per_min'] < v[REQUEST_COUNTER_INDEX] or
128 outstanding_requesters['block'][k]['max_err_per_min'] < v[ERROR_COUNTER_INDEX]
129 ):
130 outstanding_requesters['block'][k] = {
131 'max_req_per_min': v[REQUEST_COUNTER_INDEX],
132 'max_err_per_min': v[ERROR_COUNTER_INDEX],
133 'updated_at': now_timestamp_str
134 }
135
136 except Exception, e:
137 print("[get_outstanding_requesters] \tError to read input file")
138
139 print("[get_outstanding_requesters] End")
140 return outstanding_requesters, num_requests
141
def merge_current_blocked_requesters(key_name, outstanding_requesters):
    """Fold the block list previously published to S3 into the fresh one.

    Reads OUTPUT_FILE_NAME from the OUTPUT_BUCKET bucket and updates
    ``outstanding_requesters['block']`` in place:
      * IPs present in both lists get ``updated_at`` refreshed and each counter
        raised to the larger of the two values;
      * IPs only present remotely are kept while fewer than
        BLACKLIST_BLOCK_PERIOD minutes have passed since their ``updated_at``,
        otherwise they are dropped and reported as expired;
      * remote IPs below REQUEST_PER_MINUTE_LIMIT (or with a negative limit)
        are dropped without being reported.
    S3/JSON failures are logged and leave the local list unmerged; a failure on
    a single remote rule is logged and only skips that rule.

    Returns:
        ``(outstanding_requesters, need_update)``. ``need_update`` is True when
        a rule expired, the remote file is older than MAX_AGE_TO_UPDATE minutes,
        or the merged block list is non-empty.
    """
    print("[merge_current_blocked_requesters] Start")

    any_expired = False
    minutes_since_update = 0

    try:
        # Age of the published file, measured in the timezone S3 reports.
        print("[merge_current_blocked_requesters] \tCalculate Last Update Age")
        s3 = boto3.client('s3')
        remote_copy_path = '/tmp/' + key_name.split('/')[-1] + '_REMOTE.json'
        head = s3.head_object(Bucket=environ['OUTPUT_BUCKET'], Key=OUTPUT_FILE_NAME)
        last_modified = head['LastModified']
        now_timestamp = datetime.datetime.now(last_modified.tzinfo)
        now_timestamp_str = now_timestamp.strftime("%Y-%m-%d %H:%M:%S")
        minutes_since_update = int((now_timestamp - last_modified).total_seconds() / 60)

        print("[merge_current_blocked_requesters] \tDownload current blocked IPs")
        s3.download_file(environ['OUTPUT_BUCKET'], OUTPUT_FILE_NAME, remote_copy_path)
        with open(remote_copy_path, 'r') as remote_file:
            remote_outstanding_requesters = json.load(remote_file)

        print("[merge_current_blocked_requesters] \tExpire Block IP rules")
        for ip, remote_entry in remote_outstanding_requesters['block'].items():
            try:
                request_limit = int(environ['REQUEST_PER_MINUTE_LIMIT'])
                if not (request_limit >= 0 and remote_entry['max_req_per_min'] >= request_limit):
                    continue

                block = outstanding_requesters['block']
                if ip in block:
                    # Seen again in this log: refresh it and keep the larger counters.
                    print("[merge_current_blocked_requesters] \t\tUpdating data of BLOCK %s rule" % ip)
                    local_entry = block[ip]
                    local_entry['updated_at'] = now_timestamp_str
                    if remote_entry['max_req_per_min'] > local_entry['max_req_per_min']:
                        local_entry['max_req_per_min'] = remote_entry['max_req_per_min']
                    if remote_entry['max_err_per_min'] > local_entry['max_err_per_min']:
                        local_entry['max_err_per_min'] = remote_entry['max_err_per_min']
                    continue

                # Only known remotely: keep it until the block period elapses.
                updated_at = datetime.datetime.strptime(remote_entry['updated_at'], "%Y-%m-%d %H:%M:%S")
                updated_at = updated_at.replace(tzinfo=last_modified.tzinfo)
                age_minutes = (now_timestamp - updated_at).total_seconds() / 60
                if age_minutes < int(environ['BLACKLIST_BLOCK_PERIOD']):
                    print("[merge_current_blocked_requesters] \t\tKeeping %s rule" % ip)
                    block[ip] = remote_entry
                else:
                    any_expired = True
                    print("[merge_current_blocked_requesters] \t\tExpired %s rule" % ip)

            except Exception as e:
                print("[merge_current_blocked_requesters] \tError merging %s rule" % ip)
                print(e)

    except Exception as e:
        print("[merge_current_blocked_requesters] \tError merging data")
        print(e)

    need_update = (any_expired
                   or minutes_since_update > int(environ['MAX_AGE_TO_UPDATE'])
                   or len(outstanding_requesters['block']) > 0)

    print("[merge_current_blocked_requesters] End")
    return outstanding_requesters, need_update
206
def write_output(key_name, outstanding_requesters):
    """Publish ``outstanding_requesters`` as JSON to S3.

    The data is staged in /tmp, then uploaded to the OUTPUT_BUCKET bucket as
    OUTPUT_FILE_NAME with an ``application/json`` content type. Failures are
    logged, including the exception, but not re-raised. The staged file is
    always removed afterwards.

    Args:
        key_name: S3 key of the log being processed (used only to name the temp file).
        outstanding_requesters: JSON-serializable block list to publish.
    """
    import os

    print("[write_output] Start")

    current_data = None
    try:
        current_data = '/tmp/' + key_name.split('/')[-1] + '_LOCAL.json'
        with open(current_data, 'w') as outfile:
            json.dump(outstanding_requesters, outfile)

        s3 = boto3.client('s3')
        s3.upload_file(current_data, environ['OUTPUT_BUCKET'], OUTPUT_FILE_NAME,
                       ExtraArgs={'ContentType': "application/json"})

    except Exception as e:
        print("[write_output] \tError to write output file")
        print(e)
    finally:
        # upload_file has finished reading the file here; don't let /tmp fill up.
        if current_data is not None:
            try:
                os.remove(current_data)
            except OSError:
                pass

    print("[write_output] End")
222
def waf_get_ip_set(ip_set_id):
    """Fetch a WAF IP set, retrying with exponential backoff.

    Uses the module-level ``waf`` client assigned in lambda_handler. Makes up to
    API_CALL_NUM_RETRIES attempts, sleeping 1, 2, 4, ... seconds between them.
    There is no sleep after the final attempt, since nothing follows it.

    Args:
        ip_set_id: Id of the IP set to read.

    Returns:
        The ``get_ip_set`` response dict, or None if every attempt failed.
    """
    response = None

    for attempt in range(API_CALL_NUM_RETRIES):
        try:
            response = waf.get_ip_set(IPSetId=ip_set_id)
        except Exception as e:
            print(e)
            if attempt < API_CALL_NUM_RETRIES - 1:
                delay = math.pow(2, attempt)
                print("[waf_get_ip_set] Retrying in %d seconds..." % (delay))
                time.sleep(delay)
        else:
            break
    else:
        print("[waf_get_ip_set] Failed ALL attempts to call API")

    return response
240
def waf_update_ip_set(ip_set_id, updates_list):
    """Apply ``updates_list`` to a WAF IP set, retrying with exponential backoff.

    Does nothing when ``updates_list`` is empty or None. Otherwise makes up to
    API_CALL_NUM_RETRIES attempts, sleeping 1, 2, 4, ... seconds between them,
    with no sleep after the last one. Each failure is logged.

    Args:
        ip_set_id: Id of the IP set to modify.
        updates_list: List of ``{'Action': ..., 'IPSetDescriptor': ...}`` updates.

    Returns:
        The ``update_ip_set`` response dict, or None if nothing was sent or
        every attempt failed.
    """
    response = None

    if not updates_list:
        return response

    for attempt in range(API_CALL_NUM_RETRIES):
        try:
            # Request a new change token on every attempt: WAF Classic requires a
            # unique token for each update request.
            response = waf.update_ip_set(IPSetId=ip_set_id,
                                         ChangeToken=waf.get_change_token()['ChangeToken'],
                                         Updates=updates_list)
        except Exception as e:
            print(e)
            if attempt < API_CALL_NUM_RETRIES - 1:
                delay = math.pow(2, attempt)
                print("[waf_update_ip_set] Retrying in %d seconds..." % (delay))
                time.sleep(delay)
        else:
            break
    else:
        print("[waf_update_ip_set] Failed ALL attempts to call API")

    return response
260
def get_ip_set_already_blocked():
    """Return the CIDR values currently in the manual blacklist IP set.

    The IP set id comes from the IP_SET_ID_BLACKLIST environment variable. If
    the variable is unset or empty, the WAF call fails, or the response is
    malformed, an empty list is returned.
    """
    print("[get_ip_set_already_blocked] Start")
    ip_set_already_blocked = []
    try:
        # os.environ values are strings, never None. Treat a missing or empty
        # variable as "no blacklist configured" instead of calling WAF with it.
        ip_set_id = environ.get('IP_SET_ID_BLACKLIST')
        if ip_set_id:
            response = waf_get_ip_set(ip_set_id)
            if response is not None:
                ip_set_already_blocked = [descriptor['Value']
                                          for descriptor in response['IPSet']['IPSetDescriptors']]
    except Exception as e:
        print("[get_ip_set_already_blocked] Error getting WAF IP set")
        print(e)

    print("[get_ip_set_already_blocked] End")
    return ip_set_already_blocked
276
def _ipv4_to_int(address):
    """Convert a dotted-quad IPv4 string to a 32-bit int; raise ValueError if malformed."""
    octets = address.split('.')
    if len(octets) != 4:
        raise ValueError("not an IPv4 address: %r" % (address,))
    value = 0
    for octet in octets:
        octet_value = int(octet)
        if not 0 <= octet_value <= 255:
            raise ValueError("IPv4 octet out of range: %r" % (address,))
        value = (value << 8) | octet_value
    return value


def is_already_blocked(ip, ip_set):
    """Return True if IPv4 address ``ip`` falls inside any CIDR in ``ip_set``.

    Args:
        ip: Dotted-quad IPv4 address, e.g. "10.0.0.5".
        ip_set: Iterable of CIDR strings such as "10.0.0.0/24". An entry without
            a prefix length is treated as a single host (/32).

    Entries that are not valid IPv4 CIDRs (for example IPv6 ranges) are skipped,
    so they no longer abort the scan of the remaining entries. An unparseable
    ``ip`` or a None ``ip_set`` yields False.
    """
    try:
        ip_value = _ipv4_to_int(ip)
    except (AttributeError, TypeError, ValueError):
        return False

    for net in ip_set or ():
        try:
            net_str, _, bits_str = net.partition('/')
            bits = int(bits_str) if bits_str else 32
            if not 0 <= bits <= 32:
                continue
            net_value = _ipv4_to_int(net_str)
        except (AttributeError, TypeError, ValueError):
            continue

        mask = (0xffffffff << (32 - bits)) & 0xffffffff
        if (ip_value & mask) == (net_value & mask):
            return True

    return False
294
def update_waf_ip_set(outstanding_requesters, ip_set_id, ip_set_already_blocked):
    """Synchronize a WAF IP set with the top outstanding requesters.

    Requesters are ranked by ``max_req_per_min``, highest first. The first
    LIMIT_IP_ADDRESS_RANGES_PER_IP_MATCH_CONDITION of them not already covered
    by ``ip_set_already_blocked`` are kept. IP set entries that were not kept
    are deleted, and kept IPs not yet in the set are inserted as host ranges.

    Args:
        outstanding_requesters: ``{ip: {'max_req_per_min': ..., ...}}``.
        ip_set_id: IP set to manage; None skips the update.
        ip_set_already_blocked: CIDR strings blocked elsewhere (e.g. the
            blacklist); requesters inside them are not added again.

    Returns:
        Number of requesters selected for blocking (0 when skipped).
        Errors are logged, not raised.
    """
    print("[update_waf_ip_set] Start")

    counter = 0
    try:
        if ip_set_id is None:
            print("[update_waf_ip_set] Ignore process when ip_set_id is None")
            return counter

        updates_list = []

        #--------------------------------------------------------------------------------------------------------------
        print("[update_waf_ip_set] \tTruncate [if necessary] list to respect WAF limit")
        #--------------------------------------------------------------------------------------------------------------
        max_ranges = int(environ['LIMIT_IP_ADDRESS_RANGES_PER_IP_MATCH_CONDITION'])
        ranked = sorted(outstanding_requesters.items(),
                        key=lambda kv: kv[1]['max_req_per_min'], reverse=True)
        top_outstanding_requesters = {}
        for ip, data in ranked:
            if counter >= max_ranges:
                break
            if not is_already_blocked(ip, ip_set_already_blocked):
                top_outstanding_requesters[ip] = data
                counter += 1

        #--------------------------------------------------------------------------------------------------------------
        print("[update_waf_ip_set] \tRemove IPs that are not in current outstanding requesters list")
        #--------------------------------------------------------------------------------------------------------------
        response = waf_get_ip_set(ip_set_id)
        if response is not None:
            for descriptor in response['IPSet']['IPSetDescriptors']:
                ip_value = descriptor['Value'].split('/')[0]
                if ip_value in top_outstanding_requesters:
                    # Already in the set: don't insert it a second time.
                    top_outstanding_requesters.pop(ip_value)
                else:
                    updates_list.append({
                        'Action': 'DELETE',
                        # Reuse the entry's own Type so non-IPV4 entries can be removed too.
                        'IPSetDescriptor': {
                            'Type': descriptor.get('Type', 'IPV4'),
                            'Value': descriptor['Value']
                        }
                    })

        #--------------------------------------------------------------------------------------------------------------
        print("[update_waf_ip_set] \tBlock remaining outstanding requesters")
        #--------------------------------------------------------------------------------------------------------------
        for ip in top_outstanding_requesters:
            # An IPv6 requester sent as "IPV4 .../32" would make WAF reject the whole batch.
            if ':' in ip:
                new_descriptor = {'Type': 'IPV6', 'Value': "%s/128" % ip}
            else:
                new_descriptor = {'Type': 'IPV4', 'Value': "%s/32" % ip}
            updates_list.append({'Action': 'INSERT', 'IPSetDescriptor': new_descriptor})

        #--------------------------------------------------------------------------------------------------------------
        print("[update_waf_ip_set] \tCommit changes in WAF IP set")
        #--------------------------------------------------------------------------------------------------------------
        waf_update_ip_set(ip_set_id, updates_list)

    except Exception as e:
        print("[update_waf_ip_set] Error to update waf ip set")
        print(e)

    print("[update_waf_ip_set] End")
    return counter
360
def _get_waf_metric_sum(metric_name, rule_name, error_message):
    """Return the last-12-hour CloudWatch ``Sum`` of a WAF metric, or 0 on failure.

    Queries namespace 'WAF' for ``metric_name`` with dimensions
    Rule=``rule_name`` and WebACL=SecurityAutomationsMaliciousRequesters, using
    one 12-hour period. All returned datapoints are summed rather than reading
    ``Datapoints[0]``, because the response is not guaranteed to contain
    exactly one, ordered datapoint. On any error, ``error_message`` and the
    exception are logged.
    """
    try:
        cw = boto3.client('cloudwatch')
        end_time = datetime.datetime.utcnow()
        response = cw.get_metric_statistics(
            MetricName=metric_name,
            Namespace='WAF',
            Statistics=['Sum'],
            Period=12*3600,
            StartTime=end_time - datetime.timedelta(seconds=12*3600),
            EndTime=end_time,
            Dimensions=[
                {
                    "Name": "Rule",
                    "Value": rule_name
                },
                {
                    "Name": "WebACL",
                    "Value": "SecurityAutomationsMaliciousRequesters"
                }
            ]
        )
        return sum(datapoint['Sum'] for datapoint in response['Datapoints'])
    except Exception as e:
        print("[send_anonymous_usage_data] %s" % error_message)
        print(e)
        return 0


def send_anonymous_usage_data():
    """POST anonymous solution metrics when SEND_ANONYMOUS_USAGE_DATA is 'yes'.

    Collects the sizes of the auto-block and blacklist WAF IP sets and the
    allowed/blocked request totals for the last 12 hours from CloudWatch, then
    POSTs them as JSON to the solutions metrics endpoint. This is best effort:
    every failure is logged and none is raised.
    """
    if environ['SEND_ANONYMOUS_USAGE_DATA'] != 'yes':
        return

    try:
        print("[send_anonymous_usage_data] Start")
        auto_block_ip_set_size = 0
        blacklist_set_size = 0

        #--------------------------------------------------------------------------------------------------------------
        print("[send_anonymous_usage_data] Get Auto Block IP Set Size")
        #--------------------------------------------------------------------------------------------------------------
        response = waf_get_ip_set(environ['IP_SET_ID_AUTO_BLOCK'])
        if response is not None:
            auto_block_ip_set_size = len(response['IPSet']['IPSetDescriptors'])

        #--------------------------------------------------------------------------------------------------------------
        print("[send_anonymous_usage_data] Get Blacklist IP Set Size")
        #--------------------------------------------------------------------------------------------------------------
        response = waf_get_ip_set(environ['IP_SET_ID_BLACKLIST'])
        if response is not None:
            blacklist_set_size = len(response['IPSet']['IPSetDescriptors'])

        #--------------------------------------------------------------------------------------------------------------
        print("[send_anonymous_usage_data] Get Num Allowed Requests")
        #--------------------------------------------------------------------------------------------------------------
        allowed_requests = _get_waf_metric_sum(
            'AllowedRequests', 'ALL',
            "Error to get Num Allowed Requests")

        #--------------------------------------------------------------------------------------------------------------
        print("[send_anonymous_usage_data] Get Num Blocked Requests - All Rules")
        #--------------------------------------------------------------------------------------------------------------
        blocked_requests_all = _get_waf_metric_sum(
            'BlockedRequests', 'ALL',
            "Error to get Num Blocked Requests")

        #--------------------------------------------------------------------------------------------------------------
        print("[send_anonymous_usage_data] Get Num Blocked Requests - Auto Block Rule")
        #--------------------------------------------------------------------------------------------------------------
        blocked_requests_auto_block = _get_waf_metric_sum(
            'BlockedRequests', 'SecurityAutomationsAutoBlockRule',
            "Error to get Num Blocked Requests")

        #--------------------------------------------------------------------------------------------------------------
        print("[send_anonymous_usage_data] Get Num Blocked Requests - Blacklist Rule")
        #--------------------------------------------------------------------------------------------------------------
        blocked_requests_blacklist = _get_waf_metric_sum(
            'BlockedRequests', 'SecurityAutomationsBlacklistRule',
            "Error to get Num Blocked Requests")

        #--------------------------------------------------------------------------------------------------------------
        print("[send_anonymous_usage_data] Send Data")
        #--------------------------------------------------------------------------------------------------------------
        time_stamp = str(datetime.datetime.utcnow().isoformat())
        usage_data = {
            "Solution": "SO0006",
            "UUID": environ['UUID'],
            "TimeStamp": time_stamp,
            "Data":
            {
                "data_type" : "http_flood_scanner_probe",
                "blacklist_set_size" : blacklist_set_size,
                "auto_block_ip_set_size" : auto_block_ip_set_size,
                "allowed_requests" : allowed_requests,
                "blocked_requests_all" : blocked_requests_all,
                "blocked_requests_auto_block" : blocked_requests_auto_block,
                "blocked_requests_blacklist" : blocked_requests_blacklist,
                "waf_type" : environ['LOG_TYPE']
            }
        }

        url = 'https://metrics.awssolutionsbuilder.com/generic'
        data = json.dumps(usage_data)
        headers = {'content-type': 'application/json'}
        print("[send_anonymous_usage_data] %s" % data)
        req = Request(url, data, headers)
        # Bound the wait so an unresponsive metrics endpoint cannot stall the function.
        rsp = urlopen(req, timeout=10)
        try:
            content = rsp.read()
            rspcode = rsp.getcode()
        finally:
            rsp.close()
        print('[send_anonymous_usage_data] Response Code: {}'.format(rspcode))
        print('[send_anonymous_usage_data] Response Content: {}'.format(content))

        print("[send_anonymous_usage_data] End")
    except Exception as e:
        print("[send_anonymous_usage_data] Failed to Send Data")
        print(e)
531 print("[send_anonymous_usage_data] Failed to Send Data")
532
533#======================================================================================================================
534# Lambda Entry Point
535#======================================================================================================================
536def lambda_handler(event, context):
537 print("[lambda_handler] Start")
538
539 outstanding_requesters = {}
540
541 try:
542 bucket_name = event['Records'][0]['s3']['bucket']['name']
543 key_name = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8')
544
545 if key_name == OUTPUT_FILE_NAME:
546 print("[lambda_handler] \tIgnore processing output file")
547 return
548
549 global waf
550 if environ['LOG_TYPE'] == 'alb':
551 session = boto3.session.Session(region_name=environ['REGION'])
552 waf = session.client('waf-regional')
553 else:
554 waf = boto3.client('waf')
555
556 #--------------------------------------------------------------------------------------------------------------
557 print("[lambda_handler] \tReading input data and get outstanding requesters")
558 #--------------------------------------------------------------------------------------------------------------
559 outstanding_requesters, num_requests = get_outstanding_requesters(bucket_name, key_name)
560 print("[outstanding_requesters]")
561 print(outstanding_requesters)
562 print(num_requests)
563
564 #--------------------------------------------------------------------------------------------------------------
565 print("[lambda_handler] \tMerge with current blocked requesters")
566 #--------------------------------------------------------------------------------------------------------------
567 outstanding_requesters, need_update = merge_current_blocked_requesters(key_name, outstanding_requesters)
568
569 if need_update:
570 #----------------------------------------------------------------------------------------------------------
571 print("[lambda_handler] \tUpdate new blocked requesters list to S3")
572 #----------------------------------------------------------------------------------------------------------
573 write_output(key_name, outstanding_requesters)
574
575 #----------------------------------------------------------------------------------------------------------
576 print("[lambda_handler] \tUpdate WAF IP Set")
577 #----------------------------------------------------------------------------------------------------------
578 ip_set_already_blocked = get_ip_set_already_blocked()
579 num_blocked = update_waf_ip_set(outstanding_requesters['block'], environ['IP_SET_ID_AUTO_BLOCK'], ip_set_already_blocked)
580
581 send_anonymous_usage_data()
582
583 else:
584 #----------------------------------------------------------------------------------------------------------
585 print("[lambda_handler] \tNo changes identified")
586 #----------------------------------------------------------------------------------------------------------
587
588 except Exception as e:
589 raise e
590
591 print("[lambda_handler] End")
592 return outstanding_requesters