from flask import Blueprint, url_for, redirect, session, render_template, abort, request, jsonify from flask_socketio import emit from functools import wraps from titanembeds.database import db, get_administrators_list, Cosmetics, Guilds, UnauthenticatedUsers, UnauthenticatedBans, TitanTokens, TokenTransactions, get_titan_token, set_titan_token, list_disabled_guilds, DisabledGuilds, UserCSS, AuthenticatedUsers from titanembeds.oauth import generate_guild_icon_url from titanembeds.utils import get_online_embed_user_keys import datetime import json from sqlalchemy import func admin = Blueprint("admin", __name__) def is_admin(f): def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): if 'user_id' not in session: return redirect(url_for("index")) if session['user_id'] not in get_administrators_list(): return redirect(url_for("index")) return f(*args, **kwargs) return decorated_function return decorator(f) def get_online_users_count(): users = get_online_embed_user_keys() auths = len(users["AuthenticatedUsers"]) unauths = len(users["UnauthenticatedUsers"]) return {"authenticated": auths, "guest": unauths, "total": auths + unauths} @admin.route("/") @is_admin def index(): return render_template("admin_index.html.j2", count=get_online_users_count()) @admin.route("/cosmetics", methods=["GET"]) @is_admin def cosmetics(): entries = db.session.query(Cosmetics).all() return render_template("admin_cosmetics.html.j2", cosmetics=entries) @admin.route("/cosmetics", methods=["POST"]) @is_admin def cosmetics_post(): user_id = request.form.get("user_id", None) if not user_id: abort(400) css = request.form.get("css", None) css_limit = int(request.form.get("css_limit", 0)) guest_icon = request.form.get("guest_icon", None) badges = request.form.get("badges", None) entry = db.session.query(Cosmetics).filter(Cosmetics.user_id == user_id).first() if entry: abort(409) user = Cosmetics(user_id) if css: css = css.lower() == "true" user.css = css if css_limit is not None: user.css_limit = css_limit if guest_icon is not None: guest_icon = guest_icon.lower() == "true" user.guest_icon = guest_icon if badges is not None: badges = badges.split(",") if badges == [""]: badges = [] user.badges = json.dumps(badges) db.session.add(user) db.session.commit() return ('', 204) @admin.route("/cosmetics", methods=["DELETE"]) @is_admin def cosmetics_delete(): user_id = request.form.get("user_id", None) if not user_id: abort(400) entry = db.session.query(Cosmetics).filter(Cosmetics.user_id == user_id).first() if not entry: abort(409) db.session.delete(entry) db.session.commit() return ('', 204) @admin.route("/cosmetics", methods=["PATCH"]) @is_admin def cosmetics_patch(): user_id = request.form.get("user_id", None) if not user_id: abort(400) css = request.form.get("css", None) css_limit = request.form.get("css_limit", None) guest_icon = request.form.get("guest_icon", None) badges = request.form.get("badges", None) entry = db.session.query(Cosmetics).filter(Cosmetics.user_id == user_id).first() if not entry: abort(409) if css: css = css.lower() == "true" entry.css = css if css_limit is not None: entry.css_limit = css_limit if guest_icon: guest_icon = guest_icon.lower() == "true" entry.guest_icon = guest_icon if badges is not None: badges = badges.split(",") if badges == [""]: badges = [] entry.badges = json.dumps(badges) db.session.commit() return ('', 204) def prepare_guild_members_list(members, bans): all_users = [] ip_pool = [] members = sorted(members, key=lambda k: k.id, reverse=True) for member in members: user = { "id": member.id, "username": member.username, "discrim": member.discriminator, "ip": member.ip_address, "kicked": member.revoked, "banned": False, "banned_timestamp": None, "banned_by": None, "banned_reason": None, "ban_lifted_by": None, "aliases": [], } for banned in bans: if banned.ip_address == member.ip_address: if banned.lifter_id is None: user['banned'] = True user["banned_timestamp"] = banned.timestamp user['banned_by'] = banned.placer_id user['banned_reason'] = banned.reason user['ban_lifted_by'] = banned.lifter_id continue if user["ip"] not in ip_pool: all_users.append(user) ip_pool.append(user["ip"]) else: for usr in all_users: if user["ip"] == usr["ip"]: alias = user["username"]+"#"+str(user["discrim"]) if len(usr["aliases"]) < 5 and alias not in usr["aliases"]: usr["aliases"].append(alias) continue return all_users @admin.route("/administrate_guild/", methods=["GET"]) @is_admin def administrate_guild(guild_id): db_guild = db.session.query(Guilds).filter(Guilds.guild_id == guild_id).first() if not db_guild: abort(500) return session["redirect"] = None cosmetics = db.session.query(Cosmetics).filter(Cosmetics.user_id == session['user_id']).first() permissions=[] permissions.append("Manage Embed Settings") permissions.append("Ban Members") permissions.append("Kick Members") all_members = db.session.query(UnauthenticatedUsers).filter(UnauthenticatedUsers.guild_id == guild_id).order_by(UnauthenticatedUsers.id).all() all_bans = db.session.query(UnauthenticatedBans).filter(UnauthenticatedBans.guild_id == guild_id).all() users = prepare_guild_members_list(all_members, all_bans) dbguild_dict = { "id": db_guild.guild_id, "name": db_guild.name, "unauth_users": db_guild.unauth_users, "visitor_view": db_guild.visitor_view, "webhook_messages": db_guild.webhook_messages, "chat_links": db_guild.chat_links, "bracket_links": db_guild.bracket_links, "mentions_limit": db_guild.mentions_limit, "unauth_captcha": db_guild.unauth_captcha, "icon": db_guild.icon, "invite_link": db_guild.invite_link if db_guild.invite_link != None else "", "guest_icon": db_guild.guest_icon if db_guild.guest_icon != None else "", } return render_template("administrate_guild.html.j2", guild=dbguild_dict, members=users, permissions=permissions, cosmetics=cosmetics) @admin.route("/administrate_guild/", methods=["POST"]) @is_admin def update_administrate_guild(guild_id): db_guild = db.session.query(Guilds).filter(Guilds.guild_id == guild_id).first() db_guild.unauth_users = request.form.get("unauth_users", db_guild.unauth_users) in ["true", True] db_guild.visitor_view = request.form.get("visitor_view", db_guild.visitor_view) in ["true", True] db_guild.webhook_messages = request.form.get("webhook_messages", db_guild.webhook_messages) in ["true", True] db_guild.chat_links = request.form.get("chat_links", db_guild.chat_links) in ["true", True] db_guild.bracket_links = request.form.get("bracket_links", db_guild.bracket_links) in ["true", True] db_guild.mentions_limit = request.form.get("mentions_limit", db_guild.mentions_limit) db_guild.unauth_captcha = request.form.get("unauth_captcha", db_guild.unauth_captcha) in ["true", True] invite_link = request.form.get("invite_link", db_guild.invite_link) if invite_link != None and invite_link.strip() == "": invite_link = None db_guild.invite_link = invite_link guest_icon = request.form.get("guest_icon", db_guild.guest_icon) if guest_icon != None and guest_icon.strip() == "": guest_icon = None db_guild.guest_icon = guest_icon db.session.commit() emit("guest_icon_change", {"guest_icon": guest_icon if guest_icon else url_for('static', filename='img/titanembeds_square.png')}, room="GUILD_"+guild_id, namespace="/gateway") return jsonify( id=db_guild.id, guild_id=db_guild.guild_id, unauth_users=db_guild.unauth_users, visitor_view=db_guild.visitor_view, webhook_messages=db_guild.webhook_messages, chat_links=db_guild.chat_links, bracket_links=db_guild.bracket_links, mentions_limit=db_guild.mentions_limit, invite_link=db_guild.invite_link, guest_icon=db_guild.guest_icon, unauth_captcha=db_guild.unauth_captcha, ) @admin.route("/guilds") @is_admin def guilds(): guilds = db.session.query(Guilds).all() return render_template("admin_guilds.html.j2", servers=guilds, icon_generate=generate_guild_icon_url) @admin.route("/tokens", methods=["GET"]) @is_admin def manage_titan_tokens(): tokeners = db.session.query(TitanTokens).all() donators = [] for usr in tokeners: row = { "user_id": usr.user_id, "tokens": usr.tokens, "transactions": [] } transact = db.session.query(TokenTransactions).filter(TokenTransactions.user_id == usr.user_id).all() for tr in transact: row["transactions"].append({ "id": tr.id, "user_id": tr.user_id, "timestamp": tr.timestamp, "action": tr.action, "net_tokens": tr.net_tokens, "start_tokens": tr.start_tokens, "end_tokens": tr.end_tokens }) donators.append(row) return render_template("admin_token_transactions.html.j2", donators=donators) @admin.route("/tokens", methods=["POST"]) @is_admin def post_titan_tokens(): user_id = request.form.get("user_id", None) amount = request.form.get("amount", None, type=int) reason = request.form.get("reason", None) if not user_id or not amount: abort(400) if get_titan_token(user_id) != -1: abort(409) set_titan_token(user_id, amount, "NEW VIA ADMIN [{}]".format(str(reason))) return ('', 204) @admin.route("/tokens", methods=["PATCH"]) @is_admin def patch_titan_tokens(): user_id = request.form.get("user_id", None) amount = request.form.get("amount", None, type=int) reason = request.form.get("reason", None) if not user_id or not amount: abort(400) if get_titan_token(user_id) == -1: abort(409) set_titan_token(user_id, amount, "MODIFY VIA ADMIN [{}]".format(str(reason))) return ('', 204) @admin.route("/disabled_guilds", methods=["GET"]) @is_admin def get_disabled_guilds(): return render_template("admin_disabled_guilds.html.j2", guilds=list_disabled_guilds()) @admin.route("/disabled_guilds", methods=["POST"]) @is_admin def post_disabled_guilds(): guild_id = request.form.get("guild_id", None) if guild_id in list_disabled_guilds(): abort(409) guild = DisabledGuilds(guild_id) db.session.add(guild) db.session.commit() return ('', 204) @admin.route("/disabled_guilds", methods=["DELETE"]) @is_admin def delete_disabled_guilds(): guild_id = request.form.get("guild_id", None) if guild_id not in list_disabled_guilds(): abort(409) guild = db.session.query(DisabledGuilds).filter(DisabledGuilds.guild_id == guild_id).first() db.session.delete(guild) db.session.commit() return ('', 204) @admin.route("/custom_css", methods=["GET"]) @is_admin def list_custom_css_get(): css = db.session.query(UserCSS).order_by(UserCSS.id).all() return render_template("admin_usercss.html.j2", css=css) @admin.route("/custom_css/edit/", methods=["GET"]) @is_admin def edit_custom_css_get(css_id): css = db.session.query(UserCSS).filter(UserCSS.id == css_id).first() if not css: abort(404) variables = css.css_variables if variables: variables = json.loads(variables) return render_template("usercss.html.j2", new=False, css=css, variables=variables, admin=True) @admin.route("/custom_css/edit/", methods=["POST"]) @is_admin def edit_custom_css_post(css_id): dbcss = db.session.query(UserCSS).filter(UserCSS.id == css_id).first() if not dbcss: abort(404) name = request.form.get("name", None) user_id = request.form.get("user_id", None) css = request.form.get("css", None) variables = request.form.get("variables", None) variables_enabled = request.form.get("variables_enabled", False) in ["true", True] if not name: abort(400) else: name = name.strip() css = css.strip() if not user_id: user_id = dbcss.user_id if (len(css) == 0): css = None dbcss.name = name dbcss.user_id = user_id dbcss.css = css dbcss.css_variables = variables dbcss.css_var_bool = variables_enabled db.session.commit() return jsonify({"id": dbcss.id}) @admin.route("/custom_css/edit/", methods=["DELETE"]) @is_admin def edit_custom_css_delete(css_id): dbcss = db.session.query(UserCSS).filter(UserCSS.id == css_id).first() if not dbcss: abort(404) db.session.delete(dbcss) db.session.commit() return jsonify({}) @admin.route("/custom_css/new", methods=["GET"]) @is_admin def new_custom_css_get(): return render_template("usercss.html.j2", new=True, admin=True) @admin.route("/custom_css/new", methods=["POST"]) @is_admin def new_custom_css_post(): name = request.form.get("name", None) user_id = request.form.get("user_id", None) css = request.form.get("css", None) variables = request.form.get("variables", None) variables_enabled = request.form.get("variables_enabled", False) in ["true", True] if not name: abort(400) else: name = name.strip() css = css.strip() if not user_id: abort(400) if (len(css) == 0): css = None css = UserCSS(name, user_id, variables_enabled, variables, css) db.session.add(css) db.session.commit() return jsonify({"id": css.id})