· 6 years ago · Jan 01, 2020, 08:02 PM
1import base64
2import datetime
3import hashlib
4import hmac
5import json
6import logging
7import os
8import random
9import sys
10import time
11import uuid
12
13import pytz
14import requests
15import requests.utils
16import six.moves.urllib as urllib
17from requests_toolbelt import MultipartEncoder
18from tqdm import tqdm
19
20from . import config, devices
21from .api_login import (
22 change_device_simulation,
23 generate_all_uuids,
24 load_uuid_and_cookie,
25 login_flow,
26 pre_login_flow,
27 reinstall_app_simulation,
28 save_uuid_and_cookie,
29 set_device,
30 sync_device_features,
31 sync_launcher,
32 sync_user_features,
33)
34from .api_photo import configure_photo, download_photo, upload_photo
35from .api_story import configure_story, download_story, upload_story_photo
36from .api_video import configure_video, download_video, upload_video
37from .prepare import delete_credentials, get_credentials
38
try:
    # Newer interpreters expose a dedicated exception for malformed JSON.
    from json.decoder import JSONDecodeError
except ImportError:
    # Fallback: alias the name to ValueError so `except JSONDecodeError`
    # clauses keep working where the import is unavailable.
    JSONDecodeError = ValueError


# True when the running interpreter's major version is 2.
PY2 = sys.version_info.major == 2
46
47
48class API(object):
def __init__(self, device=None, base_path="", save_logfile=True, log_filename=None):
    """Initialise client state and attach logging handlers.

    @param device        Device description; falls back to
                         ``devices.DEFAULT_DEVICE`` when falsy
    @param base_path     Directory prefix used for the default log file
                         (and stored for later use) (String)
    @param save_logfile  A file handler is attached only when this is
                         exactly ``True`` (Boolean)
    @param log_filename  Explicit log file path (String). When None, a
                         per-instance file below ``base_path`` is used
    """
    # Setup device and user_agent
    self.device = device or devices.DEFAULT_DEVICE

    self.cookie_fname = None
    self.base_path = base_path

    self.is_logged_in = False
    self.last_login = None

    self.last_response = None
    self.last_json = None
    self.total_requests = 0

    # Setup logging
    self.logger = logging.getLogger("[instabot_{}]".format(id(self)))

    # Kept for backward compatibility: these two folders are created
    # relative to the current working directory, not to base_path.
    for folder in ("./config/", "./log/"):
        if not os.path.exists(folder):
            os.makedirs(folder)

    if save_logfile is True:
        if log_filename is None:
            log_filename = os.path.join(
                base_path, "./log/instabot_{}.log".format(id(self))
            )

        # The log file may live under base_path or a caller-chosen folder,
        # neither of which is covered by the "./log/" creation above.
        log_dir = os.path.dirname(log_filename)
        if log_dir and not os.path.isdir(log_dir):
            os.makedirs(log_dir)

        fh = logging.FileHandler(filename=log_filename)
        fh.setLevel(logging.INFO)
        fh.setFormatter(logging.Formatter("%(asctime)s %(message)s"))

        self.logger.addHandler(fh)

    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))

    self.logger.addHandler(ch)
    self.logger.setLevel(logging.DEBUG)
91
def set_user(self, username, password, generate_all_uuids=True, set_device=True):
    """Store the credentials and switch to a logger named after the user.

    @param username            Account name (String)
    @param password            Account password (String)
    @param generate_all_uuids  Call ``self.generate_all_uuids()`` when
                               exactly ``True``
    @param set_device          Call ``self.set_device()`` when exactly
                               ``True``
    """
    self.username = username
    self.password = password

    previous = getattr(self, "logger", None)
    self.logger = logging.getLogger("[instabot_{}]".format(self.username))

    # The per-user logger starts without handlers; carry over the ones
    # configured on the previous logger so output is not silently lost.
    if previous is not None and previous is not self.logger:
        for handler in previous.handlers:
            if handler not in self.logger.handlers:
                self.logger.addHandler(handler)
        if self.logger.level == logging.NOTSET:
            self.logger.setLevel(previous.level)

    if set_device is True:
        self.set_device()

    if generate_all_uuids is True:
        self.generate_all_uuids()
103
def set_contact_point_prefill(self, usage="prefill"):
    """POST to ``accounts/contact_point_prefill/`` (allowed before login).

    @param usage  Value sent in the "usage" field (String)
    @return       Result of ``send_request``
    """
    payload = {
        "id": self.uuid,
        "phone_id": self.phone_id,
        "_csrftoken": self.token,
        "usage": usage,
    }
    body = json.dumps(payload)
    return self.send_request("accounts/contact_point_prefill/", body, login=True)
114
def get_suggested_searches(self, _type="users"):
    """POST to ``fbsearch/suggested_searches/`` for the given type.

    @param _type  Value sent in the "type" field (String)
    @return       Result of ``send_request``
    """
    body = self.json_data({"type": _type})
    return self.send_request("fbsearch/suggested_searches/", body)
119
def read_msisdn_header(self, usage="default"):
    """POST to ``accounts/read_msisdn_header/`` (allowed before login).

    @param usage  Value sent as "mobile_subno_usage" (String)
    @return       Result of ``send_request``
    """
    body = json.dumps({"device_id": self.uuid, "mobile_subno_usage": usage})
    extra_headers = {"X-DEVICE-ID": self.uuid}
    return self.send_request(
        "accounts/read_msisdn_header/", body, login=True, headers=extra_headers
    )
128
def log_attribution(self, usage="default"):
    """POST the advertising id to ``attribution/log_attribution/``.

    @param usage  Accepted for signature compatibility; not used
    @return       Result of ``send_request``
    """
    body = json.dumps({"adid": self.advertising_id})
    endpoint = "attribution/log_attribution/"
    return self.send_request(endpoint, body, login=True)
132
133 # ====== ALL METHODS IMPORT FROM api_login ====== #
def sync_device_features(self, login=False):
    """Delegate to ``api_login.sync_device_features`` for this client."""
    outcome = sync_device_features(self, login)
    return outcome
136
def sync_launcher(self, login=False):
    """Delegate to ``api_login.sync_launcher`` for this client."""
    outcome = sync_launcher(self, login)
    return outcome
139
def sync_user_features(self):
    """Delegate to ``api_login.sync_user_features`` for this client."""
    outcome = sync_user_features(self)
    return outcome
142
def pre_login_flow(self):
    """Delegate to ``api_login.pre_login_flow`` for this client."""
    outcome = pre_login_flow(self)
    return outcome
145
def login_flow(self, just_logged_in=False, app_refresh_interval=1800):
    """Delegate to ``api_login.login_flow`` for this client.

    Both arguments are forwarded positionally, unchanged.
    """
    outcome = login_flow(self, just_logged_in, app_refresh_interval)
    return outcome
148
def set_device(self):
    """Delegate to ``api_login.set_device`` for this client."""
    outcome = set_device(self)
    return outcome
151
def generate_all_uuids(self):
    """Delegate to ``api_login.generate_all_uuids`` for this client."""
    outcome = generate_all_uuids(self)
    return outcome
154
def reinstall_app_simulation(self):
    """Delegate to ``api_login.reinstall_app_simulation`` for this client."""
    outcome = reinstall_app_simulation(self)
    return outcome
157
def change_device_simulation(self):
    """Delegate to ``api_login.change_device_simulation`` for this client."""
    outcome = change_device_simulation(self)
    return outcome
160
def load_uuid_and_cookie(self, load_uuid=True, load_cookie=True):
    """Delegate to ``api_login.load_uuid_and_cookie`` for this client.

    Both flags are forwarded as keyword arguments, unchanged.
    """
    outcome = load_uuid_and_cookie(
        self, load_uuid=load_uuid, load_cookie=load_cookie
    )
    return outcome
163
def save_uuid_and_cookie(self):
    """Delegate to ``api_login.save_uuid_and_cookie`` for this client."""
    outcome = save_uuid_and_cookie(self)
    return outcome
166
def login(
    self,
    username=None,
    password=None,
    force=False,
    proxy=None,
    use_cookie=True,
    use_uuid=True,
    cookie_fname=None,
    ask_for_code=False,
    set_device=True,
    generate_all_uuids=True,
    is_threaded=False,
):
    """Log in, reusing a stored cookie when possible.

    @param username            Account name; looked up through
                               ``get_credentials`` when password is None
    @param password            Account password (String)
    @param force               Re-login even if already logged in
    @param proxy               Proxy address stored on ``self.proxy``
    @param use_cookie          Try the stored cookie first
    @param use_uuid            Load stored uuids before a fresh login
    @param cookie_fname        Cookie file path; defaults to
                               ``<base_path>/<username>_uuid_and_cookie.json``
    @param ask_for_code        Solve a checkpoint challenge interactively
    @param set_device          Currently overridden inside the method
    @param generate_all_uuids  Currently overridden inside the method
    @param is_threaded         Not used by this method

    @return True when logged in (including via cookie or when already
            logged in), False otherwise
    """
    if password is None:
        username, password = get_credentials(username=username)

    # NOTE(review): the caller-supplied values are overwritten here, as in
    # the original code; kept to avoid changing the login sequence.
    set_device = generate_all_uuids = True
    self.set_user(username, password)
    self.session = requests.Session()

    self.proxy = proxy
    self.set_proxy()  # Only happens if `self.proxy`

    self.cookie_fname = cookie_fname
    if self.cookie_fname is None:
        fmt = "{username}_uuid_and_cookie.json"
        cookie_fname = fmt.format(username=username)
        self.cookie_fname = os.path.join(self.base_path, cookie_fname)

    cookie_is_loaded = False
    msg = "Login flow failed, the cookie is broken. Relogin again."

    if use_cookie is True:
        loaded = self.load_uuid_and_cookie(
            load_cookie=use_cookie, load_uuid=use_uuid
        )
        if loaded is True:
            # Check if the token loaded is valid.
            if self.login_flow(False) is True:
                cookie_is_loaded = True
                self.save_successful_login()
            else:
                self.logger.info(msg)
                set_device = generate_all_uuids = False
                force = True

    # Previously these paths fell off the end and returned None.
    if cookie_is_loaded or (self.is_logged_in and not force):
        return True

    self.session = requests.Session()
    if use_uuid is True:
        loaded = self.load_uuid_and_cookie(
            load_cookie=use_cookie, load_uuid=use_uuid
        )
        if loaded is False:
            if set_device is True:
                self.set_device()
            if generate_all_uuids is True:
                self.generate_all_uuids()

    self.pre_login_flow()
    data = json.dumps(
        {
            "phone_id": self.phone_id,
            "_csrftoken": self.token,
            "username": self.username,
            "guid": self.uuid,
            "device_id": self.device_id,
            "password": self.password,
            "login_attempt_count": "0",
        }
    )

    if self.send_request("accounts/login/", data, True):
        succeeded = True
    else:
        # last_json stays None when the request never produced a response.
        last_json = self.last_json or {}
        if last_json.get("error_type", "") == "checkpoint_challenge_required":
            self.logger.info("Checkpoint challenge required...")
            if ask_for_code is not True:
                return False
            succeeded = self.solve_challenge()
        elif last_json.get("two_factor_required"):
            succeeded = self.two_factor_auth()
        else:
            succeeded = False

    if succeeded:
        self.save_successful_login()
        self.login_flow(True)
        return True

    self.save_failed_login()
    return False
