· 6 years ago · Nov 11, 2019, 05:06 AM
1#!/usr/bin/python
2
3import argparse
4import os
5import sys
6
7from OTXv2 import OTXv2
8import IndicatorTypes
9
10
class SuricataClient(object):
    """Generate Suricata IP-reputation files from the indicators in OTX pulses.

    Every IPv4/IPv6 indicator found in the pulses returned by the OTX client
    is written to ``reputation.list``.  When at least one address was found,
    the matching ``categories.txt`` and ``otx_iprep.rules`` files are written
    too and a YAML snippet referencing them is printed on stdout.
    """

    # Single rule written to otx_iprep.rules; it keys on the "Pulse"
    # reputation category declared in ip_category_template.
    ip_rule_template = "alert ip $HOME_NET any -> any any (msg:\"OTX internal host talking to host known in pulse\"; flow:to_server; iprep:dst,Pulse,>,30; sid:41414141; rev:1;)\n"
    # Line written to categories.txt (presumably Suricata's
    # "<id>,<short name>,<description>" layout -- confirm against Suricata docs).
    ip_category_template = "41,Pulse,OTX community identified IP address\n"
    # One line per address in reputation.list; 41 matches the category line
    # above (127 is presumably the reputation score -- confirm).
    ip_rep_template = "{ip},41,127\n"

    def __init__(self, api_key):
        """Create the OTX client and set the output directories.

        :param api_key: OTX API key handed to ``OTXv2``.
        """
        self.client = OTXv2(api_key=api_key, project="Suricata")
        self.iprep_dir = "/etc/suricata/iprep"
        self.rules_dir = "/etc/suricata/rules"

    def get_destination(self, param, dir_type):
        """Open the output file ``param`` for writing (truncating it).

        :param param: file name inside the selected directory.
        :param dir_type: ``"iprep"`` or ``"rules"``.
        :returns: an open, writable file object; the caller must close it.
        :raises ValueError: if ``dir_type`` is not a known directory type.
        """
        return open(self.get_path(param, dir_type), mode='w')

    def get_path(self, param, dir_type):
        """Return the full path of ``param`` inside the selected directory.

        :param param: file name to append to the directory.
        :param dir_type: ``"iprep"`` (``self.iprep_dir``) or ``"rules"``
            (``self.rules_dir``).
        :raises ValueError: if ``dir_type`` is neither of the above.  The
            previous behaviour was to return ``None``, which only failed
            later with an unhelpful ``TypeError`` inside ``open()``.
        """
        if dir_type == "iprep":
            return os.path.join(self.iprep_dir, param)
        if dir_type == "rules":
            return os.path.join(self.rules_dir, param)
        raise ValueError(
            "unknown dir_type {!r}; expected 'iprep' or 'rules'".format(dir_type)
        )

    def generate_rules(self):
        """Fetch all pulses and write the reputation, category and rule files.

        ``reputation.list`` is always (re)created, even when no address is
        found; the category and rule files are only written when at least one
        IPv4/IPv6 indicator was collected.  Progress is reported on stdout.
        """
        ip_type_names = (IndicatorTypes.IPv4.name, IndicatorTypes.IPv6.name)
        ip_count = 0
        with self.get_destination('reputation.list', 'iprep') as rep_file:
            rep_path = rep_file.name
            for pulse in self.client.getall_iter():
                ip_list = [
                    indicator["indicator"]
                    for indicator in pulse["indicators"]
                    if indicator["type"] in ip_type_names
                ]
                if ip_list:
                    self.add_iprep(rep_file, ip_list)
                    ip_count += len(ip_list)

        # Compare by value: "is not 0" relied on CPython's small-int cache
        # and triggers a SyntaxWarning on Python 3.8+.
        if ip_count != 0:
            self.write_core_iprep_files()
            self._write_summary(ip_count, rep_path)
        else:
            sys.stdout.write("No files written. No IP Addresses obtained from pulses.\n")

    def _write_summary(self, ip_count, rep_path):
        """Print what was written plus a YAML snippet referencing the files."""
        out = sys.stdout.write
        out("Wrote related iprep rules to {}\n".format('otx_iprep.rules'))
        out("Wrote {0} IPv4 & IPv6 to {1}\n".format(str(ip_count), rep_path))
        out("========== Start YAML Snippet ==========\n")
        out("reputation-categories-file: {}\n".format(self.get_path('categories.txt', 'iprep')))
        out("default-reputation-path: {}\n".format(self.iprep_dir))
        out("reputation-files:\n")
        out(" - reputation.list\n")
        out("rule-files:\n")
        out(" - {}\n".format(self.get_path('otx_iprep.rules', 'rules')))
        out("========== End YAML Snippet ==========\n")

    def add_iprep(self, rep_file, ip_list):
        """Append one reputation line per address in ``ip_list`` to ``rep_file``.

        :param rep_file: writable file-like object.
        :param ip_list: iterable of IP address strings.
        """
        for ip in ip_list:
            rep_file.write(SuricataClient.ip_rep_template.format(ip=ip))

    def write_core_iprep_files(self):
        """Write ``categories.txt`` and ``otx_iprep.rules`` from the templates."""
        with self.get_destination('categories.txt', 'iprep') as categories_file:
            categories_file.write(SuricataClient.ip_category_template)
        with self.get_destination('otx_iprep.rules', 'rules') as rules_file:
            rules_file.write(SuricataClient.ip_rule_template)
66
67
def getArgs():
    """Parse the command line and return the resulting namespace.

    The single, mandatory ``--key`` option carries the OTX API key and is
    exposed as the ``key`` attribute of the returned namespace.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--key",
        help="Your OTX API key (https://otx.alienvault.com/api)",
        required=True,
    )
    parsed = cli.parse_args()
    return parsed
72
73
if __name__ == '__main__':
    # sys.argv is deliberately not echoed: it contains the --key value, and
    # printing it would leak the OTX API key into terminal scrollback, cron
    # mail or captured logs.
    args = getArgs()
    sclient = SuricataClient(args.key)
    sclient.generate_rules()
78 sclient.generate_rules()