· 5 years ago · Aug 17, 2020, 01:36 AM
1#!/usr/bin/env python
2import time
3
4import tweepy
5
6consumer_key = 'YOUR KEY HERE'
7consumer_secret = 'YOUR KEY HERE'
8
9
10class UserTimeline():
11 def __init__(self, user_id):
12 self.user_id = user_id
13 auth = tweepy.AppAuthHandler(consumer_key, consumer_secret)
14 self.api = tweepy.API(auth)
15
16 def get_recent_tweet_video_urls(self):
17 result = []
18 for status in self.api.user_timeline(id=self.user_id, count=100):
19 if not hasattr(status, 'in_reply_to_screen_name') or status.in_reply_to_screen_name != self.user_id:
20 continue
21 if hasattr(status, 'retweeted_status'):
22 status = status.retweeted_status
23 elif hasattr(status, 'quoted_status'):
24 status = status.quoted_status
25 if hasattr(status, 'extended_entities'):
26 if 'media' in status.extended_entities:
27 for m in status.extended_entities['media']:
28 if m['type'] == 'video':
29 result.append(m['expanded_url'])
30 return result
31
32
33class ThreadVideos():
34 def __init__(self, url):
35 auth = tweepy.AppAuthHandler(consumer_key, consumer_secret)
36 self.api = tweepy.API(auth)
37 self.url = url
38
39 def get_thread_video_urls(self):
40 tweet_id = self.url.split('/')[-1]
41 status = self.api.get_status(tweet_id)
42
43 user_name = status.author.screen_name
44
45 replies = tweepy.Cursor(self.api.search, q='to:{} from:{}'.format(user_name, user_name),
46 since_id=tweet_id, tweet_mode='extended').items()
47 tweets = [status]
48 ids_in_thread = set([tweet_id])
49 query_results = []
50
51 while True:
52 try:
53 reply = replies.next()
54 query_results.append(reply)
55
56 except tweepy.RateLimitError as e:
57 print("Twitter api rate limit reached".format(e))
58 time.sleep(60)
59 continue
60
61 except tweepy.TweepError as e:
62 print("Tweepy error occured:{}".format(e))
63 break
64
65 except StopIteration:
66 break
67
68 except Exception as e:
69 print("Failed while fetching replies {}".format(e))
70 break
71 got_new = True
72 while got_new:
73 got_new = False
74 for status in query_results:
75 if not hasattr(status, 'in_reply_to_status_id_str'):
76 continue
77 if str(status.in_reply_to_status_id) in ids_in_thread and str(status.id) not in ids_in_thread:
78 got_new = True
79 ids_in_thread.add(str(status.id))
80 tweets.append(status)
81
82 result = []
83 for status in tweets:
84 if status.author.screen_name != user_name:
85 continue
86 if hasattr(status, 'retweeted_status'):
87 status = status.retweeted_status
88 elif hasattr(status, 'quoted_status'):
89 status = status.quoted_status
90 if hasattr(status, 'extended_entities'):
91 if 'media' in status.extended_entities:
92 for m in status.extended_entities['media']:
93 if m['type'] == 'video':
94 result.append(m['expanded_url'])
95 return result
96
97