· 5 years ago · Oct 19, 2020, 01:38 PM
'''
Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Amazon Software License (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at

    http://aws.amazon.com/asl/

or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and limitations under the License.
'''
10
11import json
12import urllib
13import boto3
14import gzip
15import datetime
16import time
17import math
18
print('Loading function')

#======================================================================================================================
# Constants
#======================================================================================================================
# Configurables.
# Any of these left as None is filled in by lambda_handler from the CloudFormation stack outputs.
OUTPUT_BUCKET = None
IP_SET_ID_MANUAL_BLOCK = None
IP_SET_ID_AUTO_BLOCK = None
IP_SET_ID_AUTO_COUNT = None

# NOTE: because the next three defaults are not None, lambda_handler never replaces them with the
# stack's WAFBlockPeriod / WAFQuarantinePeriod / RequestThreshold outputs. Set them to None to
# use the stack values instead.
BLACKLIST_BLOCK_PERIOD = 240  # in minutes (compared against elapsed minutes when merging)
BLACKLIST_COUNT_PERIOD = 240  # in minutes
REQUEST_PER_MINUTE_LIMIT = 400  # an IP with more requests than this in a single minute is blocked

LIMIT_IP_ADDRESS_RANGES_PER_IP_MATCH_CONDITION = 1000  # max requesters written to one IP set
API_CALL_NUM_RETRIES = 3  # attempts per WAF API call

OUTPUT_FILE_NAME = 'current_outstanding_requesters.json'  # state file in OUTPUT_BUCKET

# Fixed: zero-based column positions in a tab-separated access-log line.
LINE_FORMAT = dict(date=0, time=1, source_ip=4)
45
#======================================================================================================================
# Auxiliary Functions
#======================================================================================================================
def get_outstanding_requesters(bucket_name, key_name):
    """Download a gzipped access log from S3 and find IPs above the per-minute request limit.

    Every non-comment line is split on tabs. The date, the time truncated to the minute and the
    source IP (columns taken from LINE_FORMAT) form a per-minute bucket. Any IP whose count in
    some bucket exceeds REQUEST_PER_MINUTE_LIMIT is recorded under 'block', keeping the highest
    per-minute count seen for that IP.

    Args:
        bucket_name: S3 bucket holding the log object.
        key_name: S3 key of the gzipped log object.

    Returns:
        A tuple (outstanding_requesters, num_requests):
          - outstanding_requesters is
            {'block': {ip: {'max_req_per_min': int, 'updated_at': 'YYYY-mm-dd HH:MM:SS'}},
             'count': {}}. 'count' is always empty here.
          - num_requests is the number of non-comment lines that were counted.
        Download/read errors are printed and whatever was gathered so far is returned.
    """
    print('[get_outstanding_requesters] Start')

    outstanding_requesters = {'block': {}, 'count': {}}
    requests_per_minute = {}  # (date, 'HH:MM', source_ip) -> number of requests
    num_requests = 0
    try:
        #--------------------------------------------------------------------------------------------------------------
        print('[get_outstanding_requesters] \tDownload file from S3')
        #--------------------------------------------------------------------------------------------------------------
        local_file_path = '/tmp/' + key_name.split('/')[-1]
        s3 = boto3.client('s3')
        s3.download_file(bucket_name, key_name, local_file_path)

        #--------------------------------------------------------------------------------------------------------------
        print('[get_outstanding_requesters] \tRead file contet')
        #--------------------------------------------------------------------------------------------------------------
        with gzip.open(local_file_path, 'r') as content:
            for line in content:
                try:
                    # gzip yields str on Python 2 but bytes on Python 3.
                    if not isinstance(line, str):
                        line = line.decode('utf-8')
                    if line.startswith('#'):
                        continue

                    line_data = line.split('\t')
                    # Drop the ":SS" suffix of the time so requests are bucketed per minute.
                    request_key = (line_data[LINE_FORMAT['date']],
                                   line_data[LINE_FORMAT['time']][:-3],
                                   line_data[LINE_FORMAT['source_ip']])
                    requests_per_minute[request_key] = requests_per_minute.get(request_key, 0) + 1

                    num_requests += 1

                except Exception:
                    print("[get_outstanding_requesters] \t\tError to process line: %s" % line)

        #--------------------------------------------------------------------------------------------------------------
        print('[get_outstanding_requesters] \tKeep only outstanding requesters')
        #--------------------------------------------------------------------------------------------------------------
        now_timestamp_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        blocked = outstanding_requesters['block']
        for (_, _, source_ip), count in requests_per_minute.items():
            if count <= REQUEST_PER_MINUTE_LIMIT:
                continue
            # Compare against the stored count (not the stored dict) so the busiest minute wins.
            if source_ip not in blocked or blocked[source_ip]['max_req_per_min'] < count:
                blocked[source_ip] = {'max_req_per_min': count, 'updated_at': now_timestamp_str}

    except Exception as e:
        print("[get_outstanding_requesters] \tError to read input file")
        print(e)

    print('[get_outstanding_requesters] End')
    return outstanding_requesters, num_requests
