· 6 years ago · Feb 18, 2019, 07:40 AM
1import aiohttp
2import aio_pika
3import asyncio
4import ujson
5
6from typing import AnyStr, Dict, Tuple
7from pprint import pprint
8from uuid import uuid1
9
10
11class Bot(object):
12 def __init__(self, settings: Dict, loop: asyncio.AbstractEventLoop):
13 self.settings = settings
14 self.loop = loop
15
16 self.aiohttp_session = aiohttp.ClientSession(json_serialize=ujson)
17 self.aio_pika_connection = None
18
19 self.bot_information = None
20 self.team_information = None
21 self.websocket_url = None
22
23 def parse_message_data(self, message_data: Dict) -> Tuple[Dict, AnyStr]:
24 pprint(message_data)
25 result = None
26 if message_data.get('user') and message_data.get('user') != self.bot_information['id']:
27 result = message_data.get('content') or message_data.get('text')
28 return result, 'default'
29
30 async def publish_task(self, task_data: Dict, queue_name: AnyStr):
31 yield None
32
33 async def consume_task_result(self):
34 yield None
35
36 async def setup(self):
37 post_parameters = {'token': self.settings['access_token']}
38 async with self.aiohttp_session.get(
39 url='{api_url}/{rtm_connect}'.format(api_url=self.settings['api_url'], rtm_connect='rtm.connect'),
40 params=post_parameters
41 ) as response:
42 response_data = await response.json()
43 if not response_data['ok']:
44 raise RuntimeError(response_data['error'])
45
46 self.bot_information = response_data['self']
47 self.team_information = response_data['team']
48 self.websocket_url = response_data['url']
49
50 self.aio_pika_connection = await aio_pika.connect_robust('amqp://guest:guest@127.0.0.1/', loop=self.loop)
51
52 async def process_input(self):
53 async with self.aiohttp_session.ws_connect(self.websocket_url) as websocket_connection:
54 async for message in websocket_connection:
55 if message.type == aiohttp.WSMsgType.TEXT:
56 message_data = ujson.loads(message.data)
57 task_data, queue = self.parse_message_data(message_data)
58
59 # if queue:
60 # data = await self.publish_task(task_data, queue)
61 # print(data)
62
63 # text = await self.consume_task_result()
64 # print(text)
65 # notification_message = {
66 # "id": "{}".format(uuid1()),
67 # "type": "message",
68 # "channel": "CE72XEVH8",
69 # "text": text
70 # }
71 # await websocket_connection.send_str(ujson.dumps(notification_message))
72
73 async def run(self):
74 await self.setup()
75 await self.process_input()
76 # need to handle it more accurate
77 await self.aiohttp_session.close()
78
79
80async def main(loop):
81 settings = {
82 'access_token': 'secret_key',
83 'api_url': 'https://slack.com/api',
84
85 'task_results_queue': 'bot_task_results',
86 }
87 bot = Bot(settings, loop)
88 await bot.run()
89
90
91loop = asyncio.get_event_loop()
92loop.run_until_complete(main(loop))
93loop.close()