· 5 years ago · Oct 07, 2020, 08:48 PM
1import sys
2import pathlib
3import argparse
4import base64
5import json
6import sqlite3
7from Crypto.Random import get_random_bytes
8from Crypto.Util import Padding
9from Crypto.PublicKey import RSA
10from Crypto.Cipher import AES
11from Crypto.Cipher import PKCS1_OAEP
12from Crypto.Signature import pkcs1_15
13from Crypto.Hash import SHA256
14
# Per-user configuration layout, rooted at ~/.symep.
CONFIG_DIR_PATH = pathlib.Path.home() / ".symep"
# Directory holding the local key pair (private.pem / public.pem).
KEYS_DIR_PATH = CONFIG_DIR_PATH / "keys"
# SQLite database mapping nicks to public keys.
KEYCHAIN_PATH = CONFIG_DIR_PATH / "keychain.db"
# Text file holding the local user's nick.
NICK_PATH = CONFIG_DIR_PATH / "nick.txt"
19
# Command-line interface: each flag selects one command, and the mutually
# exclusive group allows at most one command per invocation.
parser = argparse.ArgumentParser(
    formatter_class=argparse.RawTextHelpFormatter,
    description="simple commandline tool for symep",
)
commands = parser.add_mutually_exclusive_group()

# (option strings, add_argument keyword arguments), listed in the order
# they should appear in the generated help output.
_COMMAND_SPECS = (
    (("-gk", "--gen-keys"), {
        "action": "store_true",
        "help": "generate new key pair",
    }),
    (("-pk", "--print-key"), {
        "action": "store_true",
        "help": "print nick and public key",
    }),
    (("-ae", "--add-entry"), {
        "nargs": 2,
        "help": "add or update keychain entry",
        "metavar": ("NICK", "KEY"),
    }),
    (("-de", "--delete-entry"), {
        "nargs": 1,
        "help": "remove keychain entry",
        "metavar": "NICK",
    }),
    (("-le", "--list-entries"), {
        "action": "store_true",
        "help": "list all keychain entries",
    }),
    (("-e", "--encrypt"), {
        "nargs": 2,
        "help": "encrypt and sign message",
        "type": str,
        "metavar": ("NICK", "MESSAGE"),
    }),
    (("-d", "--decrypt"), {
        "type": str,
        "nargs": 1,
        "help": "decrypt and verify message",
        "metavar": "MESSAGE",
    }),
)
for _option_strings, _settings in _COMMAND_SPECS:
    commands.add_argument(*_option_strings, **_settings)
42
43
def main():
    """Run the symep command-line tool.

    Parses the command line, performs the interactive first-run setup when
    the configuration directory is missing, makes sure the keychain schema
    exists, and then dispatches to the handler of the selected command.
    Running without any command flag does nothing beyond that setup.
    """
    # Parse before touching the filesystem so that --help and usage errors
    # exit without triggering the interactive setup prompt.
    args = parser.parse_args()

    # First run: offer to create the configuration files.
    if not CONFIG_DIR_PATH.exists():
        while True:
            reply = input(f"config directory does not exist, create in {CONFIG_DIR_PATH}? (y/n) ")
            if reply == "y":
                CONFIG_DIR_PATH.mkdir()
                print("creating empty key directory, run with -gk or --gen-keys flag to generate a key pair")
                KEYS_DIR_PATH.mkdir()
                print("creating empty keychain, run with -ae to add entries")
                KEYCHAIN_PATH.touch()
                with open(NICK_PATH, "w") as f:
                    f.write(input("choose a unique nick: "))
                break
            elif reply == "n":
                sys.exit()
            # Any other answer: ask again.

    db = sqlite3.connect(str(KEYCHAIN_PATH))
    try:
        # Both statements are idempotent, so the schema is ensured on every
        # run. This also repairs a config directory whose keychain file is
        # missing or was never initialized.
        db.execute(
            """CREATE TABLE IF NOT EXISTS keychain
            (
                nick text PRIMARY KEY,
                public_key text
            );"""
        )
        db.execute("CREATE INDEX IF NOT EXISTS public_key ON keychain (public_key);")
        db.commit()

        # Dispatch to the selected command (the flags are mutually exclusive).
        if args.gen_keys:
            gen_keys()
        elif args.print_key:
            print_key()
        elif args.add_entry:
            add_entry(db, args.add_entry)
        elif args.delete_entry:
            delete_entry(db, args.delete_entry)
        elif args.list_entries:
            list_entries(db, args.list_entries)
        elif args.encrypt:
            encrypt(db, args.encrypt)
        elif args.decrypt:
            decrypt(db, args.decrypt)
    finally:
        # Also runs when a handler terminates via sys.exit() or raises.
        db.close()