103
def merge_current_blocked_requesters(key_name, outstanding_requesters):
    """Merge freshly detected requesters with the state stored at OUTPUT_BUCKET/OUTPUT_FILE_NAME.

    Only previously stored entries whose max_req_per_min still exceeds REQUEST_PER_MINUTE_LIMIT
    are considered; the others are dropped. For the remaining entries:
      * IP present again in outstanding_requesters['block']: stays blocked, with the larger
        max_req_per_min and updated_at reset to now.
      * stored 'block' entry, not detected again:
          - older than BLACKLIST_BLOCK_PERIOD + BLACKLIST_COUNT_PERIOD minutes: dropped;
          - older than BLACKLIST_BLOCK_PERIOD minutes: moved to 'count';
          - otherwise: kept in 'block'.
      * stored 'count' entry, not detected again:
          - older than BLACKLIST_BLOCK_PERIOD + BLACKLIST_COUNT_PERIOD minutes: dropped;
          - otherwise: kept in 'count'.

    Args:
        key_name: S3 key of the log being processed; only used to name the local temp file.
        outstanding_requesters: dict with 'block' and 'count' maps, as produced by
            get_outstanding_requesters. It is updated in place.

    Returns:
        The same outstanding_requesters dict. If the stored state cannot be downloaded or parsed,
        it is treated as empty. Other errors are printed and the partially merged dict is returned.
    """
    print("[merge_current_blocked_requesters] Start")

    try:
        now_timestamp = datetime.datetime.now()
        now_timestamp_str = now_timestamp.strftime("%Y-%m-%d %H:%M:%S")
        remote_outstanding_requesters = {}
        current_block = outstanding_requesters['block']
        current_count = outstanding_requesters['count']

        def minutes_since_update(entry):
            # Elapsed minutes since the entry's 'updated_at' timestamp.
            prev_updated_at = datetime.datetime.strptime(entry['updated_at'], "%Y-%m-%d %H:%M:%S")
            return ((now_timestamp - prev_updated_at).total_seconds()) / 60

        def refresh_block_entry(ip, entry):
            # The IP offended again: keep blocking it with the highest rate seen and a fresh time.
            max_v = max(entry['max_req_per_min'], current_block[ip]['max_req_per_min'])
            current_block[ip] = {'max_req_per_min': max_v, 'updated_at': now_timestamp_str}

        #--------------------------------------------------------------------------------------------------------------
        print("[merge_current_blocked_requesters] \tDownload current blocked IPs")
        #--------------------------------------------------------------------------------------------------------------
        try:
            local_file_path = '/tmp/' + key_name.split('/')[-1] + '_REMOTE.json'
            s3 = boto3.client('s3')
            s3.download_file(OUTPUT_BUCKET, OUTPUT_FILE_NAME, local_file_path)

            with open(local_file_path, 'r') as file_content:
                remote_outstanding_requesters = json.loads(file_content.read())

        except Exception as e:
            # e.g. on the first run, before write_output has created the state file.
            print("[merge_current_blocked_requesters] \t\tFailed to download current blocked IPs")
            print(e)

        #--------------------------------------------------------------------------------------------------------------
        print("[merge_current_blocked_requesters] \tExpire Block IP rules")
        #--------------------------------------------------------------------------------------------------------------
        for k, v in remote_outstanding_requesters.get('block', {}).items():
            if v['max_req_per_min'] <= REQUEST_PER_MINUTE_LIMIT:
                continue
            if k in current_block:
                print("[merge_current_blocked_requesters] \t\tUpdating data of BLOCK %s rule" % k)
                refresh_block_entry(k, v)
                continue

            total_diff_min = minutes_since_update(v)
            if total_diff_min > (BLACKLIST_BLOCK_PERIOD + BLACKLIST_COUNT_PERIOD):
                print("[merge_current_blocked_requesters] \t\tExpired BLOCK and COUNT %s rule" % k)
            elif total_diff_min > BLACKLIST_BLOCK_PERIOD:
                print("[merge_current_blocked_requesters] \t\tExpired BLOCK %s rule" % k)
                current_count[k] = v
            else:
                print("[merge_current_blocked_requesters] \t\tKeeping data of BLOCK %s rule" % k)
                current_block[k] = v

        #--------------------------------------------------------------------------------------------------------------
        print("[merge_current_blocked_requesters] \tExpire Count IP rules")
        #--------------------------------------------------------------------------------------------------------------
        for k, v in remote_outstanding_requesters.get('count', {}).items():
            if v['max_req_per_min'] <= REQUEST_PER_MINUTE_LIMIT:
                continue
            if k in current_block:
                print("[merge_current_blocked_requesters] \t\tUpdating data of COUNT %s rule" % k)
                refresh_block_entry(k, v)
                continue

            if minutes_since_update(v) > (BLACKLIST_BLOCK_PERIOD + BLACKLIST_COUNT_PERIOD):
                print("[merge_current_blocked_requesters] \t\tExpired COUNT %s rule" % k)
            else:
                print("[merge_current_blocked_requesters] \t\tKeeping data of COUNT %s rule" % k)
                current_count[k] = v

    except Exception as e:
        print("[merge_current_blocked_requesters] \tError to merge data")
        print(e)

    print("[merge_current_blocked_requesters] End")
    return outstanding_requesters
