· 6 years ago · Aug 01, 2019, 09:00 AM
1import os
2import time
3import random
4from threading import Thread
5
6from twython import Twython, TwythonStreamer, TwythonError
7
print("Listening for tweets...")

# Twitter API credentials.
# Each value is taken from the environment when the matching variable is set,
# so real secrets never have to be written into this file. When a variable is
# absent the original placeholder string is used, which keeps the script's
# default behaviour unchanged.
APP_KEY = os.environ.get("TWITTER_APP_KEY", "CONSUMER KEY")
APP_SECRET = os.environ.get("TWITTER_APP_SECRET", "CONSUMER SECRET")
OAUTH_TOKEN = os.environ.get("TWITTER_OAUTH_TOKEN", "ACCESS TOKEN")
OAUTH_TOKEN_SECRET = os.environ.get(
    "TWITTER_OAUTH_TOKEN_SECRET", "ACCESS TOKEN SECRET"
)
14
15
def twitter_api():
    """Return a Twython client built from the module-level credentials."""
    # APP_KEY, APP_SECRET, OAUTH_TOKEN and OAUTH_TOKEN_SECRET are the
    # module-level constants; they must hold real values before running.
    return Twython(APP_KEY, APP_SECRET, OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
22
23
def _should_reply(data):
    """Return True when the streamed message in *data* qualifies for a reply.

    A message is rejected when it has no text, is a retweet, starts with an
    @-mention, or contains more than three ``#`` characters.
    """
    text = data.get('text')
    if not text:
        # Messages without a 'text' value (or with an empty one) cannot be
        # inspected; indexing or lower-casing them would raise.
        return False
    # Retweets are detected on the raw text. The previous code lower-cased
    # the text first, so its upper-case "RT" test could never match.
    if data.get('retweeted_status') or "RT @" in text:
        return False
    return not text.startswith("@") and text.count("#") <= 3


def reply(data):
    """Post a canned reply to the tweet described by *data*.

    Args:
        data: Mapping for one streamed message. The keys read are ``text``,
            ``retweeted_status``, ``user`` (for ``screen_name``) and ``id``.

    Returns:
        None. Nothing is posted when the message is filtered out or lacks the
        author handle or tweet id needed to address the reply.

    A ``TwythonError`` raised while posting is printed, not propagated.
    """
    if not _should_reply(data):
        return

    author = data.get('user') or {}
    handle = author.get('screen_name')
    tweet_id = data.get('id')
    if not handle or tweet_id is None:
        # Without both values the reply cannot be addressed or threaded.
        return

    tweet_text = "Reply text"
    status = f"@{handle} {tweet_text}"

    # The client is only built once a reply is actually going to be sent.
    api = twitter_api()
    # Wait a random 3-9 seconds before posting.
    time.sleep(random.choice(range(3, 10)))
    try:
        api.update_status(status=status, in_reply_to_status_id=tweet_id)
    except TwythonError as e:
        print(e)
    else:
        print("Tweet successfully sent!")
38
39
class MyStreamer(TwythonStreamer):
    """Streamer that prints each incoming message and replies on a thread."""

    def on_success(self, data):
        """Print the message text, then pass *data* to ``reply`` on a new thread."""
        print(data.get('text'))
        # reply() sleeps before posting, so it is started on its own thread
        # and this callback returns without waiting for it.
        worker = Thread(target=reply, args=(data,))
        worker.start()

    def on_error(self, status_code, data):
        """Print the status code handed to the error callback."""
        print("Twitter Error Status code", status_code)
        # Disconnecting on error is currently disabled:
        # self.disconnect()
52
53
# Keywords passed as the ``track`` parameter of the filter request.
hashtags = ["keyword1", "keyword2", "keyword3"]

# Create the streamer with the module-level credentials and start filtering.
stream = MyStreamer(APP_KEY, APP_SECRET, OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
stream.statuses.filter(track=hashtags)