· 7 years ago · Mar 08, 2018, 07:29 PM
1"""
2Demonstrates hybrid encryption (AES-GCM + RSA): encrypts a message
3under a public key and that can only be recovered using the corresponding
4secret key.
5
6Requires: Python 2; pycryptodomex
7"""
8from Crypto.Cipher import AES, PKCS1_OAEP
9from Crypto.PublicKey import RSA
10from Crypto.Random import get_random_bytes
11from base64 import urlsafe_b64encode as b64enc, urlsafe_b64decode as b64dec
12
13# Constants
14symmetricKeySizeBytes = 128/8
15encMsgKeyBytes = 384
16rsaKeySize = 3072
17
18publicKeyDecryptError = "This is an rsa PUBLIC key, but an rsa PRIVATE key is required for decryption."
19decryptionFailedError = "Decryption failed. Encrypted message is not valid."
20
21def test_encryptionRoundTrip():
22 # Make fresh keys
23 pubFilename, privFilename = createPubkeyPair("./test")
24 origMsg = "This is a super-secret message that needs to be protected"
25 print "Original message: '{}'\n".format(origMsg)
26
27 # Encrypt a message
28 ctext = publicKeyEncrypt(pubFilename, origMsg)
29 print "Ciphertext: {}\n".format(ctext)
30
31 # Recover the message
32 err, msg = publicKeyDecrypt(privFilename, ctext)
33 if err:
34 raise Exception(err)
35 print "Recovered message: '{}'".format(msg)
36 assert(origMsg == msg)
37
38def publicKeyEncrypt(recipientKeyfile, message):
39 """
40 Applies public key (hybrid) encryption to a given message when supplied
41 with a path to a public key (RSA in PEM format).
42 """
43 # Load the recipients pubkey from a PEM file
44 with open(recipientKeyfile, 'rb') as f:
45 recipientKey = RSA.import_key(f.read())
46
47 # Encrypt the message with AES-GCM using a newly selected key
48 messageKey, ctext = aesEncrypt(message)
49
50 # Encrypt the message key and prepend it to the ciphertext
51 cipher = PKCS1_OAEP.new(recipientKey)
52 encMsg = cipher.encrypt(messageKey) + ctext
53
54 # Format the message into b64
55 return b64enc(encMsg)
56
57def publicKeyDecrypt(privkeyFile, ctext):
58 """
59 Decrypts an encrypted message with a private (RSA) key.
60 Returns: (err, message)
61 """
62 privkey = None
63 with open(privkeyFile, 'rb') as f:
64 privkey = RSA.import_key(f.read())
65
66 # Verify that this is a private key
67 if not privkey.has_private():
68 return (publicKeyDecryptError, None)
69
70 # Verify the JEE and extract the encrypted message
71 encBytes = b64dec(ctext)
72
73 # Separate the encrypted message key from the symmetric-encrypted portion.
74 encKey, ctext = encBytes[:encMsgKeyBytes], encBytes[encMsgKeyBytes:]
75
76 # Recover the message key
77 msgKey = PKCS1_OAEP.new(privkey).decrypt(encKey)
78
79 # Recover the underlying message
80 try:
81 return (None, aesDescrypt(msgKey, ctext))
82 except ValueError:
83 return (decryptionFailedError, None)
84
85def createPubkeyPair(basename):
86 """
87 Creates a new secret/key pubkey pair and writes them to distinct files:
88 <basename>-public.pem
89 <basename>-private.pem
90 """
91 pubFilename = basename + "-public.pem"
92 privFilename = basename + "-private.pem"
93
94 # Create a new key and write both key versions to the correct file
95 privkey = RSA.generate(rsaKeySize)
96 pubkey = privkey.publickey()
97 _writePemFile(pubFilename, pubkey)
98 _writePemFile(privFilename, privkey)
99 return pubFilename, privFilename
100
101def _writePemFile(filename, key):
102 with open(filename, "w") as outfile:
103 outfile.write(key.exportKey(format='PEM'))
104
105def aesEncrypt(message):
106 """
107 Encrypts a message with a fresh key using AES-GCM.
108 Returns: (key, ciphertext)
109 """
110 key = get_random_bytes(symmetricKeySizeBytes)
111 cipher = AES.new(key, AES.MODE_GCM)
112 ctext, tag = cipher.encrypt_and_digest(message)
113
114 # Concatenate (nonce, tag, ctext) and return with key
115 return key, (cipher.nonce + tag + ctext)
116
117def aesDescrypt(key, ctext):
118 """
119 Decrypts and authenticates a ciphertext encrypted with with given key.
120 """
121 # Break the ctext into components, then decrypt
122 nonce,tag,ct = (ctext[:16], ctext[16:32], ctext[32:])
123 cipher = AES.new(key, AES.MODE_GCM, nonce)
124 return cipher.decrypt_and_verify(ct, tag)
125
126if __name__ == '__main__':
127 test_encryptionRoundTrip()