96
97
def gen_keys():
    """Generate a 4096-bit RSA key pair and save it under KEYS_DIR_PATH.

    Writes ``private.pem`` and ``public.pem``, overwriting any existing
    pair. The private key file is restricted to owner read/write where
    the platform supports POSIX permissions.
    """
    # Make sure the target directory exists before the slow key generation,
    # so a missing directory does not surface only after the wait.
    KEYS_DIR_PATH.mkdir(parents=True, exist_ok=True)

    print("generating keys, this may take a while...")
    key = RSA.generate(4096)
    print("done")
    print("saving keys")

    private_path = pathlib.Path(KEYS_DIR_PATH, "private.pem")
    # Create the file owner-only (or tighten an existing one) BEFORE any key
    # material is written, so it is never readable by other users.
    private_path.touch(mode=0o600)
    private_path.chmod(0o600)
    private_path.write_bytes(key.export_key())

    public_path = pathlib.Path(KEYS_DIR_PATH, "public.pem")
    public_path.write_bytes(key.publickey().export_key())
107
108
def print_key():
    """Print the contents of the stored public key file to stdout."""
    # NOTE: the -pk help text says "print nick and public key", but only the
    # public key file is printed here.
    public_key_file = pathlib.Path(KEYS_DIR_PATH, "public.pem")
    print(public_key_file.read_text())
112
113
def add_entry(db, args):
    """Insert a keychain row, replacing any existing row for the same nick.

    db   -- open sqlite3 connection to the keychain database
    args -- sequence of two values: nick, public key
    """
    statement = "REPLACE INTO keychain (nick, public_key) VALUES (?, ?)"
    cursor = db.cursor()
    cursor.execute(statement, args)
    db.commit()
118
119
def delete_entry(db, args):
    """Remove the keychain row for a nick; a nick with no row is a no-op.

    db   -- open sqlite3 connection to the keychain database
    args -- one-element sequence holding the nick to delete
    """
    statement = "DELETE FROM keychain WHERE nick = ?"
    cursor = db.cursor()
    cursor.execute(statement, args)
    db.commit()
124
125
def list_entries(db, args):
    """Print every keychain row as ``nick|key`` beneath a header line.

    db   -- open sqlite3 connection to the keychain database
    args -- unused; accepted so all command handlers share a call shape
    """
    # Run the query before printing anything, so a failing query produces
    # no partial output.
    cursor = db.execute("SELECT * FROM keychain")
    print("nick|key")
    for entry in cursor:
        print(entry[0], entry[1], sep="|")
132
133
def _b64_text(data):
    """Return the base64 encoding of *data* (bytes) as a str."""
    return base64.b64encode(data).decode("utf-8")