174
def write_output(key_name, outstanding_requesters):
    """Persist the merged requester state to OUTPUT_BUCKET/OUTPUT_FILE_NAME as JSON.

    The data is dumped to a temp file under /tmp (named after the log key's basename) and then
    uploaded with a JSON content type. Any failure is printed and otherwise ignored.
    """
    print("[write_output] Start")

    try:
        local_json_path = '/tmp/' + key_name.split('/')[-1] + '_LOCAL.json'
        with open(local_json_path, 'w') as json_file:
            json.dump(outstanding_requesters, json_file)

        boto3.client('s3').upload_file(
            local_json_path,
            OUTPUT_BUCKET,
            OUTPUT_FILE_NAME,
            ExtraArgs={'ContentType': "application/json"},
        )
    except Exception:
        print("[write_output] \tError to write output file")

    print("[write_output] End")
190
def waf_get_ip_set(ip_set_id):
    """Fetch a WAF IP set, retrying with exponential back-off.

    Up to API_CALL_NUM_RETRIES attempts are made. Between failed attempts the function sleeps
    1, 2, 4, ... seconds; there is no sleep after the final attempt, since nothing follows it.

    Args:
        ip_set_id: ID of the WAF IP set to read.

    Returns:
        The get_ip_set response dict, or None if every attempt failed.
    """
    waf = boto3.client('waf')

    for attempt in range(API_CALL_NUM_RETRIES):
        try:
            return waf.get_ip_set(IPSetId=ip_set_id)
        except Exception as e:
            print(e)
            if attempt + 1 < API_CALL_NUM_RETRIES:
                delay = math.pow(2, attempt)
                print("[waf_get_ip_set] Retrying in %d seconds..." % (delay))
                time.sleep(delay)

    print("[waf_get_ip_set] Failed ALL attempts to call API")
    return None