273
def two_factor_auth(self):
    """Prompt for a 2FA code and submit it to ``accounts/two_factor_login/``.

    Reads the identifier from ``self.last_json["two_factor_info"]``.

    @return True when the response is HTTP 200 with status "ok",
            False otherwise (the reason is logged)
    """
    self.logger.info("Two-factor authentication required")
    # NOTE(review): on Python 2 the builtin input() evaluates what is typed;
    # confirm whether Python 2 is still a supported target.
    two_factor_code = input("Enter 2FA verification code: ")
    two_factor_id = self.last_json["two_factor_info"]["two_factor_identifier"]

    login = self.session.post(
        config.API_URL + "accounts/two_factor_login/",
        data={
            "username": self.username,
            "verification_code": two_factor_code,
            "two_factor_identifier": two_factor_id,
            "password": self.password,
            "device_id": self.device_id,
            "ig_sig_key_version": 4,
        },
        allow_redirects=True,
    )

    if login.status_code != 200:
        self.logger.error(
            (
                "Two-factor authentication request returns "
                "{} error with message {} !"
            ).format(login.status_code, login.text)
        )
        return False

    # A 200 response with a non-JSON or unexpected body must not crash.
    try:
        resp_json = json.loads(login.text)
    except ValueError:
        resp_json = None
    if not isinstance(resp_json, dict):
        resp_json = {}

    status = resp_json.get("status")
    if status == "ok":
        return True

    if "message" in resp_json:
        self.logger.error("Login error: {}".format(resp_json["message"]))
    else:
        self.logger.error(
            ('Login error: "{}" status and' " message {}.").format(
                status, login.text
            )
        )
    return False
313
def save_successful_login(self):
    """Mark the client as logged in, stamp the time and log the event."""
    self.is_logged_in = True
    self.last_login = time.time()
    message = "Logged-in successfully as '{}'!".format(self.username)
    self.logger.info(message)
318
def save_failed_login(self):
    """Log the failed login and call ``delete_credentials``."""
    message = "Username or password is incorrect."
    self.logger.info(message)
    delete_credentials()
322
def solve_challenge(self):
    """Walk through a checkpoint challenge interactively.

    Requests the challenge path taken from ``self.last_json``, prints the
    available choices, submits the chosen option and then the security
    code typed by the user.

    @return True when the final response contains "logged_in_user" with
            action "close" and status "ok"; False otherwise
    """
    challenge_url = self.last_json["challenge"]["api_path"][1:]
    try:
        self.send_request(challenge_url, None, login=True, with_signature=False)
    except Exception as e:
        self.logger.error("solve_challenge; {}".format(e))
        return False

    for choice in self.get_challenge_choices():
        print(choice)
    selected = input("Insert choice: ")

    try:
        self.send_request(
            challenge_url, json.dumps({"choice": selected}), login=True
        )
    except Exception as e:
        self.logger.error(e)
        return False

    print("A code has been sent to the method selected, please check.")
    security_code = input("Insert code: ")

    try:
        self.send_request(
            challenge_url, json.dumps({"security_code": security_code}), login=True
        )
    except Exception as e:
        self.logger.error(e)
        return False

    if (
        "logged_in_user" in self.last_json
        and self.last_json.get("action", "") == "close"
        and self.last_json.get("status", "") == "ok"
    ):
        return True

    self.logger.error("Not possible to log in. Reset and try again")
    return False
364
def get_challenge_choices(self):
    """Build the list of printable options for the current challenge.

    Reads "step_name" (and "step_data") from ``self.last_json``.

    @return List of strings: a heading followed by "<n> - <label>" lines
    """
    last_json = self.last_json
    step_name = last_json.get("step_name", "")
    choices = []

    if step_name == "select_verify_method":
        choices.append("Checkpoint challenge received")
        # step_data may be absent; treat that as "no methods offered".
        step_data = last_json.get("step_data") or {}
        if "phone_number" in step_data:
            choices.append("0 - Phone")
        if "email" in step_data:
            choices.append("1 - Email")
    elif step_name == "delta_login_review":
        choices.append("Login attempt challenge received")
        choices.append("0 - It was me")
        # Was also labelled "0", making the two options indistinguishable.
        choices.append("1 - It wasn't me")

    if not choices:
        choices.append(
            '"{}" challenge received'.format(last_json.get("step_name", "Unknown"))
        )
        choices.append("0 - Default")

    return choices
388
def logout(self, *args, **kwargs):
    """POST to ``accounts/logout/`` when logged in.

    Extra positional and keyword arguments are accepted and ignored.

    @return True when the client is logged out afterwards
    """
    if self.is_logged_in:
        body = json.dumps({})
        request_ok = self.send_request(
            "accounts/logout/", body, with_signature=False
        )
        self.is_logged_in = not request_ok
    return not self.is_logged_in
397
def set_proxy(self):
    """Apply ``self.proxy`` (if set) to the session for http and https.

    A proxy given without an explicit "scheme://" prefix is treated as an
    HTTP proxy. The prefix is detected textually because URL parsing can
    report the host (or user name) of "host:port" as the scheme, which
    left such proxies without a scheme.
    """
    proxy = getattr(self, "proxy", None)
    if not proxy:
        return

    if "://" not in proxy:
        proxy = "http://" + proxy

    self.session.proxies["http"] = proxy
    self.session.proxies["https"] = proxy
404
def send_request(  # noqa: C901
    self,
    endpoint,
    post=None,
    login=False,
    with_signature=True,
    headers=None,
    extra_sig=None,
):
    """Send one API request and record the response on the instance.

    @param endpoint        Path appended to ``config.API_URL`` (String)
    @param post            Request body; a GET is issued when None,
                           a POST otherwise
    @param login           Allow the call while not logged in (Boolean)
    @param with_signature  Pass ``post`` through ``generate_signature``
    @param headers         Extra headers merged into the session (Dict)
    @param extra_sig       Strings joined with "&" and appended to the
                           signed body (List)

    @return True on HTTP 200 with a JSON body; the string
            "feedback_required" when the error message contains it; the
            result of ``two_factor_auth`` on a 400 asking for 2FA;
            False in every other case

    @raise Exception when not logged in and ``login`` is False
    """
    self.set_proxy()  # Only happens if `self.proxy`
    if not self.is_logged_in and not login:
        msg = "Not logged in!"
        self.logger.critical(msg)
        raise Exception(msg)

    self.session.headers.update(config.REQUEST_HEADERS)
    self.session.headers.update(
        {
            "User-Agent": self.user_agent,
            "X-IG-Connection-Speed": "-1kbps",
            "X-IG-Bandwidth-Speed-KBPS": str(random.randint(7000, 10000)),
            "X-IG-Bandwidth-TotalBytes-B": str(random.randint(500000, 900000)),
            "X-IG-Bandwidth-TotalTime-MS": str(random.randint(50, 150)),
        }
    )
    if headers:
        self.session.headers.update(headers)

    try:
        self.total_requests += 1
        if post is not None:  # POST
            if with_signature:
                # Only `send_direct_item` doesn't need a signature
                post = self.generate_signature(post)
                if extra_sig is not None and extra_sig != []:
                    # NOTE(review): no "&" is inserted between the signed
                    # body and the first extra item — confirm against
                    # generate_signature's output format.
                    post += "&".join(extra_sig)
            response = self.session.post(config.API_URL + endpoint, data=post)
        else:  # GET
            response = self.session.get(config.API_URL + endpoint)
    except Exception as e:
        self.logger.warning(str(e))
        return False

    self.last_response = response
    self.logger.debug(response)
    if response.status_code == 200:
        try:
            self.last_json = json.loads(response.text)
            return True
        except JSONDecodeError:
            return False

    status = response.status_code
    if status != 404:
        self.logger.error("Request returns {} error!".format(status))

    # Parse the error body once; every branch below reuses the result.
    parsed = True
    response_data = None
    try:
        response_data = json.loads(response.text)
    except ValueError:
        parsed = False
        self.logger.error(
            "Error checking for `feedback_required`, " "response text is not JSON"
        )
        self.logger.info("Full Response: {}".format(str(response)))
        try:
            self.logger.info("Response Text: {}".format(str(response.text)))
        except Exception:
            # Best effort only: the text may not be printable.
            pass
    is_object = isinstance(response_data, dict)

    if is_object:
        message = response_data.get("message")
        if hasattr(message, "encode") and "feedback_required" in str(
            message.encode("utf-8")
        ):
            # feedback_message may be missing; do not crash while logging.
            detail = response_data.get("feedback_message")
            if hasattr(detail, "encode"):
                detail = str(detail.encode("utf-8"))
            else:
                detail = str(detail)
            self.logger.error("ATTENTION!: `feedback_required`" + detail)
            self.last_response = response
            self.last_json = response_data
            return "feedback_required"

    if status == 429:
        sleep_minutes = 5
        self.logger.warning(
            "That means 'too many requests'. I'll go to sleep "
            "for {} minutes.".format(sleep_minutes)
        )
        time.sleep(sleep_minutes * 60)
    elif status == 400 and is_object:
        # PERFORM Interactive Two-Factor Authentication
        if response_data.get("two_factor_required"):
            self.last_response = response
            self.last_json = response_data
            return self.two_factor_auth()
        # End of Interactive Two-Factor Authentication

        msg = "Instagram's error message: {}"
        self.logger.info(msg.format(response_data.get("message")))
        if "error_type" in response_data:
            msg = "Error type: {}".format(response_data["error_type"])
            self.logger.info(msg)

    # For debugging: keep the last error body available to callers.
    self.last_response = response
    if parsed:
        self.last_json = response_data
    return False
