· 9 years ago · Jan 17, 2017, 09:38 PM
1
2#!/usr/bin/env python
3
4import sys
5from enum import Enum
6import time
7import datetime
8import socket
9import Crypto.Cipher
10import signal
11from binascii import hexlify
12import base64
13
14#!/usr/bin/env python
15
16import sys
17from enum import Enum
18import time
19import datetime
20import socket
21import Crypto.Cipher
22import signal
23from binascii import hexlify
24import base64
25
26sys.path.append("./scapy-ssl_tls/")
27import logging
28logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
29import scapy
30from scapy.all import *
31from ssl_tls import *
32import ssl_tls_crypto
33
34from pyx509.pkcs7.asn1_models.X509_certificate import Certificate
35from pyx509.pkcs7_models import X509Certificate, PublicKeyInfo, ExtendedKeyUsageExt
36from pyx509.pkcs7.asn1_models.decoder_workarounds import decode
37
38import select
39
# Seconds used for the socket timeout, for each select() wait, and as the
# limit for parsing the server hello.
SOCKET_TIMEOUT = 15
# Maximum number of bytes requested by each recv() call.
SOCKET_RECV_SIZE = 80 * 1024

# Result strings returned by sslv2_connect(). NO_TLS is always prefixed with
# a case label ("3a: no tls"); VULN is followed by ":<base64 public key>".
CON_FAIL = "con fail"
NO_STARTTLS = "no starttls"
NO_TLS = "no tls"
VULN = "vuln"
47
def timeout(func, args=(), kwargs=None, timeout_duration=1, default=None):
    """Call ``func(*args, **kwargs)`` and give up after ``timeout_duration`` seconds.

    The limit is enforced with ``signal.alarm``/``SIGALRM``; see the Python
    ``signal`` documentation for the platform and main-thread restrictions
    that implies.

    Args:
        func: callable to run.
        args: positional arguments for ``func``.
        kwargs: keyword arguments for ``func`` (``None`` means no keywords).
        timeout_duration: whole seconds to wait before abandoning the call.
        default: value returned when the alarm fires first.

    Returns:
        Whatever ``func`` returns, or ``default`` on timeout.

    Raises:
        Any exception raised by ``func`` itself is propagated unchanged.
    """
    import signal

    # Private to this call so that exceptions raised by ``func`` (including
    # the builtin TimeoutError on Python 3) are never mistaken for our alarm.
    class _AlarmFired(Exception):
        pass

    def handler(signum, frame):
        raise _AlarmFired()

    if kwargs is None:
        kwargs = {}

    # Install our handler, remembering the previous one so it can be restored.
    previous_handler = signal.signal(signal.SIGALRM, handler)
    if previous_handler is None:
        # The old handler was not installed from Python; the best we can do
        # afterwards is fall back to the default action.
        previous_handler = signal.SIG_DFL
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except _AlarmFired:
        result = default
    finally:
        # Always cancel the pending alarm before putting the old handler back.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, previous_handler)

    return result