def encrypt(db, args):
    """Encrypt and sign a message for a recipient and print the envelope.

    db   -- open sqlite3 connection to the keychain database
    args -- two-element sequence: recipient nick, then the plaintext source
            (a file path, or "-" to read from standard input)

    The plaintext is encrypted with a fresh 256-bit AES key in CBC mode,
    the AES key is wrapped with the recipient's RSA public key (OAEP), and
    a PKCS#1 v1.5 signature over the SHA-256 of the plaintext is made with
    the local private key. The result is a JSON object with the keys
    "message", "sender_nick", "key", "iv" and "signature", printed to
    stdout as one base64 string.

    Exits with an error message if the recipient is not in the keychain.
    """
    recipient_nick, plaintext_stream = args

    # Look the recipient up first so an unknown nick fails before any input
    # is read or any signing work is done.
    row = db.execute(
        "SELECT public_key FROM keychain WHERE nick = ?", (recipient_nick,)
    ).fetchone()
    if not row:
        sys.exit("recipient nick not in keychain, run with -ae to add a keychain entry")
    recipient_key = RSA.import_key(row[0])

    # Read raw bytes in both cases: going through the text layer of stdin
    # would fail on non-UTF-8 data and could translate newlines.
    if plaintext_stream == "-":
        plaintext = sys.stdin.buffer.read()
    else:
        with open(plaintext_stream, "rb") as f:
            plaintext = f.read()

    # Symmetric encryption of the payload; AES.new generates a random IV.
    aes_key = get_random_bytes(32)
    aes_cipher = AES.new(aes_key, AES.MODE_CBC)
    iv = _b64_text(aes_cipher.iv)
    ciphertext = _b64_text(aes_cipher.encrypt(Padding.pad(plaintext, AES.block_size)))

    # Sign the plaintext with the local private key.
    with open(pathlib.Path(KEYS_DIR_PATH, "private.pem"), "rb") as f:
        private = RSA.import_key(f.read())
    signature = _b64_text(pkcs1_15.new(private).sign(SHA256.new(plaintext)))

    # Wrap the AES key for the recipient.
    key_enc = _b64_text(PKCS1_OAEP.new(recipient_key).encrypt(aes_key))

    with open(NICK_PATH, "r") as f:
        nick = f.read()

    dump = json.dumps(
        {
            "message": ciphertext,
            "sender_nick": nick,
            "key": key_enc,
            "iv": iv,
            "signature": signature,
        }
    )

    print(_b64_text(dump.encode("utf-8")))
178
179
def decrypt(db, args):
    """Decrypt a message envelope, verify its signature, and write the plaintext.

    db   -- open sqlite3 connection to the keychain database
    args -- one-element sequence holding the envelope source (a file path,
            or "-" to read from standard input)

    The envelope is the base64-encoded JSON object produced by encrypt(),
    with the keys "message", "sender_nick", "key", "iv" and "signature".
    The AES key is unwrapped with the local RSA private key, the message is
    decrypted (AES-CBC) and unpadded, and the signature is checked against
    the sender's public key from the keychain. The plaintext bytes are
    written to stdout only after the signature has been verified.

    Exits with an error message if the envelope is malformed, decryption
    fails, the sender is not in the keychain, or the signature is invalid.
    """
    ciphertext_stream = args[0]

    if ciphertext_stream == "-":
        envelope_b64 = sys.stdin.buffer.read()
    else:
        with open(ciphertext_stream, "rb") as f:
            envelope_b64 = f.read()

    # The envelope is untrusted input: report bad base64, bad JSON, missing
    # keys or wrong value types as one clean error instead of a traceback.
    # (binascii.Error and json.JSONDecodeError are both ValueError subclasses.)
    try:
        envelope = json.loads(base64.b64decode(envelope_b64))
        key_enc = base64.b64decode(envelope["key"])
        aes_iv = base64.b64decode(envelope["iv"])
        message_enc = base64.b64decode(envelope["message"])
        signature = base64.b64decode(envelope["signature"])
        sender_nick = envelope["sender_nick"]
    except (ValueError, KeyError, TypeError):
        sys.exit("malformed message")

    with open(pathlib.Path(KEYS_DIR_PATH, "private.pem"), "rb") as f:
        private = RSA.import_key(f.read())
    rsa_cipher = PKCS1_OAEP.new(private)

    # A message for another key, a bad IV/key length, or a truncated
    # ciphertext surfaces as ValueError/TypeError from the crypto layer.
    try:
        aes_key = rsa_cipher.decrypt(key_enc)
        aes_cipher = AES.new(aes_key, AES.MODE_CBC, aes_iv)
        message = Padding.unpad(aes_cipher.decrypt(message_enc), AES.block_size)
    except (ValueError, TypeError):
        sys.exit("decryption failed")

    # The signature is checked against the SENDER's public key.
    sender_public_key_sql = "SELECT public_key FROM keychain WHERE nick = ?"
    row = db.execute(sender_public_key_sql, (sender_nick,)).fetchone()
    if not row:
        sys.exit("sender nick not in keychain, run with -ae to add a keychain entry")
    sender_key = RSA.import_key(row[0])

    try:
        pkcs1_15.new(sender_key).verify(SHA256.new(message), signature)
    except (ValueError, TypeError):
        sys.exit("signature does not match")

    sys.stdout.buffer.write(message)
213
214
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
217