523
@property
def cookie_dict(self):
    """Session cookies as returned by the cookie jar's ``get_dict()``."""
    jar = self.session.cookies
    return jar.get_dict()
527
@property
def token(self):
    """Value of the "csrftoken" cookie (KeyError when it is absent)."""
    cookies = self.cookie_dict
    return cookies["csrftoken"]
531
@property
def user_id(self):
    """Value of the "ds_user_id" cookie (KeyError when it is absent)."""
    cookies = self.cookie_dict
    return cookies["ds_user_id"]
535
@property
def rank_token(self):
    """The user id and the uuid joined as "<user_id>_<uuid>"."""
    template = "{0}_{1}"
    return template.format(self.user_id, self.uuid)
539
@property
def default_data(self):
    """Dict with the "_uuid", "_uid" and "_csrftoken" request fields."""
    defaults = {}
    defaults["_uuid"] = self.uuid
    defaults["_uid"] = self.user_id
    defaults["_csrftoken"] = self.token
    return defaults
543
def json_data(self, data=None):
    """Merge ``default_data`` over ``data`` and dump the result as JSON.

    The caller's dict is left untouched; the merge happens on a copy.

    @param data  Extra fields (Dict) or None
    @return      JSON string; ``default_data`` wins on key clashes
    """
    payload = dict(data) if data is not None else {}
    payload.update(self.default_data)
    return json.dumps(payload)
550
def action_data(self, data):
    """Add "radio_type" and "device_id" to ``data`` in place.

    @param data  Dict to extend; existing values for both keys are replaced
    @return      The same dict object
    """
    data["radio_type"] = "wifi-none"
    data["device_id"] = self.device_id
    return data
555
def auto_complete_user_list(self):
    """GET ``friendships/autocomplete_user_list/``."""
    endpoint = "friendships/autocomplete_user_list/"
    return self.send_request(endpoint)
558
def batch_fetch(self):
    """POST the fixed payload below to ``qp/batch_fetch/``.

    Both "surfaces_to_*" values are literal strings sent as-is; they were
    copied from a captured request and must not be reformatted.

    @return Result of ``send_request``
    """
    triggers = (
        '{"5734":["instagram_feed_prompt"],'
        '"4715":["instagram_feed_header"],'
        '"5858":["instagram_feed_tool_tip"]}'
    )
    queries = (
        '{"5734":"viewer() {eligible_promotions.trigger_context_v2(<tr'
        "igger_context_v2>).ig_parameters(<ig_parameters>).trigger_nam"
        "e(<trigger_name>).surface_nux_id(<surface>).external_gating_p"
        "ermitted_qps(<external_gating_permitted_qps>).supports_client"
        "_filters(true).include_holdouts(true) {edges {client_ttl_seco"
        "nds,log_eligibility_waterfall,is_holdout,priority,time_range "
        "{start,end},node {id,promotion_id,logging_data,max_impression"
        "s,triggers,contextual_filters {clause_type,filters {filter_ty"
        "pe,unknown_action,value {name,required,bool_value,int_value,s"
        "tring_value},extra_datas {name,required,bool_value,int_value,"
        "string_value}},clauses {clause_type,filters {filter_type,unkn"
        "own_action,value {name,required,bool_value,int_value,string_v"
        "alue},extra_datas {name,required,bool_value,int_value,string_"
        "value}},clauses {clause_type,filters {filter_type,unknown_act"
        "ion,value {name,required,bool_value,int_value,string_value},e"
        "xtra_datas {name,required,bool_value,int_value,string_value}}"
        ",clauses {clause_type,filters {filter_type,unknown_action,val"
        "ue {name,required,bool_value,int_value,string_value},extra_da"
        "tas {name,required,bool_value,int_value,string_value}}}}}},is"
        "_uncancelable,template {name,parameters {name,required,bool_v"
        "alue,string_value,color_value,}},creatives {title {text},cont"
        "ent {text},footer {text},social_context {text},social_context"
        "_images,primary_action{title {text},url,limit,dismiss_promoti"
        "on},secondary_action{title {text},url,limit,dismiss_promotion"
        "},dismiss_action{title {text},url,limit,dismiss_promotion},im"
        'age.scale(<scale>) {uri,width,height}}}}}}","4715":"viewer() '
        "{eligible_promotions.trigger_context_v2(<trigger_context_v2>)"
        ".ig_parameters(<ig_parameters>).trigger_name(<trigger_name>)."
        "surface_nux_id(<surface>).external_gating_permitted_qps(<exte"
        "rnal_gating_permitted_qps>).supports_client_filters(true).inc"
        "lude_holdouts(true) {edges {client_ttl_seconds,log_eligibilit"
        "y_waterfall,is_holdout,priority,time_range {start,end},node {"
        "id,promotion_id,logging_data,max_impressions,triggers,context"
        "ual_filters {clause_type,filters {filter_type,unknown_action,"
        "value {name,required,bool_value,int_value,string_value},extra"
        "_datas {name,required,bool_value,int_value,string_value}},cla"
        "uses {clause_type,filters {filter_type,unknown_action,value {"
        "name,required,bool_value,int_value,string_value},extra_datas "
        "{name,required,bool_value,int_value,string_value}},clauses {c"
        "lause_type,filters {filter_type,unknown_action,value {name,re"
        "quired,bool_value,int_value,string_value},extra_datas {name,r"
        "equired,bool_value,int_value,string_value}},clauses {clause_t"
        "ype,filters {filter_type,unknown_action,value {name,required,"
        "bool_value,int_value,string_value},extra_datas {name,required"
        ",bool_value,int_value,string_value}}}}}},is_uncancelable,temp"
        "late {name,parameters {name,required,bool_value,string_value,"
        "color_value,}},creatives {title {text},content {text},footer "
        "{text},social_context {text},social_context_images,primary_ac"
        "tion{title {text},url,limit,dismiss_promotion},secondary_acti"
        "on{title {text},url,limit,dismiss_promotion},dismiss_action{t"
        "itle {text},url,limit,dismiss_promotion},image.scale(<scale>)"
        ' {uri,width,height}}}}}}","5858":"viewer() {eligible_promotio'
        "ns.trigger_context_v2(<trigger_context_v2>).ig_parameters(<ig"
        "_parameters>).trigger_name(<trigger_name>).surface_nux_id(<su"
        "rface>).external_gating_permitted_qps(<external_gating_permit"
        "ted_qps>).supports_client_filters(true).include_holdouts(true"
        ") {edges {client_ttl_seconds,log_eligibility_waterfall,is_hol"
        "dout,priority,time_range {start,end},node {id,promotion_id,lo"
        "gging_data,max_impressions,triggers,contextual_filters {claus"
        "e_type,filters {filter_type,unknown_action,value {name,requir"
        "ed,bool_value,int_value,string_value},extra_datas {name,requi"
        "red,bool_value,int_value,string_value}},clauses {clause_type,"
        "filters {filter_type,unknown_action,value {name,required,bool"
        "_value,int_value,string_value},extra_datas {name,required,boo"
        "l_value,int_value,string_value}},clauses {clause_type,filters"
        " {filter_type,unknown_action,value {name,required,bool_value,"
        "int_value,string_value},extra_datas {name,required,bool_value"
        ",int_value,string_value}},clauses {clause_type,filters {filte"
        "r_type,unknown_action,value {name,required,bool_value,int_val"
        "ue,string_value},extra_datas {name,required,bool_value,int_va"
        "lue,string_value}}}}}},is_uncancelable,template {name,paramet"
        "ers {name,required,bool_value,string_value,color_value,}},cre"
        "atives {title {text},content {text},footer {text},social_cont"
        "ext {text},social_context_images,primary_action{title {text},"
        "url,limit,dismiss_promotion},secondary_action{title {text},ur"
        "l,limit,dismiss_promotion},dismiss_action{title {text},url,li"
        "mit,dismiss_promotion},image.scale(<scale>) {uri,width,height"
        '}}}}}}"}'
    )  # noqa (Just copied from request)
    payload = {
        "scale": 3,
        "version": 1,
        "vc_policy": "default",
        "surfaces_to_triggers": triggers,
        "surfaces_to_queries": queries,
    }
    return self.send_request("qp/batch_fetch/", self.json_data(payload))
650
def get_timeline_feed(self, options=None):
    """POST an unsigned timeline request to ``feed/timeline/``.

    @param options  Collection of flag names (List). Recognised values:
                    "is_pull_to_refresh", "push_disabled",
                    "recovered_from_crash". None means no flags.
    @return         Result of ``send_request``
    """
    # A None default avoids sharing one list object between calls.
    if options is None:
        options = []

    headers = {"X-Ads-Opt-Out": "0", "X-DEVICE-ID": self.uuid}
    data = {
        "_csrftoken": self.token,
        "_uuid": self.uuid,
        "is_prefetch": 0,
        "phone_id": self.phone_id,
        "device_id": self.uuid,
        "client_session_id": self.client_session_id,
        "battery_level": random.randint(25, 100),
        "is_charging": random.randint(0, 1),
        "will_sound_on": random.randint(0, 1),
        "is_on_screen": True,
        # NOTE(review): the offset is always computed for the "CET" zone.
        "timezone_offset": datetime.datetime.now(pytz.timezone("CET")).strftime(
            "%z"
        ),
    }

    if "is_pull_to_refresh" in options:
        data["reason"] = "pull_to_refresh"
        data["is_pull_to_refresh"] = "1"
    else:
        data["reason"] = "cold_start_fetch"
        data["is_pull_to_refresh"] = "0"

    # unseen_posts
    # feed_view_info
    # seen_posts

    if "push_disabled" in options:
        data["push_disabled"] = "true"

    if "recovered_from_crash" in options:
        data["recovered_from_crash"] = "1"

    data = json.dumps(data)
    return self.send_request(
        "feed/timeline/", data, with_signature=False, headers=headers
    )
