# Paste-site header (not part of the program): posted Sep 02, 2020, 12:46 PM.
import dataclasses
import json
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import requests
9
# API endpoints; the version segment is part of every URL.
api_version = "v0a769"
api = f"https://api.cloudflareclient.com/{api_version}"
# Registration endpoint; get_config_url() also uses it as the base of the
# per-account URL.
reg_url = f"{api}/reg"
status_url = f"{api}/client_config"
terms_of_service_url = "https://www.cloudflare.com/application/terms/"

# Both output files are created relative to the current working directory.
data_path = Path(".")
identity_path = data_path / "wgcf-identity.json"
config_path = data_path / "wgcf-profile.conf"

# Headers sent with every API request; callers copy this dict before adding
# request-specific entries.
default_headers = {
    "Accept-Encoding": "gzip",
    "User-Agent": "okhttp/3.12.1",
}

# Toggle to allow sniffing traffic: when True, get_verify() returns False and
# the requests are made without TLS certificate verification.
debug = False
25
26
def get_verify() -> bool:
    """Return the value to pass as ``verify=`` to the HTTP calls.

    Returns:
        True unless the module-level ``debug`` flag is set, in which case
        False is returned.
    """
    return not debug
29
30
def get_config_url(account_token: str) -> str:
    """Return the per-account URL: ``reg_url`` plus one trailing path segment.

    Args:
        account_token: Value appended as the last path segment.
            NOTE(review): the callers in this file pass
            ``AccountData.account_id`` here, not the access token, so the
            parameter name is misleading.

    Returns:
        The URL used to read or update the account's configuration.
    """
    return f"{reg_url}/{account_token}"
33
34
@dataclasses.dataclass
class AccountData:
    """Identity of a registered account, as stored in the identity file."""

    # "id" field of the registration response; used to build the account URL.
    account_id: str
    # "token" field of the registration response; sent as the Bearer token.
    access_token: str
    # Private key entered by the user; written into the WireGuard profile.
    private_key: str
40
41
@dataclasses.dataclass
class ConfigurationData:
    """Subset of the server-side account configuration used by this script."""

    # Interface addresses assigned to this client ("v4" / "v6" entries).
    local_address_ipv4: str
    local_address_ipv6: str
    # Endpoint of the first peer, in the three forms the API returns
    # ("host" / "v4" / "v6"); only the host form is written to the profile.
    endpoint_address_host: str
    endpoint_address_ipv4: str
    endpoint_address_ipv6: str
    # Public key of the first peer.
    endpoint_public_key: str
    # Top-level "warp_enabled" flag of the response.
    warp_enabled: bool
    # Account type reported by the server; "free" when no account info is sent.
    account_type: str
    # "warp_plus" flag of the account; False when no account info is sent.
    warp_plus_enabled: bool
53
54
def get_timestamp(now: Optional[datetime] = None) -> str:
    """Return a timestamp such as ``2020-09-02T12:46:00.12+02:00``.

    The layout is date, time, two fractional-second digits (hundredths of a
    second) and the UTC offset written with a colon.

    Args:
        now: Timezone-aware moment to format; it is rendered in its own
            offset. Defaults to the current time in the local timezone.

    Returns:
        The formatted timestamp string.

    Raises:
        ValueError: If ``now`` is naive, because no offset can be rendered.
    """
    # Pattern noted by the original author:
    # SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss", Locale.US)
    if now is None:
        now = datetime.now(tz=timezone.utc).astimezone(None)
    if now.utcoffset() is None:
        raise ValueError("get_timestamp() requires a timezone-aware datetime")

    # Derive the hundredths from the numeric field. Slicing the formatted
    # "%f%z" text from the right is easy to get off by one and ends up mixing
    # the first and the last microsecond digit.
    hundredths = now.microsecond // 10000

    # %z renders the offset as "+HHMM"; insert the colon to get "+HH:MM".
    offset = now.strftime("%z")
    offset = f"{offset[:3]}:{offset[3:5]}"

    return f"{now.strftime('%Y-%m-%dT%H:%M:%S')}.{hundredths:02d}{offset}"