68
# 16-byte client challenge sent in the client hello; the verify_key() methods
# look for it in the decrypted server response.
CHALLENGE = 'a' * 16
# 11 zero bytes sent as the clear part of the master key by the export suites.
CLEAR_KEY = '\0' * 11
# 8 zero bytes sent as key_argument and used as the CBC IV by RC2Export and DES.
KEY_ARGUMENT = '\0' * 8
72
73class CipherSuite(object):
74 @classmethod
75 def get_string_description(cls):
76 raise NotImplementedError()
77
78 @classmethod
79 def get_constant(cls):
80 return eval("SSLv2CipherSuite." + cls.get_string_description())
81
82 @classmethod
83 def get_client_master_key(cls, encrypted_pms):
84 raise NotImplementedError()
85
86 @classmethod
87 def verify_key(cls, connection_id, server_finished):
88 raise NotImplementedError()
89
90 @classmethod
91 def get_encrypted_pms(cls, public_key, secret_key):
92 pkcs1_pubkey = Crypto.Cipher.PKCS1_v1_5.new(public_key)
93 encrypted_pms = pkcs1_pubkey.encrypt(secret_key)
94 return encrypted_pms
95
class RC4Export(CipherSuite):
    """Probe for the RC4_128_EXPORT40_WITH_MD5 suite (5 secret key bytes)."""

    # Secret part of the master key; the module-level CLEAR_KEY is the rest.
    SECRET_KEY = 'b' * 5

    @classmethod
    def get_string_description(cls):
        """Return the suite name used to look up its constant."""
        return "RC4_128_EXPORT40_WITH_MD5"

    @classmethod
    def get_client_master_key(cls, public_key):
        """Build the client-master-key message encrypted to ``public_key``."""
        suite_id = cls.get_constant()
        encrypted_secret = cls.get_encrypted_pms(public_key, cls.SECRET_KEY)
        return SSLv2ClientMasterKey(cipher_suite=suite_id,
                                    encrypted_key=encrypted_secret,
                                    clear_key=CLEAR_KEY)

    @classmethod
    def verify_key(cls, connection_id, server_finished):
        """Return True if the decrypted server message ends with CHALLENGE."""
        # Key = MD5(clear key + secret key + '0' + challenge + connection id).
        master_key = CLEAR_KEY + cls.SECRET_KEY
        session_key = MD5.new(master_key + '0' + CHALLENGE + connection_id).digest()
        # The first two bytes are skipped (presumably the record header).
        plaintext = Crypto.Cipher.ARC4.new(session_key).decrypt(server_finished[2:])
        return plaintext.endswith(CHALLENGE)
117
class RC4(CipherSuite):
    """Probe for the non-export RC4_128_WITH_MD5 suite.

    A 15-byte clear key is sent alongside the 16-byte encrypted secret, and
    verification assumes the server keyed on the first 16 bytes of
    clear key + secret. The main script reports a hit on this suite as
    CVE-2016-0703.
    """

    SECRET_KEY = 'b' * 16
    CLEAR_KEY = 'a' * 15

    @classmethod
    def get_string_description(cls):
        """Return the suite name used to look up its constant."""
        return "RC4_128_WITH_MD5"

    @classmethod
    def get_client_master_key(cls, public_key):
        """Build the client-master-key message encrypted to ``public_key``."""
        suite_id = cls.get_constant()
        encrypted_secret = cls.get_encrypted_pms(public_key, cls.SECRET_KEY)
        return SSLv2ClientMasterKey(cipher_suite=suite_id,
                                    encrypted_key=encrypted_secret,
                                    clear_key=cls.CLEAR_KEY)

    @classmethod
    def verify_key(cls, connection_id, server_finished):
        """Return True if the decrypted server message ends with CHALLENGE."""
        # Only the first 16 bytes of clear key + secret are used as master key.
        master_key = (cls.CLEAR_KEY + cls.SECRET_KEY)[:16]
        session_key = MD5.new(master_key + '0' + CHALLENGE + connection_id).digest()
        # The first two bytes are skipped (presumably the record header).
        plaintext = Crypto.Cipher.ARC4.new(session_key).decrypt(server_finished[2:])
        return plaintext.endswith(CHALLENGE)
140
class RC2Export(CipherSuite):
    """Probe for the RC2_128_CBC_EXPORT40_WITH_MD5 suite (5 secret key bytes)."""

    # Secret part of the master key; the module-level CLEAR_KEY is the rest.
    SECRET_KEY = 'b' * 5

    @classmethod
    def get_string_description(cls):
        """Return the suite name used to look up its constant."""
        return "RC2_128_CBC_EXPORT40_WITH_MD5"

    @classmethod
    def get_client_master_key(cls, public_key):
        """Build the client-master-key message encrypted to ``public_key``."""
        suite_id = cls.get_constant()
        encrypted_secret = cls.get_encrypted_pms(public_key, cls.SECRET_KEY)
        return SSLv2ClientMasterKey(cipher_suite=suite_id,
                                    encrypted_key=encrypted_secret,
                                    key_argument=KEY_ARGUMENT,
                                    clear_key=CLEAR_KEY)

    @classmethod
    def verify_key(cls, connection_id, server_finished):
        """Return True if CHALLENGE appears at offset 17 of the decrypted message."""
        # Key = MD5(clear key + secret key + '0' + challenge + connection id).
        master_key = CLEAR_KEY + cls.SECRET_KEY
        session_key = MD5.new(master_key + '0' + CHALLENGE + connection_id).digest()
        cipher = Crypto.Cipher.ARC2.new(session_key,
                                        mode=Crypto.Cipher.ARC2.MODE_CBC,
                                        IV=KEY_ARGUMENT,
                                        effective_keylen=128)
        try:
            # The first three bytes are skipped (presumably the record header).
            plaintext = cipher.decrypt(server_finished[3:])
        except ValueError:
            # Raised by decrypt() for input it cannot process; treat as no match.
            return False
        return plaintext[17:].startswith(CHALLENGE)
