· 6 years ago · Nov 18, 2019, 06:30 AM
1#!/usr/bin/python
2import json
3import os
4import requests
5import sys
6
### EDIT THESE: Configuration values ###

# Base URL of the acme-dns instance to talk to
ACMEDNS_URL = "https://auth.acme-dns.io"
# File in which the acme-dns account credentials are kept
STORAGE_PATH = "/etc/letsencrypt/acmedns.json"
# Address ranges that are allowed to send updates (empty list: no restriction
# is sent along with the registration)
# Example: ALLOW_FROM = ["192.168.10.0/24", "::1/128"]
ALLOW_FROM = []
# Set to True to register anew, replacing any acme-dns account already stored
FORCE_REGISTER = False

### DO NOT EDIT BELOW THIS POINT ###
### HERE BE DRAGONS ###
21
# Domain under validation, read from the environment. A wildcard name
# ("*.example.org") shares its validation record with the base domain, so the
# leading "*." is dropped.
DOMAIN = os.environ["CERTBOT_DOMAIN"]
DOMAIN = DOMAIN[2:] if DOMAIN.startswith("*.") else DOMAIN
# Name of the challenge record that belongs to DOMAIN
VALIDATION_DOMAIN = "_acme-challenge.{}".format(DOMAIN)
# Challenge value to publish, read from the environment
VALIDATION_TOKEN = os.environ["CERTBOT_VALIDATION"]
27
28
class AcmeDnsClient(object):
    """
    Handles the communication with ACME-DNS API

    Both API calls terminate the process with exit status 1 (after printing
    a diagnostic message) when the server does not answer with the expected
    HTTP status.
    """

    def __init__(self, acmedns_url):
        """Remember the base URL (without trailing slash) of the API."""
        self.acmedns_url = acmedns_url

    def register_account(self, allowfrom):
        """Registers a new ACME-DNS account

        allowfrom: list of address ranges to whitelist for updates; when it
        is empty the registration request is sent without a body.

        Returns the decoded JSON body of the HTTP 201 response. Any other
        status is printed together with the response text and the process
        exits with status 1.
        """
        register_url = self.acmedns_url + "/register"
        if allowfrom:
            # Include whitelisted networks to the registration call
            reg_data = {"allowfrom": allowfrom}
            res = requests.post(register_url, data=json.dumps(reg_data))
        else:
            res = requests.post(register_url)

        if res.status_code == 201:
            # The request was successful
            return res.json()

        # Encountered an error
        msg = ("Encountered an error while trying to register a new acme-dns "
               "account. HTTP status {}, Response body: {}")
        print(msg.format(res.status_code, res.text))
        sys.exit(1)

    def update_txt_record(self, account, txt):
        """Updates the TXT challenge record to ACME-DNS subdomain.

        account: mapping providing the 'subdomain', 'username' and
        'password' entries of the acme-dns account.
        txt: value to store in the TXT record.

        Returns None on HTTP 200. On any other status the request and the
        response are printed and the process exits with status 1.
        """
        update = {"subdomain": account['subdomain'], "txt": txt}
        headers = {"X-Api-User": account['username'],
                   "X-Api-Key": account['password'],
                   "Content-Type": "application/json"}
        res = requests.post(self.acmedns_url + "/update",
                            headers=headers,
                            data=json.dumps(update))
        if res.status_code == 200:
            # Successful update
            return

        msg = ("Encountered an error while trying to update TXT record in "
               "acme-dns. \n"
               "------- Request headers:\n{}\n"
               "------- Request body:\n{}\n"
               "------- Response HTTP status: {}\n"
               "------- Response body: {}")
        # NOTE(review): the dumped headers include the account's X-Api-Key.
        s_headers = json.dumps(headers, indent=2, sort_keys=True)
        s_update = json.dumps(update, indent=2, sort_keys=True)
        try:
            s_body = json.dumps(res.json(), indent=2, sort_keys=True)
        except ValueError:
            # The error response is not JSON (e.g. an HTML page produced by
            # a proxy); show it verbatim instead of crashing while trying to
            # report the actual failure.
            s_body = res.text
        print(msg.format(s_headers, s_update, res.status_code, s_body))
        sys.exit(1)
81
class Storage(object):
    """Keeps acme-dns accounts, keyed by domain name, in a JSON file.

    The file is read once on construction; put() only changes the in-memory
    copy and save() writes that copy back to disk.
    """

    def __init__(self, storagepath):
        """Remember the file path and load whatever is stored there."""
        self.storagepath = storagepath
        self._data = self.load()

    def load(self):
        """Reads the storage content from the disk to a dict structure

        A missing or empty file yields an empty dict. The process exits with
        status 1 when the file exists but cannot be read, or when its
        content is not a JSON object.
        """
        filedata = ""
        try:
            with open(self.storagepath, 'r') as fh:
                filedata = fh.read()
        except (IOError, OSError):
            if os.path.isfile(self.storagepath):
                # Only error out if file exists, but cannot be read
                print("ERROR: Storage file exists but cannot be read")
                sys.exit(1)

        if not filedata:
            # Nothing stored yet
            return {}

        try:
            data = json.loads(filedata)
        except ValueError:
            data = None
        # Valid JSON that is not an object (a list, a number, null, ...)
        # cannot be used as the key/value store and is treated as corrupted
        # as well, instead of failing later inside fetch().
        if not isinstance(data, dict):
            print("ERROR: Storage JSON is corrupted")
            sys.exit(1)
        return data

    def save(self):
        """Saves the storage content to disk

        The process exits with status 1 when the file cannot be written.
        """
        serialized = json.dumps(self._data)
        # O_TRUNC discards the previous content when the file is opened.
        # The 0o600 mode is only applied when the file gets created here.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        try:
            with os.fdopen(os.open(self.storagepath, flags, 0o600),
                           'w') as fh:
                fh.write(serialized)
        except (IOError, OSError):
            # os.open() reports failures as OSError, which is a different
            # type than IOError on Python 2.
            print("ERROR: Could not write storage file.")
            sys.exit(1)

    def put(self, key, value):
        """Puts the configuration value to storage and sanitize it"""
        # If wildcard domain, remove the wildcard part as this will use the
        # same validation record name as the base domain
        if key.startswith("*."):
            key = key[2:]
        self._data[key] = value

    def fetch(self, key):
        """Gets configuration value from storage, or None if it is unknown"""
        return self._data.get(key)
134
if __name__ == "__main__":
    # Init: API client and on-disk account storage
    client = AcmeDnsClient(ACMEDNS_URL)
    storage = Storage(STORAGE_PATH)

    # Check if an account already exists in storage
    account = storage.fetch(DOMAIN)
    if FORCE_REGISTER or not account:
        # Create and save the new account
        account = client.register_account(ALLOW_FROM)
        storage.put(DOMAIN, account)
        storage.save()

        # Display the notification for the user to update the main zone.
        # The challenge name has to be delegated to the account's acme-dns
        # domain, which is done with a CNAME record (not a TXT record).
        msg = ("Please add the following CNAME record to your main DNS "
               "zone:\n{}")
        cname = "{} CNAME {}.".format(VALIDATION_DOMAIN,
                                      account["fulldomain"])
        print(msg.format(cname))

    # Update the TXT record in acme-dns instance
    client.update_txt_record(account, VALIDATION_TOKEN)