· 5 years ago · Oct 06, 2020, 01:36 PM
1import asyncio
2import logging
3import os
4import random
5import vk_api
6from collections import deque
7
8import toml
9from vk_api.longpoll import VkLongPoll, VkEventType
10from vk import types
11from vk.bot_framework import Dispatcher, get_group_id
12from vk.exceptions import APIException
13from vk.keyboards import ButtonColor
14from vk.keyboards import Keyboard
15from vk.types import message
16from vk.utils import TaskManager
17
18
# Build the module-level `config` mapping from config.toml and, optionally,
# from the process environment.
#
# When BOTH "message_text" and "token" are present in os.environ, the whole
# environment becomes the base mapping and config.toml only fills in keys the
# environment does not define. Otherwise config.toml is used as-is.
#
# NOTE: values that come from os.environ are always strings, whereas values
# parsed from TOML keep their TOML types.
with open("config.toml", "r", encoding="utf-8") as f:
    if "message_text" not in os.environ or "token" not in os.environ:
        config = toml.load(f)
    else:
        config = dict(os.environ)
        for key, value in toml.load(f).items():
            # Environment wins; the file supplies only the missing keys.
            config.setdefault(key, value)
27
# Configure the root logger with the level stored under "logging_level".
logging.basicConfig(level=config["logging_level"])

# Client object built from the configured access token.
# NOTE(review): `VK` is not bound by any import visible in this file; confirm
# where it is expected to come from.
vk = VK(config["token"])
# Task manager constructed from the client's `loop` attribute.
task_manager = TaskManager(vk.loop)
# API accessor used by the message handler to call `messages.send`.
api = vk.get_api()

# Dispatcher on which the `chat_invite_user` handler is registered.
dp = Dispatcher(vk)
35
36
def adjust_message_text():
    """Pad ``config["message_text"]`` by repetition up to 4096 bytes.

    The text is measured in its encoded (default UTF-8) form. If it is
    shorter than 4096 bytes it is repeated a whole number of times so the
    result is as long as possible without exceeding 4096 bytes. Text that is
    already 4096 bytes or longer is stored back unchanged.

    The result is written back to ``config["message_text"]``; nothing is
    returned.

    Raises:
        ZeroDivisionError: if the configured text is empty.
    """
    encoded = config["message_text"].encode()
    size = len(encoded)
    if size < 4096:
        # Repeat whole copies only, so no multi-byte character is ever split.
        encoded *= 4096 // size
    config["message_text"] = encoded.decode()
42
43
async def apply_required_settings(group_id: int):
    """Send the two settings requests this script issues for a group.

    Using the module-level ``vk`` client, this awaits, in order:

    1. ``groups.setSettings`` with ``messages``, ``bots_capabilities`` and
       ``bots_add_to_chat`` all set to ``1``.
    2. ``groups.setLongPollSettings`` with ``enabled`` and ``message_new``
       set to ``1`` and ``api_version`` set to ``"5.103"``.

    Args:
        group_id: Identifier of the group, passed as ``group_id`` in both
            requests.
    """
    group_settings = {
        "group_id": group_id,
        "messages": 1,
        "bots_capabilities": 1,
        "bots_add_to_chat": 1,
    }
    long_poll_settings = {
        "group_id": group_id,
        "enabled": 1,
        "api_version": "5.103",
        "message_new": 1,
    }

    await vk.api_request("groups.setSettings", group_settings)
    await vk.api_request("groups.setLongPollSettings", long_poll_settings)
58
59
@dp.message_handler(chat_action=message.Action.chat_invite_user)
async def chat_invite_user(msg: types.Message, _):
    """Send the configured message to ``msg.peer_id`` in an unbounded loop.

    Registered on ``dp`` for the ``chat_invite_user`` chat action. Each
    iteration builds a keyboard, sends ``config["message_text"]`` with that
    keyboard to ``msg.peer_id``, then sleeps for ``config["delay"]``. The
    loop ends only when ``APIException`` is raised.

    NOTE(review): this floods the target chat with no message limit. Confirm
    that running it is authorized and permitted by the platform's terms.

    Args:
        msg: The incoming message; only ``peer_id`` is read.
        _: Second handler argument, unused here.
    """
    logging.info(f"Started raiding {msg.peer_id}.")
    # Number of messages sent so far; also drives the colour rotation below.
    sent_message_count = 0
    while True:
        try:
            # A fresh keyboard of 10 rows x 4 buttons is built per message.
            keyboard = Keyboard(one_time=False)
            for row in range(0, 10):
                button_colors = deque(
                    [
                        ButtonColor.POSITIVE,
                        ButtonColor.NEGATIVE,
                        ButtonColor.SECONDARY,
                        ButtonColor.PRIMARY,
                    ]
                )
                # Shift the colour order by one position per sent message;
                # every row within one message uses the same shifted order.
                button_colors.rotate(sent_message_count % len(button_colors))
                for button in range(0, 4):
                    keyboard.add_text_button(
                        config["buttons_text"], color=button_colors[button]
                    )
                # Start a new row after every row except the last one.
                if row != 9:
                    keyboard.add_row()
            await api.messages.send(
                # Random 31-bit magnitude with a random sign.
                random_id=random.getrandbits(31) * random.choice([-1, 1]),
                peer_id=msg.peer_id,
                message=config["message_text"],
                keyboard=keyboard.get_keyboard(),
            )
            sent_message_count += 1
            await asyncio.sleep(config["delay"])
        except APIException as e:
            # Any API error ends the loop; the error is logged as the reason.
            logging.info(f"Stopped raiding {msg.peer_id}. Reason: {e}")
            break
94
95
async def run():
    """Prepare the group and hand control to the dispatcher's polling.

    Steps, in order: look up the group id via ``get_group_id(vk)``, apply the
    settings requests for that group, pad the configured message text, then
    call ``dp.run_polling`` with the group id.
    """
    gid = await get_group_id(vk)
    await apply_required_settings(gid)
    adjust_message_text()
    # Called without `await`, as in the original.
    dp.run_polling(gid)
101
102
# Script entry point: register `run` with the task manager, then start the
# task manager with auto-reload disabled.
if __name__ == "__main__":
    task_manager.add_task(run)
    task_manager.run(auto_reload=False)
106
107