Titan/webapp/titanembeds/blueprints/admin/admin.py

385 lines
14 KiB
Python
Raw Normal View History

from flask import Blueprint, url_for, redirect, session, render_template, abort, request, jsonify
2017-09-24 06:17:06 +02:00
from flask_socketio import emit
from functools import wraps
2018-01-08 10:13:59 +01:00
from titanembeds.database import db, get_administrators_list, Cosmetics, Guilds, UnauthenticatedUsers, UnauthenticatedBans, TitanTokens, TokenTransactions, get_titan_token, set_titan_token, list_disabled_guilds, DisabledGuilds, UserCSS, AuthenticatedUsers
2017-07-27 22:55:28 +02:00
from titanembeds.oauth import generate_guild_icon_url
import datetime
2017-12-07 07:49:32 +01:00
import json
2018-01-08 10:13:59 +01:00
from sqlalchemy import func
admin = Blueprint("admin", __name__)
def is_admin(f):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
return redirect(url_for("index"))
2017-07-22 04:06:45 +02:00
if session['user_id'] not in get_administrators_list():
return redirect(url_for("index"))
return f(*args, **kwargs)
return decorated_function
return decorator(f)
2018-01-08 10:13:59 +01:00
def get_online_users_count():
time_past = (datetime.datetime.now() - datetime.timedelta(seconds = 15)).strftime('%Y-%m-%d %H:%M:%S')
unauths = db.session.query(func.count(UnauthenticatedUsers.id)).filter(UnauthenticatedUsers.last_timestamp > time_past, UnauthenticatedUsers.revoked == False).scalar()
auths = db.session.query(func.count(AuthenticatedUsers.id)).filter(AuthenticatedUsers.last_timestamp > time_past).scalar()
return {"authenticated": auths, "guest": unauths, "total": auths + unauths}
@admin.route("/")
@is_admin
def index():
2018-01-08 10:13:59 +01:00
return render_template("admin_index.html.j2", count=get_online_users_count())
@admin.route("/cosmetics", methods=["GET"])
@is_admin
def cosmetics():
entries = db.session.query(Cosmetics).all()
return render_template("admin_cosmetics.html.j2", cosmetics=entries)
@admin.route("/cosmetics", methods=["POST"])
@is_admin
def cosmetics_post():
user_id = request.form.get("user_id", None)
if not user_id:
abort(400)
css = request.form.get("css", None)
css_limit = int(request.form.get("css_limit", 0))
guest_icon = request.form.get("guest_icon", None)
2017-12-07 07:49:32 +01:00
badges = request.form.get("badges", None)
entry = db.session.query(Cosmetics).filter(Cosmetics.user_id == user_id).first()
if entry:
abort(409)
user = Cosmetics(user_id)
if css:
css = css.lower() == "true"
user.css = css
if css_limit is not None:
user.css_limit = css_limit
if guest_icon is not None:
guest_icon = guest_icon.lower() == "true"
user.guest_icon = guest_icon
2017-12-07 07:49:32 +01:00
if badges is not None:
badges = badges.split(",")
if badges == [""]:
badges = []
user.badges = json.dumps(badges)
db.session.add(user)
db.session.commit()
return ('', 204)
@admin.route("/cosmetics", methods=["DELETE"])
@is_admin
def cosmetics_delete():
user_id = request.form.get("user_id", None)
if not user_id:
abort(400)
entry = db.session.query(Cosmetics).filter(Cosmetics.user_id == user_id).first()
if not entry:
abort(409)
db.session.delete(entry)
db.session.commit()
return ('', 204)
@admin.route("/cosmetics", methods=["PATCH"])
@is_admin
def cosmetics_patch():
user_id = request.form.get("user_id", None)
if not user_id:
abort(400)
css = request.form.get("css", None)
css_limit = request.form.get("css_limit", None)
guest_icon = request.form.get("guest_icon", None)
2017-12-07 07:49:32 +01:00
badges = request.form.get("badges", None)
entry = db.session.query(Cosmetics).filter(Cosmetics.user_id == user_id).first()
if not entry:
abort(409)
if css:
css = css.lower() == "true"
entry.css = css
if css_limit is not None:
entry.css_limit = css_limit
if guest_icon:
guest_icon = guest_icon.lower() == "true"
entry.guest_icon = guest_icon
2017-12-07 07:49:32 +01:00
if badges is not None:
badges = badges.split(",")
if badges == [""]:
badges = []
entry.badges = json.dumps(badges)
db.session.commit()
return ('', 204)
2017-12-07 07:49:32 +01:00
def prepare_guild_members_list(members, bans):
all_users = []
ip_pool = []
members = sorted(members, key=lambda k: datetime.datetime.strptime(str(k.last_timestamp.replace(tzinfo=None, microsecond=0)), "%Y-%m-%d %H:%M:%S"), reverse=True)
for member in members:
user = {
"id": member.id,
"username": member.username,
"discrim": member.discriminator,
"ip": member.ip_address,
"last_visit": member.last_timestamp,
"kicked": member.revoked,
"banned": False,
"banned_timestamp": None,
"banned_by": None,
"banned_reason": None,
"ban_lifted_by": None,
"aliases": [],
}
for banned in bans:
if banned.ip_address == member.ip_address:
if banned.lifter_id is None:
user['banned'] = True
user["banned_timestamp"] = banned.timestamp
user['banned_by'] = banned.placer_id
user['banned_reason'] = banned.reason
user['ban_lifted_by'] = banned.lifter_id
continue
if user["ip"] not in ip_pool:
all_users.append(user)
ip_pool.append(user["ip"])
else:
for usr in all_users:
if user["ip"] == usr["ip"]:
alias = user["username"]+"#"+str(user["discrim"])
if len(usr["aliases"]) < 5 and alias not in usr["aliases"]:
usr["aliases"].append(alias)
continue
return all_users
2017-07-27 22:55:28 +02:00
@admin.route("/administrate_guild/<guild_id>", methods=["GET"])
@is_admin
def administrate_guild(guild_id):
db_guild = db.session.query(Guilds).filter(Guilds.guild_id == guild_id).first()
if not db_guild:
abort(500)
return
2017-07-27 22:55:28 +02:00
session["redirect"] = None
cosmetics = db.session.query(Cosmetics).filter(Cosmetics.user_id == session['user_id']).first()
2017-07-27 22:55:28 +02:00
permissions=[]
permissions.append("Manage Embed Settings")
permissions.append("Ban Members")
permissions.append("Kick Members")
all_members = db.session.query(UnauthenticatedUsers).filter(UnauthenticatedUsers.guild_id == guild_id).order_by(UnauthenticatedUsers.last_timestamp).all()
all_bans = db.session.query(UnauthenticatedBans).filter(UnauthenticatedBans.guild_id == guild_id).all()
users = prepare_guild_members_list(all_members, all_bans)
dbguild_dict = {
"id": db_guild.guild_id,
"name": db_guild.name,
"unauth_users": db_guild.unauth_users,
"visitor_view": db_guild.visitor_view,
"webhook_messages": db_guild.webhook_messages,
2017-07-27 22:55:28 +02:00
"chat_links": db_guild.chat_links,
"bracket_links": db_guild.bracket_links,
"mentions_limit": db_guild.mentions_limit,
"unauth_captcha": db_guild.unauth_captcha,
2017-07-27 22:55:28 +02:00
"icon": db_guild.icon,
"discordio": db_guild.discordio if db_guild.discordio != None else "",
"guest_icon": db_guild.guest_icon if db_guild.guest_icon != None else "",
2017-07-27 22:55:28 +02:00
}
return render_template("administrate_guild.html.j2", guild=dbguild_dict, members=users, permissions=permissions, cosmetics=cosmetics)
2017-07-27 22:55:28 +02:00
@admin.route("/administrate_guild/<guild_id>", methods=["POST"])
@is_admin
def update_administrate_guild(guild_id):
db_guild = db.session.query(Guilds).filter(Guilds.guild_id == guild_id).first()
db_guild.unauth_users = request.form.get("unauth_users", db_guild.unauth_users) in ["true", True]
db_guild.visitor_view = request.form.get("visitor_view", db_guild.visitor_view) in ["true", True]
db_guild.webhook_messages = request.form.get("webhook_messages", db_guild.webhook_messages) in ["true", True]
2017-07-27 22:55:28 +02:00
db_guild.chat_links = request.form.get("chat_links", db_guild.chat_links) in ["true", True]
db_guild.bracket_links = request.form.get("bracket_links", db_guild.bracket_links) in ["true", True]
db_guild.mentions_limit = request.form.get("mentions_limit", db_guild.mentions_limit)
db_guild.unauth_captcha = request.form.get("unauth_captcha", db_guild.unauth_captcha) in ["true", True]
2017-07-27 22:55:28 +02:00
discordio = request.form.get("discordio", db_guild.discordio)
if discordio != None and discordio.strip() == "":
2017-07-27 22:55:28 +02:00
discordio = None
db_guild.discordio = discordio
guest_icon = request.form.get("guest_icon", db_guild.guest_icon)
if guest_icon != None and guest_icon.strip() == "":
guest_icon = None
db_guild.guest_icon = guest_icon
2017-07-27 22:55:28 +02:00
db.session.commit()
emit("guest_icon_change", {"guest_icon": guest_icon if guest_icon else url_for('static', filename='img/titanembeds_square.png')}, room="GUILD_"+guild_id, namespace="/gateway")
2017-07-27 22:55:28 +02:00
return jsonify(
id=db_guild.id,
guild_id=db_guild.guild_id,
unauth_users=db_guild.unauth_users,
visitor_view=db_guild.visitor_view,
webhook_messages=db_guild.webhook_messages,
2017-07-27 22:55:28 +02:00
chat_links=db_guild.chat_links,
bracket_links=db_guild.bracket_links,
mentions_limit=db_guild.mentions_limit,
discordio=db_guild.discordio,
guest_icon=db_guild.guest_icon,
unauth_captcha=db_guild.unauth_captcha,
2017-07-27 22:55:28 +02:00
)
@admin.route("/guilds")
2017-07-27 22:55:28 +02:00
@is_admin
def guilds():
guilds = db.session.query(Guilds).all()
return render_template("admin_guilds.html.j2", servers=guilds, icon_generate=generate_guild_icon_url)
@admin.route("/tokens", methods=["GET"])
@is_admin
def manage_titan_tokens():
tokeners = db.session.query(TitanTokens).all()
donators = []
for usr in tokeners:
row = {
"user_id": usr.user_id,
"tokens": usr.tokens,
"transactions": []
}
transact = db.session.query(TokenTransactions).filter(TokenTransactions.user_id == usr.user_id).all()
for tr in transact:
row["transactions"].append({
"id": tr.id,
"user_id": tr.user_id,
"timestamp": tr.timestamp,
"action": tr.action,
"net_tokens": tr.net_tokens,
"start_tokens": tr.start_tokens,
"end_tokens": tr.end_tokens
})
donators.append(row)
return render_template("admin_token_transactions.html.j2", donators=donators)
@admin.route("/tokens", methods=["POST"])
@is_admin
def post_titan_tokens():
user_id = request.form.get("user_id", None)
amount = request.form.get("amount", None, type=int)
reason = request.form.get("reason", None)
if not user_id or not amount:
abort(400)
if get_titan_token(user_id) != -1:
abort(409)
set_titan_token(user_id, amount, "NEW VIA ADMIN [{}]".format(str(reason)))
return ('', 204)
@admin.route("/tokens", methods=["PATCH"])
@is_admin
def patch_titan_tokens():
user_id = request.form.get("user_id", None)
amount = request.form.get("amount", None, type=int)
reason = request.form.get("reason", None)
if not user_id or not amount:
abort(400)
if get_titan_token(user_id) == -1:
abort(409)
set_titan_token(user_id, amount, "MODIFY VIA ADMIN [{}]".format(str(reason)))
return ('', 204)
@admin.route("/disabled_guilds", methods=["GET"])
@is_admin
def get_disabled_guilds():
return render_template("admin_disabled_guilds.html.j2", guilds=list_disabled_guilds())
@admin.route("/disabled_guilds", methods=["POST"])
@is_admin
def post_disabled_guilds():
guild_id = request.form.get("guild_id", None)
if guild_id in list_disabled_guilds():
abort(409)
guild = DisabledGuilds(guild_id)
db.session.add(guild)
db.session.commit()
return ('', 204)
@admin.route("/disabled_guilds", methods=["DELETE"])
@is_admin
def delete_disabled_guilds():
guild_id = request.form.get("guild_id", None)
if guild_id not in list_disabled_guilds():
abort(409)
guild = db.session.query(DisabledGuilds).filter(DisabledGuilds.guild_id == guild_id).first()
db.session.delete(guild)
db.session.commit()
return ('', 204)
@admin.route("/custom_css", methods=["GET"])
@is_admin
def list_custom_css_get():
css = db.session.query(UserCSS).order_by(UserCSS.id).all()
return render_template("admin_usercss.html.j2", css=css)
@admin.route("/custom_css/edit/<css_id>", methods=["GET"])
@is_admin
def edit_custom_css_get(css_id):
css = db.session.query(UserCSS).filter(UserCSS.id == css_id).first()
if not css:
abort(404)
variables = css.css_variables
if variables:
variables = json.loads(variables)
return render_template("usercss.html.j2", new=False, css=css, variables=variables, admin=True)
@admin.route("/custom_css/edit/<css_id>", methods=["POST"])
@is_admin
def edit_custom_css_post(css_id):
dbcss = db.session.query(UserCSS).filter(UserCSS.id == css_id).first()
if not dbcss:
abort(404)
name = request.form.get("name", None)
user_id = request.form.get("user_id", None)
css = request.form.get("css", None)
variables = request.form.get("variables", None)
variables_enabled = request.form.get("variables_enabled", False) in ["true", True]
if not name:
abort(400)
else:
name = name.strip()
css = css.strip()
if not user_id:
user_id = dbcss.user_id
if (len(css) == 0):
css = None
dbcss.name = name
dbcss.user_id = user_id
dbcss.css = css
dbcss.css_variables = variables
dbcss.css_var_bool = variables_enabled
db.session.commit()
return jsonify({"id": dbcss.id})
@admin.route("/custom_css/edit/<css_id>", methods=["DELETE"])
@is_admin
def edit_custom_css_delete(css_id):
dbcss = db.session.query(UserCSS).filter(UserCSS.id == css_id).first()
if not dbcss:
abort(404)
db.session.delete(dbcss)
db.session.commit()
return jsonify({})
@admin.route("/custom_css/new", methods=["GET"])
@is_admin
def new_custom_css_get():
return render_template("usercss.html.j2", new=True, admin=True)
@admin.route("/custom_css/new", methods=["POST"])
@is_admin
def new_custom_css_post():
name = request.form.get("name", None)
user_id = request.form.get("user_id", None)
css = request.form.get("css", None)
variables = request.form.get("variables", None)
variables_enabled = request.form.get("variables_enabled", False) in ["true", True]
if not name:
abort(400)
else:
name = name.strip()
css = css.strip()
if not user_id:
abort(400)
if (len(css) == 0):
css = None
css = UserCSS(name, user_id, variables_enabled, variables, css)
db.session.add(css)
db.session.commit()
return jsonify({"id": css.id})