209
def waf_update_ip_set(ip_set_id, updates_list):
    """Apply INSERT/DELETE updates to a WAF IP set, retrying with exponential back-off.

    A new change token is requested on every attempt. Up to API_CALL_NUM_RETRIES attempts are
    made, sleeping 1, 2, 4, ... seconds between failures (not after the last one). Nothing is
    sent when updates_list is empty or None.

    Args:
        ip_set_id: ID of the WAF IP set to modify.
        updates_list: list of {'Action': ..., 'IPSetDescriptor': {...}} dicts.

    Returns:
        The update_ip_set response, or None if there was nothing to send or every attempt failed.
    """
    if not updates_list:
        return None

    waf = boto3.client('waf')
    for attempt in range(API_CALL_NUM_RETRIES):
        try:
            return waf.update_ip_set(IPSetId=ip_set_id,
                                     ChangeToken=waf.get_change_token()['ChangeToken'],
                                     Updates=updates_list)
        except Exception as e:
            print(e)
            if attempt + 1 < API_CALL_NUM_RETRIES:
                delay = math.pow(2, attempt)
                print("[waf_update_ip_set] Retrying in %d seconds..." % (delay))
                time.sleep(delay)

    print("[waf_update_ip_set] Failed ALL attempts to call API")
    return None
230
def get_ip_set_already_blocked():
    """Return the descriptor values of the manual-block WAF IP set.

    The result is an empty list when IP_SET_ID_MANUAL_BLOCK is None, when the IP set could not be
    fetched, or when reading the response fails. In the last case the error is printed.
    """
    print("[get_ip_set_already_blocked] Start")
    already_blocked = []
    try:
        response = None
        if IP_SET_ID_MANUAL_BLOCK is not None:
            response = waf_get_ip_set(IP_SET_ID_MANUAL_BLOCK)
        if response is not None:
            for descriptor in response['IPSet']['IPSetDescriptors']:
                already_blocked.append(descriptor['Value'])
    except Exception as e:
        print("[get_ip_set_already_blocked] Error to get waf ip set")
        print(e)

    print("[get_ip_set_already_blocked] End")
    return already_blocked
246
def is_already_blocked(ip, ip_set):
    """Return True if the IPv4 address ``ip`` lies inside any CIDR range in ``ip_set``.

    Args:
        ip: dotted-quad IPv4 address such as '10.1.2.3'.
        ip_set: iterable of 'a.b.c.d/bits' strings (e.g. from get_ip_set_already_blocked).
            None is treated as empty.

    Returns:
        True on the first matching range, otherwise False. A non-IPv4 ``ip`` never matches.
        Entries that are not IPv4 CIDR ranges (e.g. IPv6 descriptors) are skipped one by one
        instead of ending the scan.
    """
    def to_int(dotted):
        # Convert a dotted-quad string to a 32-bit integer; raise ValueError if malformed.
        octets = [int(part) for part in dotted.split('.')]
        if len(octets) != 4 or any(octet < 0 or octet > 255 for octet in octets):
            raise ValueError("not an IPv4 address: %r" % (dotted,))
        value = 0
        for octet in octets:
            value = (value << 8) | octet
        return value

    # The address is the same for every range, so convert it once.
    try:
        ip_value = to_int(ip)
    except (ValueError, AttributeError, TypeError):
        return False

    for net in ip_set or ():
        try:
            netstr, bits = net.split('/')
            net_value = to_int(netstr)
            prefix = int(bits)
        except (ValueError, AttributeError, TypeError):
            continue
        if not 0 <= prefix <= 32:
            continue
        mask = (0xffffffff << (32 - prefix)) & 0xffffffff
        if (ip_value & mask) == (net_value & mask):
            return True

    return False
