Throw threadpool out of the window

This commit is contained in:
Jeremy Zhang 2018-03-25 22:16:55 +00:00
parent e0b18fb94f
commit 5fa1e43e2a
2 changed files with 223 additions and 240 deletions

View File

@ -68,7 +68,7 @@ class Titan(discord.AutoShardedClient):
await self.change_presence(status=discord.Status.online, activity=game) await self.change_presence(status=discord.Status.online, activity=game)
try: try:
await self.database.connect(config["database-uri"]) self.database.connect(config["database-uri"])
except Exception: except Exception:
self.logger.error("Unable to connect to specified database!") self.logger.error("Unable to connect to specified database!")
traceback.print_exc() traceback.print_exc()

View File

@ -1,5 +1,4 @@
from contextlib import contextmanager from contextlib import contextmanager
from asyncio_extras import threadpool
import sqlalchemy as db import sqlalchemy as db
from sqlalchemy.engine import Engine, create_engine from sqlalchemy.engine import Engine, create_engine
from sqlalchemy.orm import sessionmaker, Session from sqlalchemy.orm import sessionmaker, Session
@ -26,77 +25,71 @@ class DatabaseInterface(object):
self.engine = None # type: Engine self.engine = None # type: Engine
self._sessionmaker = None # type: sessionmaker self._sessionmaker = None # type: sessionmaker
async def connect(self, dburi): def connect(self, dburi):
async with threadpool(): self.engine = create_engine(dburi, pool_recycle=10)
self.engine = create_engine(dburi, pool_recycle=10)
self._sessionmaker = sessionmaker(bind=self.engine, expire_on_commit=False)
@contextmanager @contextmanager
def get_session(self) -> Session: def get_session(self):
session = self._sessionmaker() # type: Session Session = sessionmaker(bind=self.engine)
session = Session()
try: try:
yield session yield session
session.commit()
except: except:
session.rollback() session.rollback()
raise
finally: finally:
session.close() session.commit()
async def push_message(self, message): async def push_message(self, message):
if message.guild: if message.guild:
async with threadpool(): with self.get_session() as session:
with self.get_session() as session: edit_ts = message.edited_at
edit_ts = message.edited_at if not edit_ts:
if not edit_ts: edit_ts = None
edit_ts = None else:
else: edit_ts = str(edit_ts)
edit_ts = str(edit_ts)
msg = Messages( msg = Messages(
int(message.guild.id), int(message.guild.id),
int(message.channel.id), int(message.channel.id),
int(message.id), int(message.id),
message.content, message.content,
json.dumps(get_message_author(message)), json.dumps(get_message_author(message)),
str(message.created_at), str(message.created_at),
edit_ts, edit_ts,
json.dumps(get_message_mentions(message.mentions)), json.dumps(get_message_mentions(message.mentions)),
json.dumps(get_attachments_list(message.attachments)), json.dumps(get_attachments_list(message.attachments)),
json.dumps(get_embeds_list(message.embeds)) json.dumps(get_embeds_list(message.embeds))
) )
session.add(msg) session.add(msg)
session.commit() session.commit()
async def update_message(self, message): async def update_message(self, message):
if message.guild: if message.guild:
async with threadpool(): with self.get_session() as session:
with self.get_session() as session: msg = session.query(Messages) \
msg = session.query(Messages) \ .filter(Messages.guild_id == message.guild.id) \
.filter(Messages.guild_id == message.guild.id) \ .filter(Messages.channel_id == message.channel.id) \
.filter(Messages.channel_id == message.channel.id) \ .filter(Messages.message_id == message.id).first()
.filter(Messages.message_id == message.id).first() if msg:
if msg: msg.content = message.content
msg.content = message.content msg.timestamp = message.created_at
msg.timestamp = message.created_at msg.edited_timestamp = message.edited_at
msg.edited_timestamp = message.edited_at msg.mentions = json.dumps(get_message_mentions(message.mentions))
msg.mentions = json.dumps(get_message_mentions(message.mentions)) msg.attachments = json.dumps(get_attachments_list(message.attachments))
msg.attachments = json.dumps(get_attachments_list(message.attachments)) msg.embeds = json.dumps(get_embeds_list(message.embeds))
msg.embeds = json.dumps(get_embeds_list(message.embeds)) msg.author = json.dumps(get_message_author(message))
msg.author = json.dumps(get_message_author(message)) session.commit()
session.commit()
async def delete_message(self, message): async def delete_message(self, message):
if message.guild: if message.guild:
async with threadpool(): with self.get_session() as session:
with self.get_session() as session: msg = session.query(Messages) \
msg = session.query(Messages) \ .filter(Messages.guild_id == int(message.guild.id)) \
.filter(Messages.guild_id == int(message.guild.id)) \ .filter(Messages.channel_id == int(message.channel.id)) \
.filter(Messages.channel_id == int(message.channel.id)) \ .filter(Messages.message_id == int(message.id)).first()
.filter(Messages.message_id == int(message.id)).first() if msg:
if msg: session.delete(msg)
session.delete(msg) session.commit()
session.commit()
async def update_guild(self, guild): async def update_guild(self, guild):
if guild.me.guild_permissions.manage_webhooks: if guild.me.guild_permissions.manage_webhooks:
@ -106,205 +99,195 @@ class DatabaseInterface(object):
server_webhooks = [] server_webhooks = []
else: else:
server_webhooks = [] server_webhooks = []
async with threadpool(): with self.get_session() as session:
with self.get_session() as session: gui = session.query(Guilds).filter(Guilds.guild_id == guild.id).first()
gui = session.query(Guilds).filter(Guilds.guild_id == guild.id).first() if not gui:
if not gui: gui = Guilds(
gui = Guilds( int(guild.id),
int(guild.id), guild.name,
guild.name, json.dumps(get_roles_list(guild.roles)),
json.dumps(get_roles_list(guild.roles)), json.dumps(get_channels_list(guild.channels)),
json.dumps(get_channels_list(guild.channels)), json.dumps(get_webhooks_list(server_webhooks)),
json.dumps(get_webhooks_list(server_webhooks)), json.dumps(get_emojis_list(guild.emojis)),
json.dumps(get_emojis_list(guild.emojis)), int(guild.owner_id),
int(guild.owner_id), guild.icon
guild.icon )
) session.add(gui)
session.add(gui) else:
else: gui.name = guild.name
gui.name = guild.name gui.roles = json.dumps(get_roles_list(guild.roles))
gui.roles = json.dumps(get_roles_list(guild.roles)) gui.channels = json.dumps(get_channels_list(guild.channels))
gui.channels = json.dumps(get_channels_list(guild.channels)) gui.webhooks = json.dumps(get_webhooks_list(server_webhooks))
gui.webhooks = json.dumps(get_webhooks_list(server_webhooks)) gui.emojis = json.dumps(get_emojis_list(guild.emojis))
gui.emojis = json.dumps(get_emojis_list(guild.emojis)) gui.owner_id = int(guild.owner_id)
gui.owner_id = int(guild.owner_id) gui.icon = guild.icon
gui.icon = guild.icon session.commit()
session.commit()
async def remove_unused_guilds(self, guilds): async def remove_unused_guilds(self, guilds):
async with threadpool(): with self.get_session() as session:
with self.get_session() as session: dbguilds = session.query(Guilds).all()
dbguilds = session.query(Guilds).all() changed = False
changed = False for guild in dbguilds:
for guild in dbguilds: disguild = discord.utils.get(guilds, id=guild.guild_id)
disguild = discord.utils.get(guilds, id=guild.guild_id) if not disguild:
if not disguild: changed = True
changed = True dbmsgs = session.query(Messages).filter(Messages.guild_id == int(guild.guild_id)).all()
dbmsgs = session.query(Messages).filter(Messages.guild_id == int(guild.guild_id)).all() for msg in dbmsgs:
for msg in dbmsgs: session.delete(msg)
session.delete(msg) session.delete(guild)
session.delete(guild) if changed:
if changed: session.commit()
session.commit()
async def remove_guild(self, guild): async def remove_guild(self, guild):
async with threadpool(): with self.get_session() as session:
with self.get_session() as session: gui = session.query(Guilds).filter(Guilds.guild_id == int(guild.id)).first()
gui = session.query(Guilds).filter(Guilds.guild_id == int(guild.id)).first() if gui:
if gui: dbmsgs = session.query(Messages).filter(Messages.guild_id == int(guild.id)).delete()
dbmsgs = session.query(Messages).filter(Messages.guild_id == int(guild.id)).delete() session.delete(gui)
session.delete(gui) session.commit()
session.commit()
async def update_guild_member(self, member, active=True, banned=False, guild=None): async def update_guild_member(self, member, active=True, banned=False, guild=None):
async with threadpool(): with self.get_session() as session:
with self.get_session() as session: if guild:
if guild: dbmember = session.query(GuildMembers) \
dbmember = session.query(GuildMembers) \ .filter(GuildMembers.guild_id == int(guild.id)) \
.filter(GuildMembers.guild_id == int(guild.id)) \ .filter(GuildMembers.user_id == int(member.id)) \
.filter(GuildMembers.user_id == int(member.id)) \ .order_by(GuildMembers.id).all()
.order_by(GuildMembers.id).all() else:
else: dbmember = session.query(GuildMembers) \
dbmember = session.query(GuildMembers) \ .filter(GuildMembers.guild_id == int(member.guild.id)) \
.filter(GuildMembers.guild_id == int(member.guild.id)) \ .filter(GuildMembers.user_id == int(member.id)) \
.filter(GuildMembers.user_id == int(member.id)) \ .order_by(GuildMembers.id).all()
.order_by(GuildMembers.id).all() if not dbmember:
if not dbmember: dbmember = GuildMembers(
dbmember = GuildMembers( int(member.guild.id),
int(member.guild.id), int(member.id),
int(member.id), member.name,
member.name, member.discriminator,
member.discriminator, member.nick,
member.nick, member.avatar,
member.avatar, active,
active, banned,
banned, json.dumps(list_role_ids(member.roles))
json.dumps(list_role_ids(member.roles)) )
) session.add(dbmember)
session.add(dbmember) else:
else: if len(dbmember) > 1:
if len(dbmember) > 1: for mem in dbmember[1:]:
for mem in dbmember[1:]: session.delete(mem)
session.delete(mem) dbmember = dbmember[0]
dbmember = dbmember[0] if dbmember.banned != banned or dbmember.active != active or dbmember.username != member.name or dbmember.discriminator != int(member.discriminator) or dbmember.nickname != member.nick or dbmember.avatar != member.avatar or set(json.loads(dbmember.roles)) != set(list_role_ids(member.roles)):
if dbmember.banned != banned or dbmember.active != active or dbmember.username != member.name or dbmember.discriminator != int(member.discriminator) or dbmember.nickname != member.nick or dbmember.avatar != member.avatar or set(json.loads(dbmember.roles)) != set(list_role_ids(member.roles)): dbmember.banned = banned
dbmember.banned = banned dbmember.active = active
dbmember.active = active dbmember.username = member.name
dbmember.username = member.name dbmember.discriminator = member.discriminator
dbmember.discriminator = member.discriminator dbmember.nickname = member.nick
dbmember.nickname = member.nick dbmember.avatar = member.avatar
dbmember.avatar = member.avatar dbmember.roles = json.dumps(list_role_ids(member.roles))
dbmember.roles = json.dumps(list_role_ids(member.roles)) session.commit()
session.commit()
async def unban_server_user(self, user, server): async def unban_server_user(self, user, server):
async with threadpool(): with self.get_session() as session:
with self.get_session() as session: dbmember = session.query(GuildMembers) \
dbmember = session.query(GuildMembers) \ .filter(GuildMembers.guild_id == int(server.id)) \
.filter(GuildMembers.guild_id == int(server.id)) \ .filter(GuildMembers.user_id == int(user.id)).first()
.filter(GuildMembers.user_id == int(user.id)).first() if dbmember:
if dbmember: dbmember.banned = False
dbmember.banned = False session.commit()
session.commit()
async def flag_unactive_guild_members(self, guild_id, guild_members): async def flag_unactive_guild_members(self, guild_id, guild_members):
async with threadpool(): with self.get_session() as session:
with self.get_session() as session: changed = False
changed = False dbmembers = session.query(GuildMembers) \
dbmembers = session.query(GuildMembers) \ .filter(GuildMembers.guild_id == int(guild_id)) \
.filter(GuildMembers.guild_id == int(guild_id)) \ .filter(GuildMembers.active == True).all()
.filter(GuildMembers.active == True).all() for member in dbmembers:
for member in dbmembers: dismember = discord.utils.get(guild_members, id=member.user_id)
dismember = discord.utils.get(guild_members, id=member.user_id) if not dismember:
if not dismember: changed = True
changed = True member.active = False
member.active = False if changed:
if changed: session.commit()
session.commit()
async def flag_unactive_bans(self, guild_id, guildbans): async def flag_unactive_bans(self, guild_id, guildbans):
async with threadpool(): with self.get_session() as session:
with self.get_session() as session: changed = False
changed = False for usr in guildbans:
for usr in guildbans: dbusr = session.query(GuildMembers) \
dbusr = session.query(GuildMembers) \ .filter(GuildMembers.guild_id == int(guild_id)) \
.filter(GuildMembers.guild_id == int(guild_id)) \ .filter(GuildMembers.user_id == int(usr.id)) \
.filter(GuildMembers.user_id == int(usr.id)) \ .filter(GuildMembers.active == False).first()
.filter(GuildMembers.active == False).first() changed = True
changed = True if dbusr:
if dbusr: dbusr.banned = True
dbusr.banned = True else:
else: dbusr = GuildMembers(
dbusr = GuildMembers( int(guild_id),
int(guild_id), int(usr.id),
int(usr.id), usr.name,
usr.name, usr.discriminator,
usr.discriminator, None,
None, usr.avatar,
usr.avatar, False,
False, True,
True, "[]"
"[]" )
) session.add(dbusr)
session.add(dbusr) if changed:
if changed: session.commit()
session.commit()
async def ban_unauth_user_by_query(self, guild_id, placer_id, username, discriminator): async def ban_unauth_user_by_query(self, guild_id, placer_id, username, discriminator):
async with threadpool(): with self.get_session() as session:
with self.get_session() as session: dbuser = None
dbuser = None if discriminator:
if discriminator: dbuser = session.query(UnauthenticatedUsers) \
dbuser = session.query(UnauthenticatedUsers) \ .filter(UnauthenticatedUsers.guild_id == int(guild_id)) \
.filter(UnauthenticatedUsers.guild_id == int(guild_id)) \ .filter(UnauthenticatedUsers.username.ilike("%" + username + "%")) \
.filter(UnauthenticatedUsers.username.ilike("%" + username + "%")) \ .filter(UnauthenticatedUsers.discriminator == discriminator) \
.filter(UnauthenticatedUsers.discriminator == discriminator) \ .order_by(UnauthenticatedUsers.id.desc()).first()
.order_by(UnauthenticatedUsers.id.desc()).first() else:
else: dbuser = session.query(UnauthenticatedUsers) \
dbuser = session.query(UnauthenticatedUsers) \ .filter(UnauthenticatedUsers.guild_id == int(guild_id)) \
.filter(UnauthenticatedUsers.guild_id == int(guild_id)) \ .filter(UnauthenticatedUsers.username.ilike("%" + username + "%")) \
.filter(UnauthenticatedUsers.username.ilike("%" + username + "%")) \ .order_by(UnauthenticatedUsers.id.desc()).first()
.order_by(UnauthenticatedUsers.id.desc()).first() if not dbuser:
if not dbuser: return "Ban error! Guest user cannot be found."
return "Ban error! Guest user cannot be found." dbban = session.query(UnauthenticatedBans) \
dbban = session.query(UnauthenticatedBans) \ .filter(UnauthenticatedBans.guild_id == int(guild_id)) \
.filter(UnauthenticatedBans.guild_id == int(guild_id)) \ .filter(UnauthenticatedBans.last_username == dbuser.username) \
.filter(UnauthenticatedBans.last_username == dbuser.username) \ .filter(UnauthenticatedBans.last_discriminator == dbuser.discriminator).first()
.filter(UnauthenticatedBans.last_discriminator == dbuser.discriminator).first() if dbban is not None:
if dbban is not None: if dbban.lifter_id is None:
if dbban.lifter_id is None: return "Ban error! Guest user, **{}#{}**, has already been banned.".format(dbban.last_username, dbban.last_discriminator)
return "Ban error! Guest user, **{}#{}**, has already been banned.".format(dbban.last_username, dbban.last_discriminator) session.delete(dbban)
session.delete(dbban) dbban = UnauthenticatedBans(int(guild_id), dbuser.ip_address, dbuser.username, dbuser.discriminator, "", int(placer_id))
dbban = UnauthenticatedBans(int(guild_id), dbuser.ip_address, dbuser.username, dbuser.discriminator, "", int(placer_id)) session.add(dbban)
session.add(dbban) session.commit()
session.commit() return "Guest user, **{}#{}**, has successfully been added to the ban list!".format(dbban.last_username, dbban.last_discriminator)
return "Guest user, **{}#{}**, has successfully been added to the ban list!".format(dbban.last_username, dbban.last_discriminator)
async def revoke_unauth_user_by_query(self, guild_id, username, discriminator): async def revoke_unauth_user_by_query(self, guild_id, username, discriminator):
async with threadpool(): with self.get_session() as session:
with self.get_session() as session: dbuser = None
dbuser = None if discriminator:
if discriminator: dbuser = session.query(UnauthenticatedUsers) \
dbuser = session.query(UnauthenticatedUsers) \ .filter(UnauthenticatedUsers.guild_id == int(guild_id)) \
.filter(UnauthenticatedUsers.guild_id == int(guild_id)) \ .filter(UnauthenticatedUsers.username.ilike("%" + username + "%")) \
.filter(UnauthenticatedUsers.username.ilike("%" + username + "%")) \ .filter(UnauthenticatedUsers.discriminator == discriminator) \
.filter(UnauthenticatedUsers.discriminator == discriminator) \ .order_by(UnauthenticatedUsers.id.desc()).first()
.order_by(UnauthenticatedUsers.id.desc()).first() else:
else: dbuser = session.query(UnauthenticatedUsers) \
dbuser = session.query(UnauthenticatedUsers) \ .filter(UnauthenticatedUsers.guild_id == int(guild_id)) \
.filter(UnauthenticatedUsers.guild_id == int(guild_id)) \ .filter(UnauthenticatedUsers.username.ilike("%" + username + "%")) \
.filter(UnauthenticatedUsers.username.ilike("%" + username + "%")) \ .order_by(UnauthenticatedUsers.id.desc()).first()
.order_by(UnauthenticatedUsers.id.desc()).first() if not dbuser:
if not dbuser: return "Kick error! Guest user cannot be found."
return "Kick error! Guest user cannot be found." elif dbuser.revoked:
elif dbuser.revoked: return "Kick error! Guest user **{}#{}** has already been kicked!".format(dbuser.username, dbuser.discriminator)
return "Kick error! Guest user **{}#{}** has already been kicked!".format(dbuser.username, dbuser.discriminator) dbuser.revoked = True
dbuser.revoked = True session.commit()
session.commit() return "Successfully kicked **{}#{}**!".format(dbuser.username, dbuser.discriminator)
return "Successfully kicked **{}#{}**!".format(dbuser.username, dbuser.discriminator)
async def delete_all_messages_from_channel(self, channel_id): async def delete_all_messages_from_channel(self, channel_id):
async with threadpool(): with self.get_session() as session:
with self.get_session() as session: session.query(Messages).filter(Messages.channel_id == int(channel_id)).delete()
session.query(Messages).filter(Messages.channel_id == int(channel_id)).delete() session.commit()
session.commit()