167
class DES(CipherSuite):
    """Probe for the DES_64_CBC_WITH_MD5 suite (8 secret bytes, no clear key)."""

    SECRET_KEY = 'b' * 8

    @classmethod
    def get_string_description(cls):
        """Return the suite name used to look up its constant."""
        return "DES_64_CBC_WITH_MD5"

    @classmethod
    def get_client_master_key(cls, public_key):
        """Build the client-master-key message encrypted to ``public_key``."""
        suite_id = cls.get_constant()
        encrypted_secret = cls.get_encrypted_pms(public_key, cls.SECRET_KEY)
        return SSLv2ClientMasterKey(cipher_suite=suite_id,
                                    encrypted_key=encrypted_secret,
                                    key_argument=KEY_ARGUMENT)

    @classmethod
    def verify_key(cls, connection_id, server_finished):
        """Return True if CHALLENGE appears at offset 17 of the decrypted message."""
        digest = MD5.new(cls.SECRET_KEY + '0' + CHALLENGE + connection_id).digest()
        # Only the first 8 bytes of the digest are used as the DES key.
        cipher = Crypto.Cipher.DES.new(digest[:8],
                                       mode=Crypto.Cipher.DES.MODE_CBC,
                                       IV=KEY_ARGUMENT)
        try:
            # The first three bytes are skipped (presumably the record header).
            plaintext = cipher.decrypt(server_finished[3:])
        except ValueError:
            # Raised by decrypt() for input it cannot process; treat as no match.
            return False
        return plaintext[17:].startswith(CHALLENGE)
193
# Suites probed, in this order, by the __main__ loop (one connection each).
cipher_suites = [RC2Export, RC4Export, RC4, DES]
195
def parse_certificate(derData):
    """Extract the RSA public key from a DER-encoded X.509 certificate.

    Args:
        derData: the certificate bytes as received from the server.

    Returns:
        The key object built by ``RSA.construct((modulus, exponent))``.

    Raises:
        Exception: if the certificate's public key algorithm is not RSA
            (a message is printed first). Decoding errors propagate as-is.
    """
    decoded = decode(derData, asn1Spec=Certificate())[0]
    key_info = X509Certificate(decoded).tbsCertificate.pub_key_info

    algorithm = key_info.algType
    rsa_params = key_info.key

    if algorithm != PublicKeyInfo.RSA:
        print('Certificate algType is not RSA')
        raise Exception()

    # The modulus is hex-encoded and parsed base 16; the exponent is passed
    # to long() directly.
    modulus = long(hexlify(rsa_params["mod"]), 16)
    exponent = long(rsa_params["exp"])
    return RSA.construct((modulus, exponent))
209
class Protocol(Enum):
    """How the SSLv2 handshake is reached on the target port."""

    # Send the SSLv2 client hello immediately after connecting.
    BARE_SSLv2 = 1
    # Send EHLO, require STARTTLS in the reply, then send STARTTLS.
    ESMTP = 2
    # Send ". CAPABILITY", require STARTTLS in the reply, then ". STARTTLS".
    IMAP = 3
    # Read the banner, then send STLS (no capability check).
    POP3 = 4