264
def update_waf_ip_set(outstanding_requesters, ip_set_id, ip_set_already_blocked):
    """Synchronise a WAF IP set with the given requesters.

    Steps:
      1. Rank requesters by max_req_per_min, highest first.
      2. Skip those already covered by ip_set_already_blocked.
      3. Keep at most LIMIT_IP_ADDRESS_RANGES_PER_IP_MATCH_CONDITION of them.
      4. Delete descriptors currently in the IP set that are not in that selection.
      5. Insert selected addresses not yet present, as /32 for IPv4 or /128 for addresses
         containing ':' (IPv6).
    All changes are sent in a single waf_update_ip_set call.

    Args:
        outstanding_requesters: {ip: {'max_req_per_min': int, ...}} (a 'block' or 'count' map).
        ip_set_id: WAF IP set to update. When None, nothing is done.
        ip_set_already_blocked: CIDR strings from the manual-block IP set.

    Returns:
        The number of requesters selected for the IP set (0 when ip_set_id is None). Errors are
        printed and the count reached so far is returned.
    """
    print("[update_waf_ip_set] Start")

    counter = 0
    try:
        if ip_set_id is None:
            print("[update_waf_ip_set] Igone process when ip_set_id is None")
            return counter

        updates_list = []

        #--------------------------------------------------------------------------------------------------------------
        print("[update_waf_ip_set] \tTruncate [if necessary] list to respect WAF limit")
        #--------------------------------------------------------------------------------------------------------------
        top_outstanding_requesters = {}
        ranked = sorted(outstanding_requesters.items(),
                        key=lambda kv: kv[1]['max_req_per_min'], reverse=True)
        for key, value in ranked:
            if counter >= LIMIT_IP_ADDRESS_RANGES_PER_IP_MATCH_CONDITION:
                break
            if not is_already_blocked(key, ip_set_already_blocked):
                top_outstanding_requesters[key] = value
                counter += 1

        #--------------------------------------------------------------------------------------------------------------
        print("[update_waf_ip_set] \tRemove IPs that are not in current outstanding requesters list")
        #--------------------------------------------------------------------------------------------------------------
        response = waf_get_ip_set(ip_set_id)
        if response is not None:
            for descriptor in response['IPSet']['IPSetDescriptors']:
                ip_value = descriptor['Value'].split('/')[0]
                if ip_value in top_outstanding_requesters:
                    # Already in the IP set: do not insert it again.
                    top_outstanding_requesters.pop(ip_value, None)
                else:
                    updates_list.append({
                        'Action': 'DELETE',
                        'IPSetDescriptor': {
                            # Delete with the descriptor's own type (IPV4 or IPV6).
                            'Type': descriptor.get('Type', 'IPV4'),
                            'Value': descriptor['Value'],
                        },
                    })

        #--------------------------------------------------------------------------------------------------------------
        print("[update_waf_ip_set] \tBlock remaining outstanding requesters")
        #--------------------------------------------------------------------------------------------------------------
        for ip in top_outstanding_requesters:
            # A single IPv4 descriptor for an IPv6 address would make the whole batch fail.
            if ':' in ip:
                new_descriptor = {'Type': 'IPV6', 'Value': "%s/128" % ip}
            else:
                new_descriptor = {'Type': 'IPV4', 'Value': "%s/32" % ip}
            updates_list.append({'Action': 'INSERT', 'IPSetDescriptor': new_descriptor})

        #--------------------------------------------------------------------------------------------------------------
        print("[update_waf_ip_set] \tCommit changes in WAF IP set")
        #--------------------------------------------------------------------------------------------------------------
        waf_update_ip_set(ip_set_id, updates_list)

    except Exception as e:
        print("[update_waf_ip_set] Error to update waf ip set")
        print(e)

    print("[update_waf_ip_set] End")
    return counter