690
def get_megaphone_log(self):
    """GET ``megaphone/log/``."""
    endpoint = "megaphone/log/"
    return self.send_request(endpoint)
693
def expose(self):
    """POST the fixed experiment name and the uuid to ``qe/expose/``."""
    fields = {"id": self.uuid, "experiment": "ig_android_profile_contextual_feed"}
    body = self.json_data(fields)
    return self.send_request("qe/expose/", body)
699
700 # ====== PHOTO METHODS ====== #
def upload_photo(
    self,
    photo,
    caption=None,
    upload_id=None,
    from_video=False,
    force_resize=False,
    options=None,
):
    """Upload photo to Instagram

    @param photo         Path to photo file (String)
    @param caption       Media description (String)
    @param upload_id     Unique upload_id (String). When None, then
                         generate automatically
    @param from_video    A flag that signals whether the photo is loaded
                         from the video or by itself
                         (Boolean, DEPRECATED: not used)
    @param force_resize  Force photo resize (Boolean)
    @param options       Object with different options, e.g.
                         configure_timeout, rename (Dict). None means an
                         empty dict.
                         Designed to reduce the number of function
                         arguments! This is the simplest request object.

    @return Boolean
    """
    # A fresh dict per call: the helper receives an object it may modify,
    # which must not leak into later calls through a shared default.
    if options is None:
        options = {}
    return upload_photo(
        self, photo, caption, upload_id, from_video, force_resize, options
    )
730
def download_photo(self, media_id, filename, media=False, folder="photos"):
    """Delegate to ``api_photo.download_photo`` with the same arguments."""
    outcome = download_photo(self, media_id, filename, media, folder)
    return outcome
733
def configure_photo(self, upload_id, photo, caption=""):
    """Delegate to ``api_photo.configure_photo`` with the same arguments."""
    outcome = configure_photo(self, upload_id, photo, caption)
    return outcome
736
737 # ====== STORY METHODS ====== #
def download_story(self, filename, story_url, username):
    """Delegate to ``api_story.download_story`` with the same arguments."""
    outcome = download_story(self, filename, story_url, username)
    return outcome
740
def upload_story_photo(self, photo, upload_id=None):
    """Delegate to ``api_story.upload_story_photo`` with the same arguments."""
    outcome = upload_story_photo(self, photo, upload_id)
    return outcome
743
def configure_story(self, upload_id, photo):
    """Delegate to ``api_story.configure_story`` with the same arguments."""
    outcome = configure_story(self, upload_id, photo)
    return outcome
746
747 # ====== VIDEO METHODS ====== #
def upload_video(
    self, video, caption=None, upload_id=None, thumbnail=None, options=None
):
    """Upload video to Instagram

    @param video      Path to video file (String)
    @param caption    Media description (String)
    @param upload_id  Unique upload_id (String). When None, then
                      generate automatically
    @param thumbnail  Path to thumbnail for video (String). When None,
                      then thumbnail is generated automatically
    @param options    Object with different options, e.g.
                      configure_timeout, rename_thumbnail, rename (Dict).
                      None means an empty dict.
                      Designed to reduce the number of function arguments!
                      This is the simplest request object.

    @return           Object with state of uploading to
                      Instagram (or False)
    """
    # A fresh dict per call instead of one shared mutable default.
    if options is None:
        options = {}
    return upload_video(self, video, caption, upload_id, thumbnail, options)
768
def download_video(self, media_id, filename, media=False, folder="video"):
    """Delegate to ``api_video.download_video`` with the same arguments."""
    outcome = download_video(self, media_id, filename, media, folder)
    return outcome
771
def configure_video(
    self,
    upload_id,
    video,
    thumbnail,
    width,
    height,
    duration,
    caption="",
    options=None,
):
    """Post Configure Video
    (send caption, thumbnail and more to Instagram)

    @param upload_id  Unique upload_id (String). Received
                      from "upload_video"
    @param video      Path to video file (String)
    @param thumbnail  Path to thumbnail for video (String). When None,
                      then thumbnail is generated automatically
    @param width      Width in px (Integer)
    @param height     Height in px (Integer)
    @param duration   Duration in seconds (Integer)
    @param caption    Media description (String)
    @param options    Object with different options, e.g.
                      configure_timeout, rename_thumbnail, rename (Dict).
                      None means an empty dict.
                      Designed to reduce the number of function arguments!
                      This is the simplest request object.

    @return           Result of ``api_video.configure_video``
    """
    # A fresh dict per call instead of one shared mutable default.
    if options is None:
        options = {}
    return configure_video(
        self, upload_id, video, thumbnail, width, height, duration, caption, options
    )
803
804 # ====== MEDIA METHODS ====== #
def edit_media(self, media_id, captionText=""):
    """POST a new caption to ``media/<media_id>/edit_media/``.

    @param media_id     Media identifier placed in the URL
    @param captionText  Value sent as "caption_text" (String)
    @return             Result of ``send_request``
    """
    body = self.json_data({"caption_text": captionText})
    endpoint = "media/{media_id}/edit_media/".format(media_id=media_id)
    return self.send_request(endpoint, body)
809
def remove_self_tag(self, media_id):
    """POST the default fields to ``media/<media_id>/remove/``."""
    body = self.json_data()
    endpoint = "media/{media_id}/remove/".format(media_id=media_id)
    return self.send_request(endpoint, body)
814
def media_info(self, media_id):
    """GET ``media/<media_id>/info/``."""
    endpoint = "media/{media_id}/info/".format(media_id=media_id)
    return self.send_request(endpoint)
819
def archive_media(self, media, undo=False):
    """POST to the "only_me" (or "undo_only_me") action of a media item.

    @param media  Dict providing "id" and "media_type"
    @param undo   Use the "undo_only_me" action when truthy
    @return       Result of ``send_request``
    """
    action = "undo_only_me" if undo else "only_me"
    media_id = media["id"]
    body = self.json_data({"media_id": media_id})
    endpoint = "media/{media_id}/{action}/?media_type={media_type}".format(
        media_id=media_id, action=action, media_type=media["media_type"]
    )
    return self.send_request(endpoint, body)
827
def delete_media(self, media):
    """POST to ``media/<id>/delete/`` for the given media dict.

    @param media  Dict; its "id" entry is read with ``get``
    @return       Result of ``send_request``
    """
    media_id = media.get("id")
    body = self.json_data({"media_id": media_id})
    endpoint = "media/{media_id}/delete/".format(media_id=media_id)
    return self.send_request(endpoint, body)
