· 6 years ago · Oct 27, 2019, 05:14 PM
1# -*- coding: utf-8 -*-
2
3from vk_api import VkApi
4from vk_api.longpoll import VkLongPoll, VkEventType, Event
5from vk_api.utils import get_random_id
6
7from typing import Optional
8from DB import BotDB
9from threading import Thread
10from time import sleep
11import logging
12
# VK access token handed to VkApi below; it is an empty string in this file.
# NOTE(review): presumably a real token has to be filled in before running - confirm.
token = ""

# One VkApi session is shared by the long-poll listener and the API wrapper.
_vk = VkApi(token=token)
longpoll = VkLongPoll(_vk, preload_messages=True)
vk = _vk.get_api()

# Root logging: timestamped messages at INFO level and above.
logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.INFO)
# The 'vk_api' logger is lowered to DEBUG separately from the root level.
logger_vk_api = logging.getLogger('vk_api')
logger_vk_api.setLevel(logging.DEBUG)

# Logger used by the bot's own code (see thread_user_handler).
logger = logging.getLogger('bot_vk')
24
25
def send_message(peer_id: int, message: Optional[str] = None, **kwargs):
    """Send a message to ``peer_id`` through ``vk.messages.send``.

    Every call gets a fresh ``random_id`` from ``get_random_id()``; any extra
    keyword arguments are forwarded to the API call unchanged.
    """
    deliver = vk.messages.send
    deliver(
        message=message,
        peer_id=peer_id,
        random_id=get_random_id(),
        **kwargs,
    )
28
29
def get_username(user_id: int, **kwargs) -> str:
    """Return the ``first_name`` field of the first record from ``vk.users.get``.

    Extra keyword arguments are forwarded to ``vk.users.get`` unchanged.
    """
    profiles = vk.users.get(user_id=user_id, **kwargs)
    first_profile = profiles[0]
    return first_profile['first_name']
32
33
# Text appended to "Ваша роль: " for each known role number.
_ROLE_TITLES = {0: "Гость", 1: "Админ", 2: "Зам", 3: "Создатель"}


def _grant_reply(db, user, user_id: int, body: str) -> Optional[str]:
    """Handle the "выдать <amount>" command and return the reply text, if any."""
    if user.role == 0:
        # Role 0 has no right to grant money: refuse and stop here so the
        # balance is never touched.
        return "где ваши права?"
    if body == "выдать":
        # Bare command without an amount: nothing to do, nothing to answer.
        return None

    amount_text = body[7:]  # everything after "выдать" and one separator character
    if "+" in amount_text or "-" in amount_text:
        return "+- нельзя"
    # isdecimal() only accepts characters that int() can actually parse
    # (isdigit() also accepts e.g. superscript digits, which make int() raise).
    if not amount_text.isdecimal():
        return "где цифры?"

    amount = int(amount_text)
    # is_set=True makes BotDB.add_money add to the current balance.
    db.add_money(user_id=user_id, money=amount, is_set=True)
    return f"Вы добавили: {amount}"


def _role_reply(role: int) -> str:
    """Return the "Ваша роль: ..." line; unknown role numbers get no title."""
    return "Ваша роль: " + _ROLE_TITLES.get(role, "")


def thread_user_handler(event: Event):
    """Process one incoming message event and answer its sender.

    The sender is registered in the database with role 0 if not yet known,
    then the lower-cased, stripped message text is matched by prefix:

    * ``выдать <amount>`` - add ``<amount>`` to the sender's own balance
      (refused for role 0);
    * ``роль`` - report the sender's role;
    * ``баланс`` - report the sender's balance;
    * anything else - echo the text back with a greeting.

    :param event: long-poll event; ``text``, ``user_id`` and ``peer_id`` are read.
    """
    body: str = event.text.strip().lower()
    user_id: int = event.user_id
    peer_id: int = event.peer_id
    username: str = get_username(user_id)
    request = {}

    # BotDB closes its connection in __exit__; without the context manager
    # every handled message would leave one SQLite connection open.
    with BotDB() as db:
        db.add_user(username=username, user_id=user_id, role=0)
        current_user = db.get_user_by_id(user_id=user_id)

        if body[:6] == "выдать":
            message = _grant_reply(db, current_user, user_id, body)
        elif body[:4] == 'роль':
            message = _role_reply(current_user.role)
        elif body[:6] == 'баланс':
            message = str(current_user.money)
        else:
            message = f"Привет {username}, Ты написал: <{body}>"

    if message:
        request['message'] = message

    if request.get('message') or request.get('attachment'):
        send_message(peer_id=peer_id, **request)

    logger.info("New message! username: %s, user_id: %s, text: %s", username, user_id, body)