331
#======================================================================================================================
# Lambda Entry Point
#======================================================================================================================
def _decode_s3_object_key(raw_key):
    """Undo the URL encoding S3 applies to object keys in event notifications.

    Returns a text string (unicode on Python 2, str on Python 3).
    """
    try:
        from urllib import unquote_plus  # Python 2
    except ImportError:
        from urllib.parse import unquote_plus  # Python 3
    if not isinstance(raw_key, str):
        # Python 2 unicode: unquote the UTF-8 bytes so multi-byte %XX escapes decode correctly.
        raw_key = raw_key.encode('utf8')
    key_name = unquote_plus(raw_key)
    if isinstance(key_name, bytes):
        key_name = key_name.decode('utf8')
    return key_name


def _load_missing_configuration(context, bucket_name):
    """Fill every configurable that is still None from the CloudFormation stack outputs.

    The stack name is the Lambda function name (taken from the invoked ARN) with its last two
    '-'-separated parts removed. This presumably relies on the generated name
    <stack>-<resource>-<suffix> (TODO confirm). OUTPUT_BUCKET falls back to the event's bucket
    when the stack has no 'CloudFrontAccessLogBucket' output.
    """
    global OUTPUT_BUCKET
    global IP_SET_ID_MANUAL_BLOCK
    global IP_SET_ID_AUTO_BLOCK
    global IP_SET_ID_AUTO_COUNT
    global BLACKLIST_BLOCK_PERIOD
    global BLACKLIST_COUNT_PERIOD
    global REQUEST_PER_MINUTE_LIMIT

    if None not in (OUTPUT_BUCKET, IP_SET_ID_MANUAL_BLOCK, IP_SET_ID_AUTO_BLOCK,
                    IP_SET_ID_AUTO_COUNT, BLACKLIST_BLOCK_PERIOD, BLACKLIST_COUNT_PERIOD,
                    REQUEST_PER_MINUTE_LIMIT):
        return

    cf = boto3.client('cloudformation')
    stack_name = context.invoked_function_arn.split(':')[6].rsplit('-', 2)[0]
    response = cf.describe_stacks(StackName=stack_name)
    outputs = {e['OutputKey']: e['OutputValue'] for e in response['Stacks'][0]['Outputs']}

    if OUTPUT_BUCKET is None:
        OUTPUT_BUCKET = outputs.get('CloudFrontAccessLogBucket', bucket_name)
    if IP_SET_ID_MANUAL_BLOCK is None:
        IP_SET_ID_MANUAL_BLOCK = outputs['ManualBlockIPSetID']
    if IP_SET_ID_AUTO_BLOCK is None:
        IP_SET_ID_AUTO_BLOCK = outputs['AutoBlockIPSetID']
    if IP_SET_ID_AUTO_COUNT is None:
        IP_SET_ID_AUTO_COUNT = outputs['AutoCountIPSetID']
    # The two periods are compared against elapsed minutes when merging, so they are minutes.
    if BLACKLIST_BLOCK_PERIOD is None:
        BLACKLIST_BLOCK_PERIOD = int(outputs['WAFBlockPeriod'])  # in minutes
    if BLACKLIST_COUNT_PERIOD is None:
        BLACKLIST_COUNT_PERIOD = int(outputs['WAFQuarantinePeriod'])  # in minutes
    if REQUEST_PER_MINUTE_LIMIT is None:
        REQUEST_PER_MINUTE_LIMIT = int(outputs['RequestThreshold'])


def _put_waf_metrics(num_blocked, num_quarantined, num_requests):
    """Publish the IPBlocked / IPQuarantined / NumRequests counts for this run to CloudWatch."""
    timestamp = datetime.datetime.now()
    cw = boto3.client('cloudwatch')
    cw.put_metric_data(
        Namespace='WAFReactiveBlacklist-%s' % OUTPUT_BUCKET,
        MetricData=[
            {'MetricName': name, 'Timestamp': timestamp, 'Value': value, 'Unit': 'Count'}
            for name, value in (('IPBlocked', num_blocked),
                                ('IPQuarantined', num_quarantined),
                                ('NumRequests', num_requests))
        ],
    )


