· 4 years ago · May 23, 2021, 01:12 PM
1# Builtin
2import math
3import os
4import random
5# Pip
6from discord.ext import commands
7from discord import Embed
8from discord import Colour
9import apsw
10# Custom
11from Utils.Paginator import Paginator
12from Utils import Utils
13
14# Path variables
15rootDirectory = os.path.join(os.path.dirname(__file__))
16databasePath = os.path.join("BotFiles", "levels.db")
17
18
19# Cog to manage levelling commands
20class Levelling(commands.Cog):
21 def __init__(self, client):
22 self.client = client
23 self.colour = Colour.red()
24 self.startingXp = 10
25 self.messageCooldown = commands.CooldownMapping.from_cooldown(1, 60, commands.BucketType.user)
26 self.cursor = apsw.Connection(databasePath).cursor()
27 self.cursor.execute("""CREATE TABLE IF NOT EXISTS levels(
28 guildID INTEGER NOT NULL,
29 userID INTEGER NOT NULL,
30 currentExp INTEGER NOT NULL,
31 currentLevel INTEGER NOT NULL,
32 rank INTEGER NOT NULL,
33 PRIMARY KEY(guildID, userID)
34 )""")
35
36 # Grab remaining cooldown
37 def cooldownTest(self, message):
38 return self.messageCooldown.get_bucket(message).update_rate_limit()
39
40 # Get experience required for the next level
41 def nextExp(self, level):
42 return round(self.startingXp*(5**level))
43
44 # Get the new exp values for a user
45 def getExp(self, row):
46 expRequired = self.nextExp(row[3])
47 newExp = row[2]+random.randint(10, 20)
48 if newExp >= expRequired:
49 return newExp, row[3]+1
50 return newExp, row[3]
51
52 # Sort the ranks for each guild
53 def rankSort(self, guildID):
54 # Get all users in a specific guild
55 guildUsers = list(self.cursor.execute(f"SELECT * FROM levels WHERE guildID == {guildID}"))
56 sortedUsers = sorted(guildUsers, key=lambda x: x[2], reverse=True)
57 result = []
58 for count, user in enumerate(sortedUsers):
59 temp = list(user)
60 temp[4] = count+1
61 result.append(tuple(temp))
62 for user in result:
63 self.cursor.execute(f"UPDATE levels SET rank = ? WHERE guildID == ? AND userID == ?", (user[4], user[0], user[1]))
64
65 # Function to split a list with a set amount of items in each
66 def listSplit(self, arr, perListSize, listAmmount):
67 result = []
68 for i in range(listAmmount):
69 result.append(arr[i * perListSize:i * perListSize + perListSize])
70 return result
71
72 # Function to create embed list for ranks command
73 async def ranksEmbedCreator(self, ctx, users):
74 result = []
75 if len(users) > 10:
76 # Paginate the result
77 maxPage = math.ceil(len(users)/10)
78 splittedList = self.listSplit(users, 10, maxPage)
79 for count, arr in enumerate(splittedList):
80 tempEmbed = Embed(title=f"{ctx.guild.name} Leaderboard", colour=self.colour)
81 tempEmbed.set_footer(text=f"Page {count+1} of {maxPage}")
82 resultString = ""
83 for user in sorted(arr, key=lambda x: x[2], reverse=True):
84 userName = await self.client.fetch_user(user[1])
85 resultString += f"{user[4]}. {userName} (XP: {user[2]} | Level: {user[3]})\n"
86 tempEmbed.add_field(name="Ranks", value=resultString)
87 result.append(tempEmbed)
88 elif len(users) <= 10 and len(users) > 0:
89 # Don't paginate the result
90 rankEmbed = Embed(title=f"{ctx.guild.name} Leaderboard", colour=self.colour)
91 rankEmbed.set_footer(text=f"Page 1 of 1")
92 resultString = ""
93 for user in sorted(users, key=lambda x: x[2], reverse=True):
94 userName = await self.client.fetch_user(user[1])
95 resultString += f"{user[4]}. {userName} (XP: {user[2]} | Level: {user[3]})\n"
96 rankEmbed.add_field(name="Ranks", value=resultString)
97 result.append(rankEmbed)
98 else:
99 # No users registered on database
100 rankEmbed = Embed(title=f"{ctx.guild.name} Leaderboard", colour=self.colour)
101 rankEmbed.description = "No users registered yet"
102 result.append(rankEmbed)
103 return result
104
105 # Check if a user is not a bot
106 async def isBot(self, id):
107 result = await self.client.fetch_user(id)
108 return result.bot
109
110 # Detect incoming messages
111 @commands.Cog.listener()
112 async def on_message(self, message):
113 # Detect if the user is a bot and a minute has passed
114 guildID = message.guild.id
115 authorID = message.author.id
116 isBot = await self.isBot(authorID)
117 if not isBot and self.cooldownTest(message) is None:
118 try:
119 # Add new user to database
120 self.cursor.execute("INSERT INTO levels values(?, ?, ?, ?, ?)", (guildID, authorID, self.startingXp, 1, 0))
121 await message.channel.send(f"{message.author.mention} just advanced to to **Level 1**. Congratulations!")
122 except apsw.ConstraintError:
123 # User already exists so increment their experience
124 for row in self.cursor.execute(f"SELECT * FROM levels WHERE guildID == {guildID} AND userID == {authorID}"):
125 exp, level = self.getExp(row)
126 if level != row[3]:
127 await message.channel.send(f"{message.author.mention} just advanced to to **Level {level}**. Congratulations!")
128 self.cursor.execute("UPDATE levels SET currentExp = ?, currentLevel = ? WHERE guildID == ? AND userID == ?", (exp, level, row[0], row[1]))
129 self.rankSort(guildID)
130
131 # level command with a cooldown of 1 use every 30 seconds per guild
132 @commands.command()
133 @commands.cooldown(1, 30, commands.BucketType.guild)
134 async def level(self, ctx):
135 # Create embed which gives details about a user's experience
136 levelEmbed = Embed(title=f"{ctx.author.name}", colour=self.colour)
137 levelEmbed.set_thumbnail(url=ctx.author.avatar_url)
138 for row in self.cursor.execute(f"SELECT * FROM levels WHERE guildID == {ctx.guild.id} AND userID == {ctx.author.id}"):
139 levelEmbed.add_field(name="Current level", value=row[3], inline=True)
140 levelEmbed.add_field(name="Current experience", value=row[2], inline=True)
141 levelEmbed.add_field(name=f"Experience required for **Level {row[3]+1}**", value=str(self.nextExp(row[3])), inline=True)
142 await ctx.channel.send(embed=levelEmbed)
143
144 # ranks command with a cooldown of 1 use every 30 seconds per guild
145 @commands.command()
146 @commands.cooldown(1, 30, commands.BucketType.guild)
147 async def ranks(self, ctx):
148 # Create embed which gives details about a guild's leaderboard
149 rankEmbeds = await self.ranksEmbedCreator(ctx, list(self.cursor.execute(f"SELECT * FROM levels WHERE guildID == {ctx.guild.id}")))
150 if len(rankEmbeds) == 1:
151 await ctx.channel.send(embed=rankEmbeds[0])
152 else:
153 # Create paginator
154 paginator = Paginator(ctx, self.client)
155 paginator.addPages(rankEmbeds)
156 await paginator.start()
157
158 # Catch any cog errors
159 async def cog_command_error(self, ctx, error):
160 if isinstance(error, commands.CommandOnCooldown):
161 await ctx.channel.send(f"Command is on cooldown, try again in {round(error.retry_after, 2)} seconds")
162 Utils.errorWrite(error)
163
164
165# Function which initialises the Levelling cog
166def setup(client):
167 client.add_cog(Levelling(client))
168