87
88
def main():
    """Consume long-poll events and start one handler thread per accepted message.

    An event is accepted when it is a MESSAGE_NEW event flagged ``from_user``,
    carries non-empty text and is not flagged ``from_me``.
    """
    for incoming in longpoll.listen():
        if incoming.type != VkEventType.MESSAGE_NEW:
            continue
        if not incoming.from_user or not incoming.text or incoming.from_me:
            continue
        worker = Thread(target=thread_user_handler, args=(incoming,))
        worker.start()
93
94
if __name__ == '__main__':
    # Supervisor loop: whenever main() stops, wait ten seconds and start it again.
    while True:
        try:
            main()
        except Exception:
            # Top-level boundary: record the full traceback through the bot's
            # logger instead of printing only the exception message.
            logger.exception("main() failed; restarting in 10 seconds")
        sleep(10)
102
103
# DB.py - the storage module the bot above imports with `from DB import BotDB`:
105
106# -*- coding: utf-8 -*-
107
108import sqlite3
109from typing import Optional, NamedTuple, List, NoReturn, Iterator, Union
110
111
class User(NamedTuple):
    """Immutable record holding the columns of one `users` row.

    ``user + n`` does not build a new tuple: it opens a ``BotDB`` and asks it
    to credit ``n`` to this user's stored balance, then evaluates to ``None``.
    """

    id: int  # `id` column (autoincrement primary key)
    user_id: int  # `user_id` column
    username: str  # `username` column
    money: int  # `money` column
    role: int  # `role` column

    def __add__(self, value: int):
        """Credit ``value`` to this user's balance in the database.

        Nothing is written when ``user_id`` is falsy, or when ``value`` is not
        a non-zero ``int``. Always returns ``None``.
        """
        # The database is opened (and closed again) even when nothing is written.
        with BotDB() as storage:
            if not self.user_id:
                return None
            if not isinstance(value, int) or value == 0:
                return None
            storage.add_money(user_id=self.user_id, money=value, is_set=True)
        return None
123
124
class SQLCommand:
    """SQL statements for the `users` table; values are bound through `?` placeholders."""

    # --- schema ---
    CREATE_DB = """CREATE TABLE IF NOT EXISTS `users`(
    id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
    username VARCHAR(300) NOT NULL,
    user_id INTEGER NOT NULL,
    money INTEGER NOT NULL DEFAULT 0,
    role INTEGER NOT NULL DEFAULT 0)"""

    # --- reads ---
    ALL_DATA = "SELECT * FROM `users`"
    COUNT_COLUMN = "SELECT COUNT(*) FROM `users`"  # counts rows, despite the name
    GET_USER_BY_ID = "SELECT * FROM `users` WHERE user_id = ?"
    SEARCH_USER_BY_INDEX = "SELECT * FROM `users` WHERE id = ?"
    SEARCH_USER_BY_NAME = "SELECT * FROM `users` WHERE LOWER(username) = LOWER(?)"

    # --- writes ---
    ADD_USER_TO_DB = "INSERT INTO `users`(username, user_id, role) VALUES (?, ?, ?)"
    ADD_MONEY = "UPDATE `users` SET money = ? WHERE user_id = ?"  # overwrites the column
    SET_ROLE = "UPDATE `users` SET role = ? WHERE user_id = ?"

    # --- deletes ---
    REMOVE_USER = "DELETE FROM `users` WHERE user_id = ?"
    CLEAR_COLUMNS = "DELETE FROM `users`"  # removes every row, despite the name
