· 5 years ago · Oct 09, 2020, 12:06 AM
1import sys
2import pathlib
3import argparse
4import base64
5import json
6import sqlite3
7from Crypto.Random import get_random_bytes
8from Crypto.Util import Padding
9from Crypto.PublicKey import RSA
10from Crypto.Cipher import AES
11from Crypto.Cipher import PKCS1_OAEP
12from Crypto.Signature import pkcs1_15
13from Crypto.Hash import SHA256
14
15CONFIG_DIR_PATH = pathlib.Path(pathlib.Path.home(), ".symep")
16KEYS_DIR_PATH = pathlib.Path(CONFIG_DIR_PATH, "keys")
17KEYCHAIN_PATH = pathlib.Path(CONFIG_DIR_PATH, "keychain.db")
18NICK_PATH = pathlib.Path(CONFIG_DIR_PATH, "nick.txt")
19
20# initialize argparse flags
21parser = argparse.ArgumentParser(description="simple commandline tool for symep",
22 formatter_class=argparse.RawTextHelpFormatter)
23commands = parser.add_mutually_exclusive_group()
24commands.add_argument("-gk", "--gen-keys", action="store_true",
25 help="generate new key pair")
26commands.add_argument("-pk", "--print-key", action="store_true",
27 help="print nick and public key")
28commands.add_argument("-ae", "--add-entry", nargs=2,
29 help="add or update keychain entry",
30 metavar=("NICK", "KEY"))
31commands.add_argument("-de", "--delete-entry", nargs=1,
32 help="remove keychain entry",
33 metavar="NICK")
34commands.add_argument("-le", "--list-entries", action="store_true",
35 help="list all keychain entries")
36commands.add_argument("-e", "--encrypt", nargs=2,
37 help="encrypt and sign message", type=str,
38 metavar=("NICK", "MESSAGE"))
39commands.add_argument("-d", "--decrypt", type=str, nargs=1,
40 help="decrypt and verify message",
41 metavar="MESSAGE")
42
43
44def main():
45 # check if configuration files exist
46 keychain_initialized = True
47 if not CONFIG_DIR_PATH.exists():
48 while True:
49 reply = input(f"config directory does not exist, create in {CONFIG_DIR_PATH}? (y/n) ")
50 if reply == "y":
51 CONFIG_DIR_PATH.mkdir()
52 print("creating empty key directory, run with -gk or --gen-keys flag to generate a key pair")
53 KEYS_DIR_PATH.mkdir()
54 print("creating empty keychain, run with -ae to add entries")
55 KEYCHAIN_PATH.touch()
56 keychain_initialized = False
57 NICK_PATH.touch()
58 with open(NICK_PATH, "w") as f:
59 f.write(input("choose a unique nick: "))
60 break
61 elif reply == "n":
62 exit()
63
64 db = sqlite3.connect(str(KEYCHAIN_PATH))
65 with open(NICK_PATH, "r") as f:
66 nick = f.read()
67
68 # initialize keychain db if newly created
69 if not keychain_initialized:
70 create_keychain_table_sql = """CREATE TABLE IF NOT EXISTS keychain
71 (
72 nick text PRIMARY KEY,
73 public_key text
74 );"""
75 create_keychain_index_sql = "CREATE INDEX public_key ON keychain (public_key);"
76 db.execute(create_keychain_table_sql)
77 db.execute(create_keychain_index_sql)
78 db.commit()
79
80 # parse arguments
81 args = parser.parse_args()
82 if args.gen_keys:
83 gen_keys()
84 elif args.print_key:
85 print_key(nick)
86 elif args.add_entry:
87 add_entry(db, args.add_entry)
88 elif args.delete_entry:
89 delete_entry(db, args.delete_entry)
90 elif args.list_entries:
91 list_entries(db, args.list_entries)
92 elif args.encrypt:
93 encrypt(db, nick, args.encrypt)
94 elif args.decrypt:
95 decrypt(db, args.decrypt)
96
97 db.close()
98
99
100def gen_keys():
101 print("generating keys, this may take a while...")
102 key = RSA.generate(4096)
103 print("done")
104 print("saving keys")
105 with open(pathlib.Path(KEYS_DIR_PATH, "private.pem"), "wb") as f:
106 f.write(key.export_key())
107 with open(pathlib.Path(KEYS_DIR_PATH, "public.pem"), "wb") as f:
108 f.write(key.publickey().export_key())
109
110
111def print_key(nick):
112 with open(pathlib.Path(KEYS_DIR_PATH, "public.pem"), "r") as f:
113 print(f"{nick},{f.read()}")
114
115
116def add_entry(db, args):
117 add_entry_sql = "REPLACE INTO keychain (nick, public_key) VALUES (?, ?)"
118 db.execute(add_entry_sql, args)
119 db.commit()
120
121
122def delete_entry(db, args):
123 delete_entry_sql = "DELETE FROM keychain WHERE nick = ?"
124 db.execute(delete_entry_sql, args)
125 db.commit()
126
127
128def list_entries(db, args):
129 list_entries_sql = "SELECT * FROM keychain"
130 rows = db.execute(list_entries_sql)
131 print("nick|key")
132 for row in rows:
133 print(f"{row[0]}|{row[1]}")
134
135
136def encrypt(db, nick, args):
137 recipient_nick = args[0]
138 plaintext_stream = args[1]
139
140 aes_key = get_random_bytes(32)
141 aes_cipher = AES.new(aes_key, AES.MODE_CBC)
142 iv = base64.b64encode(aes_cipher.iv).decode("utf-8")
143
144 # ghetto way to handle stdin, fix in new version
145 # need to create subparsers for each command
146 if plaintext_stream == "-":
147 plaintext = sys.stdin.read().encode("utf-8")
148 else:
149 with open(plaintext_stream, "rb") as f:
150 plaintext = f.read()
151 ciphertext = base64.b64encode(aes_cipher.encrypt(Padding.pad(plaintext, AES.block_size))).decode("utf-8")
152
153 with open(pathlib.Path(KEYS_DIR_PATH, "private.pem"), "rb") as f:
154 private = RSA.import_key(f.read())
155 signature = base64.b64encode(pkcs1_15.new(private).sign(SHA256.new(plaintext))).decode("utf-8")
156
157 # load recipients public key from database
158 recipient_public_key_sql = "SELECT * FROM keychain WHERE nick = ?"
159 row = db.execute(recipient_public_key_sql, (recipient_nick, )).fetchone()
160 if not row:
161 exit("recipient nick not in keychain, run with -ae to add a keychain entry")
162 rsa_key = RSA.import_key(row[1])
163 rsa_cipher = PKCS1_OAEP.new(rsa_key)
164 key_enc = base64.b64encode(rsa_cipher.encrypt(aes_key)).decode("utf-8")
165
166 dump = json.dumps(
167 {
168 "message": ciphertext,
169 "sender_nick": nick,
170 "key": key_enc,
171 "iv": iv,
172 "signature": signature
173 }
174 )
175
176 print(base64.b64encode(dump.encode("utf-8")).decode("utf-8"))
177
178
179def decrypt(db, args):
180 ciphertext_stream = args[0]
181
182 if ciphertext_stream == "-":
183 ciphertext = sys.stdin.read().encode("utf-8")
184 else:
185 with open(ciphertext_stream, "rb") as f:
186 ciphertext = f.read()
187
188 ciphertext = base64.b64decode(ciphertext)
189 ciphertext_dict = json.loads(ciphertext)
190
191 with open(pathlib.Path(KEYS_DIR_PATH, "private.pem"), "rb") as f:
192 private = RSA.import_key(f.read())
193 rsa_cipher = PKCS1_OAEP.new(private)
194
195 aes_key = rsa_cipher.decrypt(base64.b64decode(ciphertext_dict["key"]))
196 aes_iv = base64.b64decode(ciphertext_dict["iv"])
197 message_enc = base64.b64decode(ciphertext_dict["message"])
198
199 aes_cipher = AES.new(aes_key, AES.MODE_CBC, aes_iv)
200 message = Padding.unpad(aes_cipher.decrypt(message_enc), AES.block_size)
201
202 recipient_public_key_sql = "SELECT * FROM keychain WHERE nick = ?"
203 row = db.execute(recipient_public_key_sql, (ciphertext_dict["sender_nick"], )).fetchone()
204 if not row:
205 exit("recipient nick not in keychain, run with -ae to add a keychain entry")
206 rsa_key = RSA.import_key(row[1])
207 try:
208 pkcs1_15.new(rsa_key).verify(SHA256.new(message), base64.b64decode(ciphertext_dict["signature"]))
209 except (ValueError, TypeError):
210 exit("signature does not match")
211 sys.stdout.buffer.write(message)
212
213
214if __name__ == '__main__':
215 main()
216