215
def _connect_with_retries(ip, port, attempts=3):
    """Return a connected TCP socket, or None if every attempt fails."""
    for _ in range(attempts):
        # Use a fresh socket per attempt: a socket whose connect() failed is
        # not guaranteed to be reusable.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(SOCKET_TIMEOUT)
        try:
            sock.connect((ip, port))
        except socket.error:
            sock.close()
            continue
        return sock
    return None


def _negotiate_starttls(sock, ip, protocol):
    """Perform the plaintext STARTTLS exchange; return False if it fails."""
    starttls_response = "n/a"
    try:
        if protocol == Protocol.ESMTP:
            sock.recv(SOCKET_RECV_SIZE)  # greeting banner, ignored
            sock.send("EHLO testing\r\n")
            ehlo_response = sock.recv(SOCKET_RECV_SIZE)
            if "starttls" not in ehlo_response.lower():
                print("%s: Case 2a; Server apparently doesn't support STARTTLS" % ip)
                return False
            sock.send("STARTTLS\r\n")
            starttls_response = sock.recv(SOCKET_RECV_SIZE)
        elif protocol == Protocol.IMAP:
            sock.recv(SOCKET_RECV_SIZE)  # greeting banner, ignored
            sock.send(". CAPABILITY\r\n")
            capabilities = sock.recv(SOCKET_RECV_SIZE)
            if "starttls" not in capabilities.lower():
                print("%s: Case 2b; Server apparently doesn't support STARTTLS" % ip)
                return False
            sock.send(". STARTTLS\r\n")
            starttls_response = sock.recv(SOCKET_RECV_SIZE)
        elif protocol == Protocol.POP3:
            sock.recv(SOCKET_RECV_SIZE)  # greeting banner, ignored
            sock.send("STLS\r\n")
            starttls_response = sock.recv(SOCKET_RECV_SIZE)
    except socket.error as e:
        print("Errorx: " + str(e) + " - starttls_response: '" + starttls_response + "'")
        print('%s: Case 2c; starttls negotiation failed' % ip)
        return False
    return True


def sslv2_connect(ip, port, protocol, cipher_suite, result_additional_data):
    """Attempt an SSLv2 handshake with ``cipher_suite`` and verify the key.

    Args:
        ip: target address.
        port: target TCP port.
        protocol: a ``Protocol`` member selecting the STARTTLS flavour, if any.
        cipher_suite: a ``CipherSuite`` subclass to probe with.
        result_additional_data: dict; on success its
            ``'cipher_suite_advertised'`` key is set to whether the server
            listed the suite in its server hello.

    Returns:
        ``CON_FAIL``, ``NO_STARTTLS``, ``"<case>: no tls"`` for each handshake
        failure, or ``"vuln:<base64 DER public key>"`` when the server
        completed the handshake with the probed suite.

    The socket is closed on every return path and when an exception
    propagates out of the handshake.
    """
    s = _connect_with_retries(ip, port)
    if s is None:
        print('%s: Case 1 - port is apparently closed (after 3 tries); Connect failed' % ip)
        return CON_FAIL

    try:
        if not _negotiate_starttls(s, ip, protocol):
            return NO_STARTTLS

        client_hello = SSLv2Record()/SSLv2ClientHello(cipher_suites=SSL2_CIPHER_SUITES.keys(), challenge=CHALLENGE)
        s.sendall(str(client_hello))

        rlist, wlist, xlist = select.select([s], [], [s], SOCKET_TIMEOUT)
        if s in xlist or s not in rlist:
            print('%s: Case 3a; Server did not response properly to client hello' % ip)
            return "3a: %s" % NO_TLS

        try:
            server_hello_raw = s.recv(SOCKET_RECV_SIZE)
        except socket.error:
            print('%s: Case 3b; Connection reset by peer when waiting for server hello' % ip)
            return "3b: %s" % NO_TLS

        # Parsing is bounded because it runs on attacker-controlled bytes.
        server_hello = timeout(SSL, (server_hello_raw,), timeout_duration=SOCKET_TIMEOUT)
        if server_hello is None:
            print('%s: Case 3c; Timeout on parsing server hello' % ip)
            return "3c: %s" % NO_TLS

        if SSLv2ServerHello not in server_hello:
            print('%s: Case 3d; Server hello did not contain server hello' % ip)
            return "3d: %s" % NO_TLS

        parsed_server_hello = server_hello[SSLv2ServerHello]
        connection_id = parsed_server_hello.connection_id
        cert = parsed_server_hello.certificates

        try:
            public_key = parse_certificate(cert)
        except Exception:
            # Server could still be vulnerable, we just can't tell, so we assume it isn't
            print('%s: Case 4a; Could not extract public key from DER' % ip)
            return "4a: %s" % NO_TLS

        server_advertised_cipher_suites = parsed_server_hello.fields["cipher_suites"]
        cipher_suite_advertised = cipher_suite.get_constant() in server_advertised_cipher_suites

        # The suite is tried even when the server did not advertise it.
        client_master_key = cipher_suite.get_client_master_key(public_key)
        client_key_exchange = SSLv2Record()/client_master_key
        s.sendall(str(client_key_exchange))

        rlist, wlist, xlist = select.select([s], [], [s], SOCKET_TIMEOUT)
        if s in xlist:
            print('%s: Case 4b; Exception on socket after sending client key exchange' % ip)
            return "4b: %s" % NO_TLS
        if s not in rlist:
            print('%s: Case 5; Server did not send finished in time' % ip)
            return "5: %s" % NO_TLS

        try:
            server_finished = s.recv(SOCKET_RECV_SIZE)
        except socket.error:
            print('%s: Case 4c; Connection reset by peer when waiting for server finished' % ip)
            return "4c: %s" % NO_TLS

        if server_finished == '':
            print('%s: Case 4d; Empty server_finished' % ip)
            return "4d: %s" % NO_TLS

        if not cipher_suite.verify_key(connection_id, server_finished):
            print('%s: Case 7; Symmetric key did not successfully verify on server finished message' % ip)
            return "7: %s" % NO_TLS

        result_additional_data['cipher_suite_advertised'] = cipher_suite_advertised
        return "%s:%s" % (VULN, base64.b64encode(public_key.exportKey(format='DER')))
    finally:
        s.close()