63
64
def gen_private_key() -> str:
    """Ask the user for the private key and return it.

    Despite the name, nothing is generated here: the key is read from
    standard input.

    Returns:
        The entered text with surrounding whitespace removed.
    """
    private_key = input("Please enter private key:\n")
    return private_key.strip()
68
69
def gen_public_key(private_key: str) -> str:
    """Ask the user for the public key and return it.

    Args:
        private_key: Not used. The public key is not derived from it; the
            user is expected to supply the matching key themselves.

    Returns:
        The entered text with surrounding whitespace removed.
    """
    public_key = input("Please enter public key:\n")
    return public_key.strip()
73
74
def do_register(timeout: float = 30.0) -> AccountData:
    """Register a new account with the API and return its identity.

    Prompts for the private and public key, then POSTs the public key and the
    current timestamp (sent as the "tos" field) to the registration endpoint.

    Args:
        timeout: Seconds to wait for the server, so that a stalled connection
            cannot block the script indefinitely.

    Returns:
        AccountData built from the "id" and "token" fields of the response
        and the private key that was entered.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    timestamp = get_timestamp()
    private_key = gen_private_key()
    public_key = gen_public_key(private_key)
    payload = {
        "install_id": "",
        "tos": timestamp,
        "key": public_key,
        "fcm_token": "",
        "type": "Android",
        "locale": "en_US",
    }

    headers = default_headers.copy()
    headers["Content-Type"] = "application/json; charset=UTF-8"

    response = requests.post(
        reg_url,
        json=payload,
        headers=headers,
        verify=get_verify(),
        timeout=timeout,
    )
    response.raise_for_status()

    registration = response.json()
    return AccountData(registration["id"], registration["token"], private_key)
90
91
def save_identitiy(account_data: AccountData) -> None:
    """Write the account identity to ``identity_path`` as indented JSON.

    The misspelled name is kept because the script's entry point calls it.
    NOTE: the file holds the access token and the private key in plain text.

    Args:
        account_data: Identity to persist; every dataclass field is written.
    """
    with open(identity_path, "w") as f:
        json.dump(dataclasses.asdict(account_data), f, indent=4)
95
96
def load_identity() -> AccountData:
    """Read ``identity_path`` and rebuild the identity stored in it.

    Returns:
        AccountData constructed from the JSON object's keys.

    Raises:
        FileNotFoundError: If the identity file does not exist.
        TypeError: If the JSON object's keys do not match the AccountData
            fields.
    """
    with open(identity_path, "r") as f:
        return AccountData(**json.load(f))
101
102
def enable_warp(account_data: AccountData, timeout: float = 30.0) -> None:
    """Switch Warp on for the account via a PATCH to the account URL.

    Args:
        account_data: Identity whose ``account_id`` selects the URL and whose
            ``access_token`` is sent as the Bearer token.
        timeout: Seconds to wait for the server, so that a stalled connection
            cannot block the script indefinitely.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer within ``timeout``.
        RuntimeError: If the response does not report ``warp_enabled`` as true.
    """
    headers = default_headers.copy()
    headers["Authorization"] = f"Bearer {account_data.access_token}"
    headers["Content-Type"] = "application/json; charset=UTF-8"

    response = requests.patch(
        get_config_url(account_data.account_id),
        json={"warp_enabled": True},
        headers=headers,
        verify=get_verify(),
        timeout=timeout,
    )
    response.raise_for_status()

    # Checked with an explicit raise rather than ``assert``: assert statements
    # are removed when Python runs with -O, which would silently skip the check.
    if response.json()["warp_enabled"] is not True:
        raise RuntimeError("Server did not confirm that Warp was enabled")
115
116
def get_server_conf(account_data: AccountData, timeout: float = 30.0) -> ConfigurationData:
    """Fetch the account's configuration from the API.

    Args:
        account_data: Identity whose ``account_id`` selects the URL and whose
            ``access_token`` is sent as the Bearer token.
        timeout: Seconds to wait for the server, so that a stalled connection
            cannot block the script indefinitely.

    Returns:
        ConfigurationData filled from the interface addresses, the first peer
        and the account section of the response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer within ``timeout``.
        KeyError, IndexError: If the response lacks the expected config fields.
    """
    headers = default_headers.copy()
    headers["Authorization"] = f"Bearer {account_data.access_token}"

    response = requests.get(
        get_config_url(account_data.account_id),
        headers=headers,
        verify=get_verify(),
        timeout=timeout,
    )
    response.raise_for_status()
    payload = response.json()

    addresses = payload["config"]["interface"]["addresses"]
    peer = payload["config"]["peers"][0]
    endpoint = peer["endpoint"]

    # The account section is optional. A missing or null section, or one
    # without these keys, falls back to a free account without Warp+.
    account = payload.get("account") or {}

    return ConfigurationData(
        local_address_ipv4=addresses["v4"],
        local_address_ipv6=addresses["v6"],
        endpoint_address_host=endpoint["host"],
        endpoint_address_ipv4=endpoint["v4"],
        endpoint_address_ipv6=endpoint["v6"],
        endpoint_public_key=peer["public_key"],
        warp_enabled=payload["warp_enabled"],
        account_type=account.get("account_type", "free"),
        warp_plus_enabled=account.get("warp_plus", False),
    )
136
137
def get_wireguard_conf(private_key: str, address_1: str, address_2: str, public_key: str, endpoint: str) -> str:
    """Render the text of a WireGuard profile.

    Args:
        private_key: Value for the interface's ``PrivateKey``.
        address_1: Interface address written with a ``/32`` suffix.
        address_2: Interface address written with a ``/128`` suffix.
        public_key: Value for the peer's ``PublicKey``.
        endpoint: Value for the peer's ``Endpoint``.

    Returns:
        The profile text, one setting per line, without a leading or trailing
        newline.
    """
    # Built from a list of lines instead of a sliced triple-quoted literal so
    # that stray text or indentation in the source can never reach the profile.
    lines = [
        "[Interface]",
        f"PrivateKey = {private_key}",
        "DNS = 1.1.1.1",
        f"Address = {address_1}/32",
        f"Address = {address_2}/128",
        "[Peer]",
        f"PublicKey = {public_key}",
        "AllowedIPs = 0.0.0.0/0",
        "AllowedIPs = ::/0",
        f"Endpoint = {endpoint}",
    ]
    return "\n".join(lines)
151
152
def create_conf(account_data: AccountData, conf_data: ConfigurationData) -> None:
    """Write the WireGuard profile for this account to ``config_path``.

    Args:
        account_data: Supplies the private key.
        conf_data: Supplies the local IPv4/IPv6 addresses, the peer's public
            key and the endpoint host.
    """
    # Render first, so that an error while building the text does not leave a
    # truncated profile behind.
    profile = get_wireguard_conf(
        account_data.private_key,
        conf_data.local_address_ipv4,
        conf_data.local_address_ipv6,
        conf_data.endpoint_public_key,
        conf_data.endpoint_address_host,
    )
    with open(config_path, "w") as f:
        f.write(profile)
159
160
if __name__ == "__main__":
    # Script flow: obtain an identity (register or load), fetch the server
    # configuration, make sure Warp is on, then write the WireGuard profile.
    data_path.mkdir(exist_ok=True)
    account_data: AccountData

    if not identity_path.exists():
        # No identity file yet: registration requires accepting the terms.
        print("This project is in no way affiliated with Cloudflare!")
        print(f"Cloudflare's Terms of Service: {terms_of_service_url}")
        if input("Do you agree? (y/N): ").lower() != "y":
            sys.exit(2)

        print("Creating new identity...")
        account_data = do_register()
        save_identitiy(account_data)
    else:
        print("Loading existing identity...")
        account_data = load_identity()

    print("Getting configuration...")
    conf_data = get_server_conf(account_data)

    if not conf_data.warp_enabled:
        print("Enabling Warp...")
        enable_warp(account_data)
        # enable_warp() fails unless the server confirms the change, so the
        # local copy can be updated without fetching the configuration again.
        conf_data.warp_enabled = True

    print(f"Account type: {conf_data.account_type}")
    print(f"Warp+ enabled: {conf_data.warp_plus_enabled}")

    print("Creating WireGuard configuration...")
    create_conf(account_data, conf_data)

    print("All done! Find your files here:")
    print(identity_path.absolute())
    print(config_path.absolute())