143
144
class BotDB:
    """SQLite-backed storage for the bot's `users` table.

    A connection is opened on construction. The table itself is only created
    by :meth:`create_db`; the constructor does not call it. The object can be
    used as a context manager, in which case the connection is closed on exit.

    Write methods run inside ``with self.connection``, which commits when the
    block succeeds and rolls back when it raises, so no explicit ``commit()``
    calls are needed.
    """

    __slots__ = ('filename', 'connection', 'cursor')

    def __init__(self, filename: Optional[str] = None):
        """Open the database file.

        :param filename: path passed to ``sqlite3.connect``; ``users.db`` when omitted.
        """
        self.filename: str = 'users.db' if filename is None else filename
        # check_same_thread=False turns off sqlite3's check that the connection
        # is only used from the thread that created it.
        self.connection = sqlite3.connect(self.filename, check_same_thread=False)
        # sqlite3.Row lets _prepare_user address columns by name.
        self.connection.row_factory = sqlite3.Row
        self.cursor = self.connection.cursor()

    def create_db(self) -> None:
        """Create the `users` table if it does not exist yet."""
        with self.connection:
            self.cursor.execute(SQLCommand.CREATE_DB)

    def add_user(self, username: str, user_id: int, role: int = 0) -> None:
        """Insert a user unless a row with the same ``user_id`` already exists."""
        with self.connection:
            # The table has no UNIQUE constraint on user_id, so duplicates are
            # prevented here.
            if self.get_user_by_id(user_id=user_id) is None:
                self.cursor.execute(SQLCommand.ADD_USER_TO_DB, (username, user_id, role))

    def get_all_users(self) -> List[User]:
        """Return every stored user."""
        with self.connection:
            rows = self.cursor.execute(SQLCommand.ALL_DATA)
            return [self._prepare_user(row) for row in rows]

    def get_user_by_id(self, user_id: int) -> Optional[User]:
        """Return the user with the given ``user_id``, or ``None`` if there is none."""
        with self.connection:
            row = self.cursor.execute(SQLCommand.GET_USER_BY_ID, (user_id,)).fetchone()
            return None if row is None else self._prepare_user(row)

    def add_money(self, user_id: int, money: int, is_set: Optional[bool] = None) -> None:
        """Write a user's balance.

        :param user_id: ``user_id`` of the row to update.
        :param money: amount to add or to store.
        :param is_set: despite its name, a truthy value *adds* ``money`` to the
            current balance; a falsy value overwrites the balance with ``money``.

        An unknown ``user_id`` is a no-op in both modes.
        """
        with self.connection:
            balance = 0
            if is_set:
                user = self.get_user_by_id(user_id=user_id)
                if user is None:
                    # No row to add to; the UPDATE below would match nothing anyway.
                    return
                balance = user.money
            self.cursor.execute(SQLCommand.ADD_MONEY, (money + balance, user_id))

    def set_role(self, user_id: int, role: int = 0) -> None:
        """Set the role of the user with the given ``user_id``."""
        # The context manager is the connection itself; commit() returns None
        # and cannot be used in a `with` statement.
        with self.connection:
            self.cursor.execute(SQLCommand.SET_ROLE, (role, user_id))

    def remove_user(self, user_id: int) -> None:
        """Delete the user with the given ``user_id``; unknown ids are ignored."""
        with self.connection:
            self.cursor.execute(SQLCommand.REMOVE_USER, (user_id,))

    def clear_all_users(self) -> None:
        """Delete every row of the `users` table."""
        with self.connection:
            self.cursor.execute(SQLCommand.CLEAR_COLUMNS)

    def _prepare_user(self, user: sqlite3.Row) -> User:
        """Convert a database row into a ``User`` tuple."""
        return User(
            id=user['id'],
            user_id=user['user_id'],
            username=user['username'],
            money=user['money'],
            role=user['role'],
        )

    def __enter__(self):
        return self

    def __exit__(self, type_error, value, traceback):
        # Closes the connection whether or not the block raised; the exception
        # (if any) is not suppressed.
        self.connection.close()

    def __iter__(self) -> Iterator[User]:
        """Yield users one at a time."""
        with self.connection:
            # Connection.execute() creates its own cursor, so calling other
            # methods (which reuse self.cursor) while iterating does not reset
            # this result set.
            for row in self.connection.execute(SQLCommand.ALL_DATA):
                yield self._prepare_user(row)

    def __len__(self) -> int:
        """Return the number of stored users."""
        with self.connection:
            return self.cursor.execute(SQLCommand.COUNT_COLUMN).fetchone()[0]

    def __int__(self) -> int:
        """Return the number of stored users (same value as ``len(self)``)."""
        return len(self)

    def __getitem__(self, item: Union[int, str]) -> Optional[User]:
        """Look a user up by name or by row id.

        :param item: a ``str`` is matched case-insensitively against
            ``username``; an ``int`` is matched against the ``id`` column
            (not ``user_id``).
        :returns: the first matching user, or ``None`` when nothing matches.
        :raises TypeError: if ``item`` is neither ``str`` nor ``int``.
        """
        if isinstance(item, str):
            command = SQLCommand.SEARCH_USER_BY_NAME
        elif isinstance(item, int):
            command = SQLCommand.SEARCH_USER_BY_INDEX
        else:
            raise TypeError(f"BotDB indices must be int or str, not {type(item).__name__}")
        with self.connection:
            row = self.cursor.execute(command, (item,)).fetchone()
            return None if row is None else self._prepare_user(row)