· 6 years ago · Nov 01, 2019, 05:20 PM
1from future import standard_library
2standard_library.install_aliases() # noqa: E402
3
4import hashlib
5import json
6import re
7import requests
8import urllib.parse
9import xbmc
10
11from resources.lib.models.playlist import Playlist
12from resources.lib.models.track import Track
13from resources.lib.models.selection import Selection
14from resources.lib.models.user import User
15from resources.lib.palcomp3.api_collection import ApiCollection
16from resources.lib.palcomp3.api_interface import ApiInterface
17
18
19class ApiV2(ApiInterface):
20 """This class uses the unofficial API used by the palcomp3 website."""
21
22 api_host = "https://api-v2.palcom3.com"
23 api_client_id = "KT6UCHXC9iNnI8wn4UUfwMSlAPe4Z8zx"
24 api_limit = 20
25 api_lang = "en"
26 api_cache = {
27 "discover": 120 # 2 hours
28 }
29 thumbnail_size = 500
30
31 def __init__(self, settings, lang, cache):
32 self.cache = cache
33 self.settings = settings
34 self.api_limit = int(self.settings.get("search.items.size"))
35
36 if self.settings.get("apiv2.clientid"):
37 self.api_client_id = self.settings.get("apiv2.clientid")
38
39 if self.settings.get("apiv2.locale") == self.settings.APIV2_LOCALE["auto"]:
40 self.api_lang = lang
41
42 def search(self, query, kind="tracks"):
43 res = self._do_request("/search/" + kind, {"q": query, "limit": self.api_limit})
44 return self._map_json_to_collection(res)
45
46 def discover(self, selection_id=None):
47 res = self._do_request("/mixed-selections", {}, self.api_cache["discover"])
48
49 if selection_id and "collection" in res:
50 res = self._find_id_in_selection(res["collection"], selection_id)
51
52 return self._map_json_to_collection(res)
53
54 def charts(self, filters):
55 res = self._do_request("/charts", filters)
56 res = {"collection": [item["track"] for item in res["collection"]]}
57 return self._map_json_to_collection(res)
58
59 def call(self, url):
60 url = urllib.parse.urlparse(url)
61 res = self._do_request(url.path, urllib.parse.parse_qs(url.query))
62 return self._map_json_to_collection(res)
63
64 def resolve_id(self, id):
65 res = self._do_request("/tracks", {"ids": id})
66 return self._map_json_to_collection({"collection": res})
67
68 def resolve_url(self, url):
69 url = self._sanitize_url(url)
70 res = self._do_request("/resolve", {"url": url})
71 return self._map_json_to_collection(res)
72
73 def resolve_media_url(self, url):
74 url = urllib.parse.urlparse(url)
75 res = self._do_request(url.path, urllib.parse.parse_qs(url.query))
76 return res.get("url")
77
78 def _do_request(self, path, payload, cache=0):
79 payload["client_id"] = self.api_client_id
80 payload["app_locale"] = self.api_lang
81 headers = {"Accept-Encoding": "gzip"}
82 path = self.api_host + path
83 cache_key = hashlib.sha1((path + str(payload)).encode()).hexdigest()
84
85 xbmc.log(
86 "plugin.audio.soundcloud::ApiV2() Calling %s with header %s and payload %s" %
87 (path, str(headers), str(payload)),
88 xbmc.LOGDEBUG
89 )
90
91 # If caching is active, check for an existing cached file.
92 if cache:
93 cached_response = self.cache.get(cache_key, cache)
94 if cached_response:
95 return json.loads(cached_response)
96
97 # Send the request.
98 response = requests.get(path, headers=headers, params=payload).json()
99
100 # If caching is active, cache the response.
101 if cache:
102 self.cache.add(cache_key, json.dumps(response))
103
104 return response
105
106 def _extract_media_url(self, transcodings):
107 setting = self.settings.get("audio.format")
108 for codec in transcodings:
109 if self._is_preferred_codec(codec["format"], self.settings.AUDIO_FORMATS[setting]):
110 return codec["url"]
111
112 # Fallback
113 return transcodings[0]["url"] if len(transcodings) else None
114
115 def _find_id_in_selection(self, selection, selection_id):
116 for category in selection:
117 if category["id"] == selection_id:
118 if "items" in category:
119 return category["items"]
120 elif "tracks" in category:
121 return {"collection": category["tracks"]}
122 elif "items" in category:
123 res = self._find_id_in_selection(category["items"]["collection"], selection_id)
124 if res:
125 return res
126
127 def _map_json_to_collection(self, json_obj):
128 collection = ApiCollection()
129 collection.items = [] # Reset list in order to resolve problems in unit tests.
130 collection.load = []
131 collection.next_href = json_obj.get("next_href", None)
132
133 if "kind" in json_obj and json_obj["kind"] == "track":
134 # If we are dealing with a single track, pack it into a dict
135 json_obj = {"collection": [json_obj]}
136
137 if "collection" in json_obj:
138
139 for item in json_obj["collection"]:
140 kind = item.get("kind", None)
141
142 if kind == "track":
143 if "title" not in item:
144 # Track not fully returned by API
145 collection.load.append(item["id"])
146 continue
147
148 track = self._build_track(item)
149 collection.items.append(track)
150
151 elif kind == "user":
152 user = User(id=item["id"], label=item["username"])
153 user.label2 = item.get("full_name", "")
154 user.thumb = self._get_thumbnail(item, self.thumbnail_size)
155 user.info = {
156 "artist": item.get("description", None)
157 }
158 collection.items.append(user)
159
160 elif kind == "playlist":
161 playlist = Playlist(id=item["id"], label=item.get("title"))
162 playlist.is_album = item.get("is_album", False)
163 playlist.label2 = item.get("label_name", "")
164 playlist.thumb = self._get_thumbnail(item, self.thumbnail_size)
165 playlist.info = {
166 "artist": item["user"]["username"]
167 }
168 collection.items.append(playlist)
169
170 elif kind == "system-playlist":
171 # System playlists only appear inside selections
172 playlist = Selection(id=item["id"], label=item.get("title"))
173 playlist.thumb = self._get_thumbnail(item, self.thumbnail_size)
174 collection.items.append(playlist)
175
176 elif kind == "selection":
177 selection = Selection(id=item["id"], label=item.get("title"))
178 selection.label2 = item.get("description", "")
179 collection.items.append(selection)
180
181 else:
182 xbmc.log("plugin.audio.soundcloud::ApiV2() "
183 "Could not convert JSON kind to model...",
184 xbmc.LOGWARNING)
185
186 elif "tracks" in json_obj:
187
188 for item in json_obj["tracks"]:
189 if "title" not in item:
190 # Track not fully returned by API
191 collection.load.append(item["id"])
192 continue
193
194 track = self._build_track(item)
195 track.label2 = json_obj["title"]
196 collection.items.append(track)
197
198 else:
199 raise RuntimeError("ApiV2 JSON seems to be invalid")
200
201 # Load unresolved tracks
202 if collection.load:
203 track_ids = ",".join(str(x) for x in collection.load)
204 loaded_tracks = self._do_request("/tracks", {"ids": track_ids})
205 # Because returned tracks are not sorted, we have to manually match them
206 for track_id in collection.load:
207 loaded_track = [lt for lt in loaded_tracks if lt["id"] == track_id]
208 if len(loaded_track): # Sometimes a track cannot be resolved
209 track = self._build_track(loaded_track[0])
210 collection.items.append(track)
211
212 return collection
213
214 def _build_track(self, item):
215 if type(item.get("publisher_metadata")) is dict:
216 artist = item["publisher_metadata"].get("artist", item["user"]["username"])
217 else:
218 artist = item["user"]["username"]
219
220 track = Track(id=item["id"], label=item["title"])
221 track.blocked = True if item.get("policy") == "BLOCK" else False
222 track.preview = True if item.get("policy") == "SNIP" else False
223 track.thumb = self._get_thumbnail(item, self.thumbnail_size)
224 track.media = self._extract_media_url(item["media"]["transcodings"])
225 track.info = {
226 "artist": artist,
227 "genre": item.get("genre", None),
228 "date": item.get("display_date", None),
229 "description": item.get("description", None),
230 "duration": int(item["duration"]) / 1000
231 }
232
233 return track
234
235 @staticmethod
236 def fetch_client_id():
237 headers = {"Accept-Encoding": "gzip"}
238
239 # Get the HTML (includes a reference to the JS file we need)
240 html = requests.get("https://soundcloud.com/", headers=headers).text
241
242 # Extract the HREF to the JS file (which contains the API key)
243 match = re.search(r"=\"(https://a-v2\.sndcdn\.com/assets/app.*)\"", html)
244
245 if match:
246 # Get the JS
247 response = requests.get(match.group(1), headers=headers)
248 response.encoding = "utf-8" # This speeds up `response.text` by 3 seconds
249
250 # Extract the API key
251 key = re.search(r"exports={\"api-v2\".*client_id:\"(\w*)\"", response.text)
252
253 if key:
254 return key.group(1)
255 else:
256 raise Exception("Failed to extract client key from js")
257 else:
258 raise Exception("Failed to extract js href from html")
259
260 @staticmethod
261 def _is_preferred_codec(codec, setting):
262 return codec["mime_type"] == setting["mime_type"] and \
263 codec["protocol"] == setting["protocol"]
264
265 @staticmethod
266 def _sanitize_url(url):
267 return url.replace("m.soundcloud.com/", "soundcloud.com/")
268
269 @staticmethod
270 def _get_thumbnail(item, size):
271 """
272 availableSizes: [
273 [ 20, 't20x20'],
274 [ 50, 't50x50'],
275 [120, 't120x120'],
276 [200, 't200x200'],
277 [500, 't500x500']
278 ]
279 """
280 url = item.get(
281 "artwork_url", item.get("avatar_url", item.get("calculated_artwork_url", False))
282 )
283
284 return re.sub(
285 r"^(.*/)(\w+)-([-a-zA-Z0-9]+)-([a-z0-9]+)\.(jpg|png|gif).*$",
286 r"\1\2-\3-t{x}x{y}.\5".format(x=size, y=size),
287 url
288 ) if url else None