832
def gen_user_breadcrumb(self, size):
    """Build the "user_breadcrumb" value for a text of ``size`` characters.

    The payload "<size> <elapsed> <count> <timestamp_ms>" is signed with
    HMAC-SHA256; signature and payload are base64-encoded.

    @param size  Length of the typed text (Integer)
    @return      "<base64 signature>\\n<base64 payload>\\n" (String)
    """
    key = "iN4$aGr0m"
    dt = int(time.time() * 1000)

    time_elapsed = random.randint(500, 1500) + size * random.randint(500, 1500)
    # Floor division keeps the event count an integer on Python 3 as well.
    text_change_event_count = max(1, size // random.randint(3, 5))

    data = "{size!s} {elapsed!s} {count!s} {dt!s}".format(
        size=size, elapsed=time_elapsed, count=text_change_event_count, dt=dt
    )
    raw = data.encode("ascii")
    digest = hmac.new(key.encode("ascii"), raw, digestmod=hashlib.sha256).digest()

    # b64encode returns bytes on Python 3; decode so the result does not
    # contain a literal "b'...'" representation.
    return "{!s}\n{!s}\n".format(
        base64.b64encode(digest).decode("ascii"),
        base64.b64encode(raw).decode("ascii"),
    )
856
def comment(self, media_id, comment_text):
    """POST a comment to ``media/<media_id>/comment/``.

    @param media_id      Media identifier placed in the URL
    @param comment_text  Text of the comment (String)
    @return              Result of ``send_request``
    """
    fields = {
        "container_module": "comments_v2",
        "user_breadcrumb": self.gen_user_breadcrumb(len(comment_text)),
        "idempotence_token": self.generate_UUID(True),
        "comment_text": comment_text,
    }
    body = self.json_data(self.action_data(fields))
    target = "media/{media_id}/comment/".format(media_id=media_id)
    return self.send_request(endpoint=target, post=body)
871
def reply_to_comment(self, media_id, comment_text, parent_comment_id):
    """POST a reply to an existing comment on ``media/<media_id>/comment/``.

    @param media_id           Media identifier placed in the URL
    @param comment_text       Text of the reply (String)
    @param parent_comment_id  Sent as "replied_to_comment_id"
    @return                   Result of ``send_request``
    """
    fields = {
        "comment_text": comment_text,
        "replied_to_comment_id": parent_comment_id,
    }
    body = self.json_data(fields)
    endpoint = "media/{media_id}/comment/".format(media_id=media_id)
    return self.send_request(endpoint, body)
878
def delete_comment(self, media_id, comment_id):
    """POST to ``media/<media_id>/comment/<comment_id>/delete/``."""
    body = self.json_data()
    endpoint = "media/{media_id}/comment/{comment_id}/delete/".format(
        media_id=media_id, comment_id=comment_id
    )
    return self.send_request(endpoint, body)
884
def get_comment_likers(self, comment_id):
    """GET ``media/<comment_id>/comment_likers/?``."""
    endpoint = "media/{comment_id}/comment_likers/?".format(comment_id=comment_id)
    return self.send_request(endpoint)
888
def get_media_likers(self, media_id):
    """GET ``media/<media_id>/likers/?``."""
    endpoint = "media/{media_id}/likers/?".format(media_id=media_id)
    return self.send_request(endpoint)
892
def like_comment(self, comment_id):
    """POST to ``media/<comment_id>/comment_like/`` with fixed fields."""
    fields = {
        "is_carousel_bumped_post": "false",
        "container_module": "comments_v2",
        "feed_position": "0",
    }
    body = self.json_data(fields)
    endpoint = "media/{comment_id}/comment_like/".format(comment_id=comment_id)
    return self.send_request(endpoint, body)
903
def unlike_comment(self, comment_id):
    """POST to ``media/<comment_id>/comment_unlike/`` with fixed fields."""
    fields = {
        "is_carousel_bumped_post": "false",
        "container_module": "comments_v2",
        "feed_position": "0",
    }
    body = self.json_data(fields)
    endpoint = "media/{comment_id}/comment_unlike/".format(comment_id=comment_id)
    return self.send_request(endpoint, body)
914
915 # From profile => "is_carousel_bumped_post":"false",
916 # "container_module":"feed_contextual_profile", "feed_position":"0" # noqa
917 # From home/feed => "inventory_source":"media_or_ad",
918 # "is_carousel_bumped_post":"false", "container_module":"feed_timeline",
919 # "feed_position":"0" # noqa
def like(
    self,
    media_id,
    double_tap=None,
    container_module="feed_short_url",
    feed_position=0,
    username=None,
    user_id=None,
    hashtag_name=None,
    hashtag_id=None,
    entity_page_name=None,
    entity_page_id=None,
):
    """POST a like to ``media/<media_id>/like/``.

    @param media_id          Media identifier
    @param double_tap        Sent as the extra "d=<value>" item; a random
                             0 or 1 is used when None
    @param container_module  Sent as "container_module" (String)
    @param feed_position     Sent as a string in "feed_position"
    @param username          When truthy, adds "username" and "user_id"
    @param user_id           Sent together with ``username``
    @param hashtag_name      When truthy, adds "hashtag_name" and
                             "hashtag_id"
    @param hashtag_id        Sent together with ``hashtag_name``
    @param entity_page_name  When truthy, adds "entity_page_name" and
                             "entity_page_id"
    @param entity_page_id    Sent together with ``entity_page_name``

    @return Result of ``send_request``
    """
    fields = self.action_data(
        {
            "media_id": media_id,
            "container_module": container_module,
            "feed_position": str(feed_position),
            "is_carousel_bumped_post": "false",
        }
    )
    if container_module == "feed_timeline":
        fields["inventory_source"] = "media_or_ad"
    if username:
        fields["username"] = username
        fields["user_id"] = user_id
    if hashtag_name:
        fields["hashtag_name"] = hashtag_name
        fields["hashtag_id"] = hashtag_id
    if entity_page_name:
        fields["entity_page_name"] = entity_page_name
        fields["entity_page_id"] = entity_page_id

    if double_tap is None:
        double_tap = random.randint(0, 1)

    body = self.json_data(fields)
    # TODO: comment out debug log out when done
    self.logger.debug("post data: {}".format(body))
    return self.send_request(
        endpoint="media/{media_id}/like/".format(media_id=media_id),
        post=body,
        extra_sig=["d={}".format(double_tap)],
    )
962
def unlike(self, media_id):
    """POST to ``media/<media_id>/unlike/`` with fixed context fields."""
    fields = {
        "media_id": media_id,
        "radio_type": "wifi-none",
        "is_carousel_bumped_post": "false",
        "container_module": "photo_view_other",
        "feed_position": "0",
    }
    body = self.json_data(fields)
    endpoint = "media/{media_id}/unlike/".format(media_id=media_id)
    return self.send_request(endpoint, body)
975
def get_media_comments(self, media_id, max_id=""):
    """Request the comments of a media item.

    A truthy ``max_id`` is appended as the ``max_id`` query parameter.
    """
    endpoint = "media/{}/comments/".format(media_id)
    paging = "?max_id={}".format(max_id) if max_id else ""
    return self.send_request(endpoint + paging)
981
def explore(self, is_prefetch=False):
    """Post a request to ``discover/explore/``.

    The payload carries the current UTC offset of the "CET" zone, the client
    session id and the supported capabilities from ``config``. For a prefetch
    request ``max_id`` and ``module`` are added as well.
    """
    cet_now = datetime.datetime.now(pytz.timezone("CET"))
    payload = {
        "is_prefetch": is_prefetch,
        "is_from_promote": False,
        "timezone_offset": cet_now.strftime("%z"),
        "session_id": self.client_session_id,
        "supported_capabilities_new": config.SUPPORTED_CAPABILITIES,
    }
    if is_prefetch:
        payload["max_id"] = 0
        payload["module"] = "explore_popular"
    return self.send_request("discover/explore/", json.dumps(payload))
997
def get_username_info(self, user_id):
    """Request ``users/<user_id>/info/`` and return the send_request result."""
    endpoint = "users/{}/info/".format(user_id)
    return self.send_request(endpoint)
1001
def get_self_username_info(self):
    """Request the profile info of ``self.user_id``."""
    own_id = self.user_id
    return self.get_username_info(own_id)
1004
def get_recent_activity(self):
    """Request the ``news/inbox`` endpoint."""
    endpoint = "news/inbox"
    return self.send_request(endpoint)
1007
def get_following_recent_activity(self):
    """Request the ``news`` endpoint."""
    endpoint = "news"
    return self.send_request(endpoint)
1010
def get_user_tags(self, user_id):
    """Request the usertags feed of ``user_id`` using ``self.rank_token``."""
    template = "usertags/{0}/feed/?rank_token={1}&ranked_content=true&"
    return self.send_request(template.format(user_id, self.rank_token))
1016
def get_self_user_tags(self):
    """Request the usertags feed of ``self.user_id``."""
    own_id = self.user_id
    return self.get_user_tags(own_id)
1019
def get_geo_media(self, user_id):
    """Request ``maps/user/<user_id>/``."""
    endpoint = "maps/user/{}/".format(user_id)
    return self.send_request(endpoint)
1023
def get_self_geo_media(self):
    """Request the geo media of ``self.user_id``."""
    own_id = self.user_id
    return self.get_geo_media(own_id)
1026
def sync_from_adress_book(self, contacts):
    """Post ``contacts`` (JSON-encoded) to the address book link endpoint."""
    endpoint = "address_book/link/?include=extra_display_name,thumbnails"
    body = "contacts=" + json.dumps(contacts)
    return self.send_request(endpoint, body)
1030
1031 # ====== FEED METHODS ====== #
def tag_feed(self, tag):
    """Request the feed of ``tag`` using ``self.rank_token``."""
    endpoint = "feed/tag/{0}/?rank_token={1}&ranked_content=true&".format(
        tag, self.rank_token
    )
    return self.send_request(endpoint)
1035
def get_timeline(self):
    """Request the timeline feed using ``self.rank_token``."""
    endpoint = "feed/timeline/?rank_token={}&ranked_content=true&".format(
        self.rank_token
    )
    return self.send_request(endpoint)
1039
def get_archive_feed(self):
    """Request the ``feed/only_me_feed/`` endpoint using ``self.rank_token``."""
    endpoint = "feed/only_me_feed/?rank_token={}&ranked_content=true&".format(
        self.rank_token
    )
    return self.send_request(endpoint)
1043
def get_user_feed(self, user_id, max_id="", min_timestamp=None):
    """Request one page of the media feed of ``user_id``.

    Args:
        user_id: id of the user whose feed is requested.
        max_id: paging cursor; sent as-is (empty by default).
        min_timestamp: optional lower time bound. It is only added to the
            query when it is not None; formatting None directly would put
            the literal text "None" into the URL.

    Returns whatever ``self.send_request`` returns.
    """
    params = ["max_id={}".format(max_id)]
    if min_timestamp is not None:
        params.append("min_timestamp={}".format(min_timestamp))
    params.append("rank_token={}".format(self.rank_token))
    params.append("ranked_content=true")
    url = "feed/user/{}/?{}".format(user_id, "&".join(params))
    return self.send_request(url)
1056
def get_self_user_feed(self, max_id="", min_timestamp=None):
    """Request one feed page of ``self.user_id``; arguments are passed on."""
    own_id = self.user_id
    return self.get_user_feed(own_id, max_id, min_timestamp)
1059
def get_hashtag_feed(self, hashtag, max_id=""):
    """Request one page of the feed of ``hashtag``; ``max_id`` is the cursor."""
    template = "feed/tag/{0}/?max_id={1}&rank_token={2}&ranked_content=true&"
    endpoint = template.format(hashtag, max_id, self.rank_token)
    return self.send_request(endpoint)
1066
def get_location_feed(self, location_id, max_id=""):
    """Request one page of the feed of ``location_id``; ``max_id`` is the cursor."""
    template = "feed/location/{0}/?max_id={1}&rank_token={2}&ranked_content=true&"
    endpoint = template.format(location_id, max_id, self.rank_token)
    return self.send_request(endpoint)
1073
def get_popular_feed(self):
    """Request the ``feed/popular/`` endpoint using ``self.rank_token``."""
    template = "feed/popular/?people_teaser_supported=1&rank_token={}&ranked_content=true&"
    endpoint = template.format(self.rank_token)
    return self.send_request(endpoint)
1080
def get_liked_media(self, max_id=""):
    """Request one page of ``feed/liked/``; ``max_id`` is the cursor."""
    endpoint = "feed/liked/?max_id={}".format(max_id)
    return self.send_request(endpoint)
1084
1085 # ====== FRIENDSHIPS METHODS ====== #
def get_user_followings(self, user_id, max_id=""):
    """Request one page of the accounts ``user_id`` follows.

    The query carries ``max_id``, ``config.SIG_KEY_VERSION`` and
    ``self.rank_token``.
    """
    template = (
        "friendships/{0}/following/?max_id={1}"
        "&ig_sig_key_version={2}&rank_token={3}"
    )
    endpoint = template.format(
        user_id, max_id, config.SIG_KEY_VERSION, self.rank_token
    )
    return self.send_request(endpoint)
1097
def get_self_users_following(self):
    """Request the first page of accounts followed by ``self.user_id``."""
    own_id = self.user_id
    return self.get_user_followings(own_id)
1100
def get_user_followers(self, user_id, max_id=""):
    """Request one page of the followers of ``user_id``.

    A truthy ``max_id`` is appended as the ``max_id`` query parameter.
    """
    endpoint = "friendships/{0}/followers/?rank_token={1}".format(
        user_id, self.rank_token
    )
    paging = "&max_id={}".format(max_id) if max_id else ""
    return self.send_request(endpoint + paging)
1107
def get_self_user_followers(self):
    """Return the ``followers`` attribute of this object.

    NOTE(review): unlike ``get_self_users_following`` this issues no request
    and relies on a ``followers`` attribute that is not set in the visible
    part of the class -- confirm it exists.
    """
    followers = self.followers
    return followers
1110
def follow(self, user_id):
    """Post a follow request for ``user_id`` to ``friendships/create/``."""
    payload = self.action_data({"user_id": user_id})
    body = self.json_data(payload)
    self.logger.debug("post data: {}".format(body))
    endpoint = "friendships/create/{}/".format(user_id)
    return self.send_request(endpoint, body)
1116
def unfollow(self, user_id):
    """Post an unfollow request for ``user_id`` to ``friendships/destroy/``."""
    fields = {"user_id": user_id, "radio_type": "wifi-none"}
    body = self.json_data(fields)
    endpoint = "friendships/destroy/{}/".format(user_id)
    return self.send_request(endpoint, body)
1121
def remove_follower(self, user_id):
    """Post to ``friendships/remove_follower/<user_id>/``."""
    body = self.json_data({"user_id": user_id})
    endpoint = "friendships/remove_follower/{}/".format(user_id)
    return self.send_request(endpoint, body)
1126
def block(self, user_id):
    """Post to ``friendships/block/<user_id>/``."""
    body = self.json_data({"user_id": user_id})
    endpoint = "friendships/block/{}/".format(user_id)
    return self.send_request(endpoint, body)
1131
def unblock(self, user_id):
    """Post to ``friendships/unblock/<user_id>/``."""
    body = self.json_data({"user_id": user_id})
    endpoint = "friendships/unblock/{}/".format(user_id)
    return self.send_request(endpoint, body)
1136
def user_friendship(self, user_id):
    """Post to ``friendships/show/<user_id>/``."""
    body = self.json_data({"user_id": user_id})
    endpoint = "friendships/show/{}/".format(user_id)
    return self.send_request(endpoint, body)
1141
def mute_user(self, user, mute_story=False, mute_posts=False):
    """Post a mute request for ``user``.

    ``mute_posts`` adds ``target_posts_author_id`` and ``mute_story`` adds
    ``target_reel_author_id`` to the payload; both may be set.
    """
    wanted = (
        ("target_posts_author_id", mute_posts),
        ("target_reel_author_id", mute_story),
    )
    targets = {}
    for field, enabled in wanted:
        if enabled:
            targets[field] = user
    body = self.json_data(targets)
    return self.send_request("friendships/mute_posts_or_story_from_follow/", body)
1151
def get_muted_friends(self, muted_content):
    """Request the friends whose stories are muted.

    Only ``muted_content == "stories"`` is handled; every other value
    (including "posts") raises NotImplementedError.
    """
    # ToDo update endpoints for posts
    if muted_content != "stories":
        raise NotImplementedError(
            "API does not support getting friends with muted {}".format(
                muted_content
            )
        )
    return self.send_request("friendships/muted_reels")
1168
def unmute_user(self, user, unmute_posts=False, unmute_stories=False):
    """Post an unmute request for ``user``.

    ``unmute_posts`` adds ``target_posts_author_id`` and ``unmute_stories``
    adds ``target_reel_author_id`` to the payload; both may be set.
    """
    wanted = (
        ("target_posts_author_id", unmute_posts),
        ("target_reel_author_id", unmute_stories),
    )
    targets = {}
    for field, enabled in wanted:
        if enabled:
            targets[field] = user
    body = self.json_data(targets)
    return self.send_request("friendships/unmute_posts_or_story_from_follow/", body)
1178
def get_pending_friendships(self):
    """Get pending follow requests"""
    return self.send_request("friendships/pending/")
1183
def approve_pending_friendship(self, user_id):
    """Post to ``friendships/approve/<user_id>/`` with the session identifiers."""
    fields = {
        "_uuid": self.uuid,
        "_uid": self.user_id,
        "user_id": user_id,
        "_csrftoken": self.token,
    }
    body = self.json_data(fields)
    return self.send_request("friendships/approve/{}/".format(user_id), post=body)
1195
def reject_pending_friendship(self, user_id):
    """Post to ``friendships/ignore/<user_id>/`` with the session identifiers."""
    fields = {
        "_uuid": self.uuid,
        "_uid": self.user_id,
        "user_id": user_id,
        "_csrftoken": self.token,
    }
    body = self.json_data(fields)
    return self.send_request("friendships/ignore/{}/".format(user_id), post=body)
1207
def get_direct_share(self):
    """Request the ``direct_share/inbox/`` endpoint."""
    endpoint = "direct_share/inbox/?"
    return self.send_request(endpoint)
1210
1211 @staticmethod
1212 def _prepare_recipients(users, thread_id=None, use_quotes=False):
1213 if not isinstance(users, list):
1214 print("Users must be an list")
1215 return False
1216 result = {"users": "[[{}]]".format(",".join(users))}
1217 if thread_id:
1218 template = '["{}"]' if use_quotes else "[{}]"
1219 result["thread"] = template.format(thread_id)
1220 return result
1221
@staticmethod
def generate_signature(data):
    """Build the signed request body for ``data``.

    The result is ``ig_sig_key_version=<version>&signed_body=<hex>.<quoted>``
    where ``<hex>`` is the HMAC-SHA256 of ``data`` keyed with
    ``config.IG_SIG_KEY`` and ``<quoted>`` is ``data`` percent-encoded.
    """
    digest = hmac.new(
        config.IG_SIG_KEY.encode("utf-8"), data.encode("utf-8"), hashlib.sha256
    ).hexdigest()
    signed_body = digest + "." + urllib.parse.quote(data)
    return "ig_sig_key_version={sig_key}&signed_body={body}".format(
        sig_key=config.SIG_KEY_VERSION, body=signed_body
    )
1233
1234 @staticmethod
1235 def generate_device_id(seed):
1236 volatile_seed = "12345"
1237 m = hashlib.md5()
1238 m.update(seed.encode("utf-8") + volatile_seed.encode("utf-8"))
1239 return "android-" + m.hexdigest()[:16]
1240
1241 @staticmethod
1242 def get_seed(*args):
1243 m = hashlib.md5()
1244 m.update(b"".join([arg.encode("utf-8") for arg in args]))
1245 return m.hexdigest()
1246
1247 @staticmethod
1248 def generate_UUID(uuid_type):
1249 generated_uuid = str(uuid.uuid4())
1250 if uuid_type:
1251 return generated_uuid
1252 else:
1253 return generated_uuid.replace("-", "")
1254
def get_total_followers_or_followings(  # noqa: C901
    self,
    user_id,
    amount=None,
    which="followers",
    filter_private=False,
    filter_business=False,
    filter_verified=False,
    usernames=False,
    to_file=None,
    overwrite=False,
):
    """Collect the followers or followings of ``user_id`` page by page.

    Args:
        user_id: account whose list is collected.
        amount: maximum number of users to return; when falsy the count
            reported in the profile info is used.
        which: "followers" or "followings".
        filter_private: skip items whose ``is_private`` is truthy.
        filter_business: skip business accounts; this issues one extra
            profile request (plus a short random sleep) per item.
        filter_verified: skip items whose ``is_verified`` is truthy.
        usernames: write usernames instead of ids to ``to_file``.
        to_file: optional path; kept users are appended one per line.
        overwrite: allow truncating an existing ``to_file``.

    Returns:
        A list of at most ``total`` user dicts, or False when the profile
        info has no "user" entry or ``to_file`` exists and ``overwrite`` is
        False. On an error while processing a page the users collected so
        far are returned.

    Raises:
        ValueError: if ``which`` is not "followers" or "followings".
    """
    from io import StringIO

    if which == "followers":
        key = "follower_count"
        get = self.get_user_followers
    elif which == "followings":
        key = "following_count"
        get = self.get_user_followings
    else:
        # Previously this fell through and failed later with an
        # UnboundLocalError on `key`/`get`.
        raise ValueError(
            "which must be 'followers' or 'followings', got {!r}".format(which)
        )

    sleep_track = 0
    result = []
    next_max_id = ""
    self.get_username_info(user_id)
    username_info = self.last_json
    if "user" not in username_info:
        return False
    total = amount or username_info["user"][key]

    if total > 200000:
        print(
            "Consider temporarily saving the result of this big "
            "operation. This will take a while.\n"
        )
    if filter_business:
        print(
            "--> You are going to filter business accounts. "
            "This will take time! <--"
        )
    if to_file is not None:
        if os.path.isfile(to_file):
            if not overwrite:
                print("File `{}` already exists. Not overwriting.".format(to_file))
                return False
            print("Overwriting file `{}`".format(to_file))
        # Create or truncate the file; pages are appended below.
        with open(to_file, "w"):
            pass
    desc = "Getting {} of {}".format(which, user_id)
    with tqdm(total=total, desc=desc, leave=True) as pbar:
        while True:
            get(user_id, next_max_id)
            last_json = self.last_json
            try:
                with open(to_file, "a") if to_file is not None else StringIO() as f:
                    for item in last_json["users"]:
                        if filter_private and item["is_private"]:
                            continue
                        if filter_business:
                            time.sleep(2 * random.random())
                            self.get_username_info(item["pk"])
                            item_info = self.last_json
                            if item_info["user"]["is_business"]:
                                continue
                        if filter_verified and item["is_verified"]:
                            continue
                        if to_file is not None:
                            if usernames:
                                f.write("{}\n".format(item["username"]))
                            else:
                                f.write("{}\n".format(item["pk"]))
                        result.append(item)
                        pbar.update(1)
                        sleep_track += 1
                        if len(result) >= total:
                            # Stop here so the file never holds more entries
                            # than the returned list.
                            break
                        if sleep_track >= 20000:
                            sleep_time = random.uniform(120, 180)
                            msg = (
                                "\nWaiting {:.2f} min. " "due to too many requests."
                            ).format(sleep_time / 60)
                            print(msg)
                            time.sleep(sleep_time)
                            sleep_track = 0
                if not last_json["users"] or len(result) >= total:
                    return result[:total]
            except Exception as e:
                # Deliberate best effort: report and hand back what we have.
                print("ERROR: {}".format(e))
                return result[:total]

            if last_json.get("big_list") is False:
                return result[:total]

            next_max_id = last_json.get("next_max_id", "")
            if not next_max_id:
                # Without a cursor the next request would repeat page one.
                return result[:total]
1349
def get_total_followers(self, user_id, amount=None):
    """Collect followers of ``user_id`` (see get_total_followers_or_followings)."""
    which = "followers"
    return self.get_total_followers_or_followings(user_id, amount, which)
1352
def get_total_followings(self, user_id, amount=None):
    """Collect followings of ``user_id`` (see get_total_followers_or_followings)."""
    which = "followings"
    return self.get_total_followers_or_followings(user_id, amount, which)
1355
def get_total_user_feed(self, user_id, min_timestamp=None):
    """Collect the feed of ``user_id`` with no item limit (amount is infinity)."""
    unlimited = float("inf")
    return self.get_last_user_feed(
        user_id, amount=unlimited, min_timestamp=min_timestamp
    )
1360
def get_last_user_feed(self, user_id, amount, min_timestamp=None):
    """Collect up to ``amount`` feed items of ``user_id`` across pages.

    Args:
        user_id: account whose feed is read.
        amount: maximum number of items; ``float("inf")`` means no limit.
            Other values are truncated to an int.
        min_timestamp: passed through to ``get_user_feed``.

    Returns:
        A list with at most ``amount`` items. Every exit path applies the
        limit, including the one taken when the last page reports no more
        data.
    """
    limit = None if float(amount) == float("inf") else int(amount)
    user_feed = []
    next_max_id = ""
    while True:
        if limit is not None and len(user_feed) >= limit:
            # one request returns max 13 items
            return user_feed[:limit]
        self.get_user_feed(user_id, next_max_id, min_timestamp)
        last_json = self.last_json
        if "items" not in last_json:
            break
        user_feed += last_json["items"]
        if not last_json.get("more_available"):
            break
        next_max_id = last_json.get("next_max_id", "")
        if not next_max_id:
            # Without a cursor the next request would repeat page one.
            break
    return user_feed if limit is None else user_feed[:limit]
1376
def get_total_hashtag_feed(self, hashtag_str, amount=100):
    """Collect up to ``amount`` items of the feed of ``hashtag_str``.

    Pages are requested until a response has no "items" key, a page is
    empty, ``amount`` items were gathered, or processing a page raises.
    """
    collected = []
    cursor = ""

    with tqdm(total=amount, desc="Getting hashtag media.", leave=False) as progress:
        while True:
            self.get_hashtag_feed(hashtag_str, cursor)
            response = self.last_json
            if "items" not in response:
                break
            page = response["items"]
            try:
                progress.update(len(page))
                collected += page
                if not page or len(collected) >= amount:
                    break
            except Exception:
                break
            cursor = response.get("next_max_id", "")
    return collected[:amount]
1396
def get_total_self_user_feed(self, min_timestamp=None):
    """Collect the whole feed of ``self.user_id``."""
    own_id = self.user_id
    return self.get_total_user_feed(own_id, min_timestamp)
1399
def get_total_self_followers(self):
    """Collect the followers of ``self.user_id``."""
    own_id = self.user_id
    return self.get_total_followers(own_id)
1402
def get_total_self_followings(self):
    """Collect the followings of ``self.user_id``."""
    own_id = self.user_id
    return self.get_total_followings(own_id)
1405
def get_total_liked_media(self, scan_rate=1):
    """Collect liked media over at most ``scan_rate`` pages.

    Paging stops early when a response carries no ``next_max_id``; asking
    again with an empty cursor would return the first page a second time.
    A response without "items" contributes nothing instead of raising
    KeyError.

    Returns the list of collected items.
    """
    next_id = ""
    liked_items = []
    for _ in range(scan_rate):
        self.get_liked_media(next_id)
        last_json = self.last_json
        liked_items += last_json.get("items", [])
        next_id = last_json.get("next_max_id", "")
        if not next_id:
            break
    return liked_items
1415
1416 # ====== ACCOUNT / PERSONAL INFO METHODS ====== #
def change_password(self, new_password):
    """Post a password change; ``self.password`` is sent as the old password."""
    fields = {
        "old_password": self.password,
        "new_password1": new_password,
        "new_password2": new_password,
    }
    body = self.json_data(fields)
    return self.send_request("accounts/change_password/", body)
1426
def remove_profile_picture(self):
    """Post the default payload to ``accounts/remove_profile_picture/``."""
    body = self.json_data()
    endpoint = "accounts/remove_profile_picture/"
    return self.send_request(endpoint, body)
1430
def set_private_account(self):
    """Post the default payload to ``accounts/set_private/``."""
    body = self.json_data()
    endpoint = "accounts/set_private/"
    return self.send_request(endpoint, body)
1434
def set_public_account(self):
    """Post the default payload to ``accounts/set_public/``."""
    body = self.json_data()
    endpoint = "accounts/set_public/"
    return self.send_request(endpoint, body)
1438
def set_name_and_phone(self, name="", phone=""):
    """Post ``first_name`` and ``phone_number`` to ``accounts/set_phone_and_name/``."""
    fields = {"first_name": name, "phone_number": phone}
    body = self.json_data(fields)
    return self.send_request("accounts/set_phone_and_name/", body)
1444
def get_profile_data(self):
    """Post the default payload to ``accounts/current_user/?edit=true``."""
    body = self.json_data()
    endpoint = "accounts/current_user/?edit=true"
    return self.send_request(endpoint, body)
1448
def edit_profile(self, url, phone, first_name, biography, email, gender):
    """Post the given profile fields to ``accounts/edit_profile/``.

    ``self.username`` is sent as the username; ``first_name`` is sent under
    the ``full_name`` key.
    """
    fields = {
        "external_url": url,
        "phone_number": phone,
        "username": self.username,
        "full_name": first_name,
        "biography": biography,
        "email": email,
        "gender": gender,
    }
    body = self.json_data(fields)
    return self.send_request("accounts/edit_profile/", body)
1462
def fb_user_search(self, query):
    """Run a blended top search for ``query``.

    The query is percent-encoded before being placed in the URL so that
    characters such as ``&``, ``#`` or spaces cannot break the query
    string.

    Returns whatever ``self.send_request`` returns.
    """
    encoded_query = urllib.parse.quote("{}".format(query), safe="")
    url = (
        "fbsearch/topsearch/?context=blended&query={query}"
        "&rank_token={rank_token}"
    )
    return self.send_request(
        url.format(query=encoded_query, rank_token=self.rank_token)
    )
1469
def search_users(self, query):
    """Search users matching ``query``.

    The query is percent-encoded before being placed in the URL so that
    characters such as ``&``, ``#`` or spaces cannot break the query
    string.

    Returns whatever ``self.send_request`` returns.
    """
    encoded_query = urllib.parse.quote("{}".format(query), safe="")
    url = (
        "users/search/?ig_sig_key_version={sig_key}"
        "&is_typeahead=true&query={query}&rank_token={rank_token}"
    )
    return self.send_request(
        url.format(
            sig_key=config.SIG_KEY_VERSION,
            query=encoded_query,
            rank_token=self.rank_token,
        )
    )
1480
def search_username(self, username):
    """Request ``users/<username>/usernameinfo/``."""
    endpoint = "users/{}/usernameinfo/".format(username)
    return self.send_request(endpoint)
1484
def search_tags(self, query):
    """Search hashtags matching ``query``.

    The query is percent-encoded before being placed in the URL; an
    unencoded ``#`` would otherwise start a URL fragment and ``&`` would
    split the parameter.

    Returns whatever ``self.send_request`` returns.
    """
    encoded_query = urllib.parse.quote("{}".format(query), safe="")
    url = "tags/search/?is_typeahead=true&q={query}" "&rank_token={rank_token}"
    return self.send_request(
        url.format(query=encoded_query, rank_token=self.rank_token)
    )
1488
def search_location(self, query="", lat=None, lng=None):
    """Search places by text and/or coordinates.

    Args:
        query: search text; percent-encoded before being put in the URL.
        lat: optional latitude; only sent when not None.
        lng: optional longitude; only sent when not None.

    Formatting None directly would send the literal text "None" for the
    coordinates, so unset coordinates are left out of the query.

    Returns whatever ``self.send_request`` returns.
    """
    params = [
        "rank_token={}".format(self.rank_token),
        "query={}".format(urllib.parse.quote("{}".format(query), safe="")),
    ]
    if lat is not None:
        params.append("lat={}".format(lat))
    if lng is not None:
        params.append("lng={}".format(lng))
    return self.send_request("fbsearch/places/?" + "&".join(params))
1496
def get_user_reel(self, user_id):
    """Request ``feed/user/<user_id>/reel_media/``."""
    endpoint = "feed/user/{}/reel_media/".format(user_id)
    return self.send_request(endpoint)
1500
def get_reels_tray_feed(
    self, reason="pull_to_refresh"
):  # reason can be = cold_start, pull_to_refresh
    """Post to ``feed/reels_tray/`` with the given ``reason``.

    The payload also carries the supported capabilities from ``config``,
    ``self.token`` and ``self.uuid``.
    """
    payload = {
        "supported_capabilities_new": config.SUPPORTED_CAPABILITIES,
        "reason": reason,
        "_csrftoken": self.token,
        "_uuid": self.uuid,
    }
    return self.send_request("feed/reels_tray/", json.dumps(payload))
1512
def get_users_reel(self, user_ids):
    """
    Input: user_ids - a list of user_id
    Output: the "reels" entry of the response (per the original note, a
    mapping user_id -> stories data, like the output of
    self.get_user_reel), or an empty list when the request fails or the
    response has no "reels" entry.
    """
    id_strings = [str(uid) for uid in user_ids]
    succeeded = self.send_request(
        "feed/reels_media/", post=self.json_data({"user_ids": id_strings})
    )
    if not succeeded:
        return []
    if "reels" in self.last_json:
        return self.last_json["reels"]
    return []
1527
def see_reels(self, reels):
    """
    Input - the list of reels jsons
    They can be acquired by using get_users_reel()
    or get_user_reel() methods

    Each story gets a "seen at" time a few seconds before now, never
    earlier than its own ``taken_at``. Returns the ``ok`` flag of the
    response to the ``media/seen/`` post.
    """
    if not isinstance(reels, list):
        # In case of only one reel as input
        reels = [reels]

    now = int(time.time())
    newest_first = sorted(reels, key=lambda m: m["taken_at"], reverse=True)
    story_seen = {}
    for position, story in enumerate(newest_first, 1):
        age = max(0, now - story["taken_at"])
        delay = min(position + random.randint(0, 2), age)
        seen_key = "{!s}_{!s}".format(story["id"], story["user"]["pk"])
        story_seen[seen_key] = ["{!s}_{!s}".format(story["taken_at"], now - delay)]

    payload = self.json_data(
        {
            "reels": story_seen,
            "_csrftoken": self.token,
            "_uuid": self.uuid,
            "_uid": self.user_id,
        }
    )
    signed = self.generate_signature(payload)
    response = self.session.post(
        "https://i.instagram.com/api/v2/" + "media/seen/", data=signed
    )
    return response.ok
1562
def get_user_stories(self, user_id):
    """Request ``feed/user/<user_id>/story/``."""
    endpoint = "feed/user/{}/story/".format(user_id)
    return self.send_request(endpoint)
1566
def get_self_story_viewers(self, story_id):
    """Request the viewer list of ``story_id``.

    ``config.SUPPORTED_CAPABILITIES`` is formatted into the query string.
    """
    template = "media/{}/list_reel_media_viewer/?supported_capabilities_new={}"
    endpoint = template.format(story_id, config.SUPPORTED_CAPABILITIES)
    return self.send_request(endpoint)
1572
def get_tv_suggestions(self):
    """Request the ``igtv/tv_guide/`` endpoint."""
    return self.send_request("igtv/tv_guide/")
1576
def get_hashtag_stories(self, hashtag):
    """Request ``tags/<hashtag>/story/``."""
    endpoint = "tags/{}/story/".format(hashtag)
    return self.send_request(endpoint)
1580
def follow_hashtag(self, hashtag):
    """Post an empty payload to ``tags/follow/<hashtag>/``."""
    body = self.json_data({})
    endpoint = "tags/follow/{}/".format(hashtag)
    return self.send_request(endpoint, body)
1585
def unfollow_hashtag(self, hashtag):
    """Post an empty payload to ``tags/unfollow/<hashtag>/``."""
    body = self.json_data({})
    endpoint = "tags/unfollow/{}/".format(hashtag)
    return self.send_request(endpoint, body)
1590
def get_tags_followed_by_user(self, user_id):
    """Request ``users/<user_id>/following_tags_info/``."""
    endpoint = "users/{}/following_tags_info/".format(user_id)
    return self.send_request(endpoint)
1594
def get_hashtag_sections(self, hashtag):
    """Post to ``tags/<hashtag>/sections/`` asking for the top, recent and places tabs."""
    fields = {
        "supported_tabs": "['top','recent','places']",
        "include_persistent": "true",
    }
    body = self.json_data(fields)
    endpoint = "tags/{}/sections/".format(hashtag)
    return self.send_request(endpoint, body)
1604
def get_media_insight(self, media_id):
    """Request the organic insights of ``media_id``.

    The ``ig_sig_key_version`` parameter carries ``config.SIG_KEY_VERSION``,
    as in the other methods that send this parameter. The signing secret
    ``config.IG_SIG_KEY`` must not be placed in the URL.

    Returns whatever ``self.send_request`` returns.
    """
    url = ("insights/media_organic_insights/{}/?ig_sig_key_version={}").format(
        media_id, config.SIG_KEY_VERSION
    )
    return self.send_request(url)
1610
def get_self_insight(self, first=""):
    """Request the account organic insights.

    Args:
        first: value for the ``first`` query parameter (empty by default).
            NOTE(review): the value the endpoint expects is not shown in
            this file -- confirm before relying on the default.

    The template has one ``{}`` placeholder; calling ``format()`` without
    an argument raised IndexError on every call, so the value is now
    supplied explicitly.

    Returns whatever ``self.send_request`` returns.
    """
    # TODO: decide the proper default for ``first``.
    url = (
        "insights/account_organic_insights/?"
        "show_promotions_in_landing_page=true&first={}"
    ).format(first)
    return self.send_request(url)
1618
1619 # From profile => "module_name":"feed_contextual_profile"
1620 # From home/feed => "module_name":"feed_timeline"
def save_media(self, media_id, module_name="feed_timeline"):
    """Post a save request for ``media_id``; ``module_name`` goes into the payload."""
    endpoint = "media/{media_id}/save/".format(media_id=media_id)
    payload = self.action_data({"module_name": module_name})
    return self.send_request(endpoint=endpoint, post=self.json_data(payload))
1626
def unsave_media(self, media_id):
    """Post the default payload to ``media/<media_id>/unsave/``."""
    body = self.json_data()
    endpoint = "media/{}/unsave/".format(media_id)
    return self.send_request(endpoint, body)
1631
def get_saved_medias(self):
    """Request the ``feed/saved/`` endpoint."""
    return self.send_request("feed/saved/")
1635
def get_loom_fetch_config(self):
    """Request the ``loom/fetch_config/`` endpoint."""
    endpoint = "loom/fetch_config/"
    return self.send_request(endpoint)
1638
def get_profile_notice(self):
    """Request the ``users/profile_notice/`` endpoint."""
    endpoint = "users/profile_notice/"
    return self.send_request(endpoint)
1641
1642 # ====== DIRECT METHODS ====== #
def get_inbox_v2(self):
    """Post to ``direct_v2/inbox/`` with badging and unified inbox enabled."""
    options = {"persistentBadging": True, "use_unified_inbox": True}
    return self.send_request("direct_v2/inbox/", json.dumps(options))
1646
def get_presence(self):
    """Request the ``direct_v2/get_presence/`` endpoint."""
    endpoint = "direct_v2/get_presence/"
    return self.send_request(endpoint)
1649
def get_thread(self, thread_id, cursor_id=None):
    """Post to ``direct_v2/threads/<thread_id>/``.

    ``cursor_id`` is added to the payload as ``cursor`` when it is not None.
    """
    params = {"use_unified_inbox": "true"}
    if cursor_id is not None:
        params["cursor"] = cursor_id
    endpoint = "direct_v2/threads/{}/".format(thread_id)
    return self.send_request(endpoint, json.dumps(params))
1657
def get_ranked_recipients(self, mode, show_threads, query=None):
    """Post to ``direct_v2/ranked_recipients/``.

    ``show_threads`` is sent as "false" only when it is exactly False;
    ``query`` is included when it is not None.
    """
    params = {
        "mode": mode,
        "show_threads": "true" if show_threads is not False else "false",
        "use_unified_inbox": "true",
    }
    if query is not None:
        params["query"] = query
    return self.send_request("direct_v2/ranked_recipients/", json.dumps(params))
1667
def send_direct_item(self, item_type, users, **options):
    """Broadcast a direct item of ``item_type`` to ``users``.

    Recognised option keys depend on the type: ``text``, ``urls``,
    ``media_type``, ``media_id``, ``hashtag``, ``profile_user_id``,
    ``filepath`` (required for "photo") and ``thread``.

    Returns False when the recipients could not be prepared, otherwise
    whatever ``self.send_request`` returns.
    """
    payload = {"client_context": self.generate_UUID(True), "action": "send_item"}
    extra_headers = {}
    recipients = self._prepare_recipients(
        users, options.get("thread"), use_quotes=False
    )
    if not recipients:
        return False
    payload["recipient_users"] = recipients.get("users")
    if recipients.get("thread"):
        payload["thread_ids"] = recipients.get("thread")
    payload.update(self.default_data)

    endpoint = "direct_v2/threads/broadcast/{}/".format(item_type)
    text = options.get("text", "")
    if item_type == "photo":
        # Photos go to their own endpoint as a multipart body.
        endpoint = "direct_v2/threads/broadcast/upload_photo/"
        filepath = options["filepath"]
        upload_id = str(int(time.time() * 1000))
        with open(filepath, "rb") as handle:
            photo = handle.read()

        payload["photo"] = (
            "direct_temp_photo_%s.jpg" % upload_id,
            photo,
            "application/octet-stream",
            {"Content-Transfer-Encoding": "binary"},
        )

        encoder = MultipartEncoder(payload, boundary=self.uuid)
        payload = encoder.to_string()
        extra_headers.update({"Content-type": encoder.content_type})
    elif item_type == "link":
        payload["link_text"] = text
        payload["link_urls"] = json.dumps(options.get("urls"))
    elif item_type in ("text", "media_share", "hashtag", "profile"):
        payload["text"] = text
        if item_type == "media_share":
            payload["media_type"] = options.get("media_type", "photo")
            payload["media_id"] = options.get("media_id", "")
        elif item_type == "hashtag":
            payload["hashtag"] = options.get("hashtag", "")
        elif item_type == "profile":
            payload["profile_user_id"] = options.get("profile_user_id")

    return self.send_request(
        endpoint, payload, with_signature=False, headers=extra_headers
    )
1717
def get_pending_inbox(self):
    """Request the pending direct inbox with badging and unified inbox enabled."""
    endpoint = "direct_v2/pending_inbox/?persistentBadging=true&use_unified_inbox=true"
    return self.send_request(endpoint)
1723
1724 # ACCEPT button in pending request
def approve_pending_thread(self, thread_id):
    """Post to ``direct_v2/threads/<thread_id>/approve/``."""
    fields = {"_uuid": self.uuid, "_csrftoken": self.token}
    body = self.json_data(fields)
    endpoint = "direct_v2/threads/{}/approve/".format(thread_id)
    return self.send_request(endpoint, post=body)
1729
1730 # DELETE button in pending request
def hide_pending_thread(self, thread_id):
    """Post to ``direct_v2/threads/<thread_id>/hide/``."""
    fields = {"_uuid": self.uuid, "_csrftoken": self.token}
    body = self.json_data(fields)
    endpoint = "direct_v2/threads/{}/hide/".format(thread_id)
    return self.send_request(endpoint, post=body)
1735
1736 # BLOCK button in pending request
def decline_pending_thread(self, thread_id):
    """Post to ``direct_v2/threads/<thread_id>/decline/``."""
    fields = {"_uuid": self.uuid, "_csrftoken": self.token}
    body = self.json_data(fields)
    endpoint = "direct_v2/threads/{}/decline/".format(thread_id)
    return self.send_request(endpoint, post=body)
1741
def open_instagram_link(self, link):
    """Request ``oembed/`` for ``link``, which is fully percent-encoded first."""
    encoded_link = urllib.parse.quote(link, safe="")
    endpoint = "oembed/?url={}".format(encoded_link)
    return self.send_request(endpoint)