337
if __name__ == '__main__':
    # Usage: <script> <ip> <port> [-bare|-esmtp|-imap|-pop3]
    # Probes the target once per entry in cipher_suites and prints a verdict
    # for each suite.
    if len(sys.argv) < 3:
        print('Usage: %s <ip> <port> [-bare|-esmtp|-imap|-pop3]' % sys.argv[0])
        sys.exit(1)

    ip = sys.argv[1]
    try:
        port = int(sys.argv[2])
    except ValueError:
        print('Port must be an integer, got %r' % sys.argv[2])
        sys.exit(1)
    print('Testing %s on port %s' % (ip, port))

    protocol_flags = {
        '-esmtp': Protocol.ESMTP,
        '-imap': Protocol.IMAP,
        '-pop3': Protocol.POP3,
        '-bare': Protocol.BARE_SSLv2,
    }
    protocol = Protocol.BARE_SSLv2
    if len(sys.argv) >= 4:
        if sys.argv[3] not in protocol_flags:
            print('You gave 3 arguments, argument 3 is not a recognized protocol. Bailing out')
            sys.exit(1)
        protocol = protocol_flags[sys.argv[3]]

    for cipher_suite in cipher_suites:
        string_description = cipher_suite.get_string_description()
        ret_additional_data = {}
        ret = sslv2_connect(ip, port, protocol, cipher_suite, ret_additional_data)

        if ret.startswith(VULN):
            cves = []
            # A completed handshake with a suite the server never advertised.
            if not ret_additional_data['cipher_suite_advertised']:
                cves.append("CVE-2015-3197")
            # A completed handshake with the non-export RC4 probe.
            if string_description == "RC4_128_WITH_MD5":
                cves.append("CVE-2016-0703")
            cve_string = (" to " + " and ".join(cves)) if cves else ""

            print('%s: Server is vulnerable%s, with cipher %s\n' % (ip, cve_string, string_description))
        else:
            print('%s: Server is NOT vulnerable with cipher %s, Message: %s\n' % (ip, string_description, ret))