def lambda_handler(event, context):
    """Lambda entry point for an S3 notification about a new access log.

    Steps:
      1. Resolve any unset configuration from the stack outputs.
      2. Find the requesters in this log that exceed the per-minute limit.
      3. Merge them with the stored state and persist the result to S3.
      4. Update the auto-block and auto-count WAF IP sets.
      5. Publish counters to CloudWatch.

    Args:
        event: S3 event notification. Only the first record is processed.
        context: Lambda context; invoked_function_arn is used to locate the stack.

    Returns:
        The merged outstanding_requesters dict, or None when the event refers to
        OUTPUT_FILE_NAME (this function's own output).

    Raises:
        Any exception from event parsing, stack lookup or the CloudWatch call propagates.
    """
    print('[lambda_handler] Start')
    s3_record = event['Records'][0]['s3']
    bucket_name = s3_record['bucket']['name']
    key_name = _decode_s3_object_key(s3_record['object']['key'])

    if key_name == OUTPUT_FILE_NAME:
        print('[lambda_handler] \tIgnore processinf output file')
        return

    #------------------------------------------------------------------------------------------------------------------
    print("[lambda_handler] \tReading (if necessary) CloudFromation output values")
    #------------------------------------------------------------------------------------------------------------------
    _load_missing_configuration(context, bucket_name)

    print("[lambda_handler] \t\tOUTPUT_BUCKET = %s" % OUTPUT_BUCKET)
    print("[lambda_handler] \t\tIP_SET_ID_MANUAL_BLOCK = %s" % IP_SET_ID_MANUAL_BLOCK)
    print("[lambda_handler] \t\tIP_SET_ID_AUTO_BLOCK = %s" % IP_SET_ID_AUTO_BLOCK)
    print("[lambda_handler] \t\tIP_SET_ID_AUTO_COUNT = %s" % IP_SET_ID_AUTO_COUNT)
    print("[lambda_handler] \t\tBLACKLIST_BLOCK_PERIOD = %d" % BLACKLIST_BLOCK_PERIOD)
    print("[lambda_handler] \t\tBLACKLIST_COUNT_PERIOD = %d" % BLACKLIST_COUNT_PERIOD)
    print("[lambda_handler] \t\tREQUEST_PER_MINUTE_LIMIT = %d" % REQUEST_PER_MINUTE_LIMIT)

    #------------------------------------------------------------------------------------------------------------------
    print("[lambda_handler] \tReading input data and get outstanding requesters")
    #------------------------------------------------------------------------------------------------------------------
    outstanding_requesters, num_requests = get_outstanding_requesters(bucket_name, key_name)

    #------------------------------------------------------------------------------------------------------------------
    print("[lambda_handler] \tMerge with current blocked requesters")
    #------------------------------------------------------------------------------------------------------------------
    outstanding_requesters = merge_current_blocked_requesters(key_name, outstanding_requesters)

    #------------------------------------------------------------------------------------------------------------------
    print("[lambda_handler] \tUpdate new blocked requesters list to S3s")
    #------------------------------------------------------------------------------------------------------------------
    write_output(key_name, outstanding_requesters)

    #------------------------------------------------------------------------------------------------------------------
    print("[lambda_handler] \tUpdate WAF IP Set")
    #------------------------------------------------------------------------------------------------------------------
    ip_set_already_blocked = get_ip_set_already_blocked()
    num_blocked = update_waf_ip_set(outstanding_requesters['block'], IP_SET_ID_AUTO_BLOCK, ip_set_already_blocked)
    num_quarantined = update_waf_ip_set(outstanding_requesters['count'], IP_SET_ID_AUTO_COUNT, ip_set_already_blocked)

    _put_waf_metrics(num_blocked, num_quarantined, num_requests)

    print('[lambda_handler] End')
    return outstanding_requesters
444 print '[main] End'
445