mirror of
				https://github.com/TitanEmbeds/Titan.git
				synced 2025-11-04 07:47:10 +01:00 
			
		
		
		
	Isolated oauth
This commit is contained in:
		@@ -1,103 +1,11 @@
 | 
				
			|||||||
from flask import Blueprint, request, redirect, jsonify, abort, session, url_for, render_template
 | 
					from flask import Blueprint, request, redirect, jsonify, abort, session, url_for, render_template
 | 
				
			||||||
from requests_oauthlib import OAuth2Session
 | 
					 | 
				
			||||||
from config import config
 | 
					from config import config
 | 
				
			||||||
from titanembeds.decorators import discord_users_only
 | 
					from titanembeds.decorators import discord_users_only
 | 
				
			||||||
from titanembeds.utils import discord_api, cache, make_cache_key, make_guilds_cache_key
 | 
					from titanembeds.utils import discord_api
 | 
				
			||||||
from titanembeds.database import db, Guilds, UnauthenticatedUsers, UnauthenticatedBans
 | 
					from titanembeds.database import db, Guilds, UnauthenticatedUsers, UnauthenticatedBans
 | 
				
			||||||
 | 
					from titanembeds.oauth import authorize_url, token_url, make_authenticated_session, get_current_authenticated_user, get_user_managed_servers, check_user_can_administrate_guild, check_user_permission, generate_avatar_url, generate_guild_icon_url, generate_bot_invite_url
 | 
				
			||||||
 | 
					
 | 
				
			||||||
user = Blueprint("user", __name__)
 | 
					user = Blueprint("user", __name__)
 | 
				
			||||||
redirect_url = config['app-base-url'] + "/user/callback"
 | 
					 | 
				
			||||||
authorize_url = "https://discordapp.com/api/oauth2/authorize"
 | 
					 | 
				
			||||||
token_url = "https://discordapp.com/api/oauth2/token"
 | 
					 | 
				
			||||||
avatar_base_url = "https://cdn.discordapp.com/avatars/"
 | 
					 | 
				
			||||||
guild_icon_url = "https://cdn.discordapp.com/icons/"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def update_user_token(discord_token):
 | 
					 | 
				
			||||||
    session['user_keys'] = discord_token
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def make_authenticated_session(token=None, state=None, scope=None):
 | 
					 | 
				
			||||||
    return OAuth2Session(
 | 
					 | 
				
			||||||
        client_id=config['client-id'],
 | 
					 | 
				
			||||||
        token=token,
 | 
					 | 
				
			||||||
        state=state,
 | 
					 | 
				
			||||||
        scope=scope,
 | 
					 | 
				
			||||||
        redirect_uri=url_for("user.callback", _external=True),
 | 
					 | 
				
			||||||
        auto_refresh_kwargs={
 | 
					 | 
				
			||||||
            'client_id': config['client-id'],
 | 
					 | 
				
			||||||
            'client_secret': config['client-secret'],
 | 
					 | 
				
			||||||
        },
 | 
					 | 
				
			||||||
        auto_refresh_url=token_url,
 | 
					 | 
				
			||||||
        token_updater=update_user_token,
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def discordrest_from_user(endpoint):
 | 
					 | 
				
			||||||
    token = session['user_keys']
 | 
					 | 
				
			||||||
    discord = make_authenticated_session(token=token)
 | 
					 | 
				
			||||||
    req = discord.get("https://discordapp.com/api/v6{}".format(endpoint))
 | 
					 | 
				
			||||||
    return req
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def get_current_authenticated_user():
 | 
					 | 
				
			||||||
    req = discordrest_from_user("/users/@me")
 | 
					 | 
				
			||||||
    if req.status_code != 200:
 | 
					 | 
				
			||||||
        abort(req.status_code)
 | 
					 | 
				
			||||||
    user = req.json()
 | 
					 | 
				
			||||||
    return user
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def user_has_permission(permission, index):
 | 
					 | 
				
			||||||
    return bool((int(permission) >> index) & 1)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@cache.cached(timeout=120, key_prefix=make_guilds_cache_key)
 | 
					 | 
				
			||||||
def get_user_guilds():
 | 
					 | 
				
			||||||
    req = discordrest_from_user("/users/@me/guilds")
 | 
					 | 
				
			||||||
    return req
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def get_user_managed_servers():
 | 
					 | 
				
			||||||
    guilds = get_user_guilds()
 | 
					 | 
				
			||||||
    if guilds.status_code != 200:
 | 
					 | 
				
			||||||
        abort(guilds.status_code)
 | 
					 | 
				
			||||||
    guilds = guilds.json()
 | 
					 | 
				
			||||||
    filtered = []
 | 
					 | 
				
			||||||
    for guild in guilds:
 | 
					 | 
				
			||||||
        permission = guild['permissions'] # Manage Server, Ban Members, Kick Members
 | 
					 | 
				
			||||||
        if guild['owner'] or user_has_permission(permission, 5) or user_has_permission(permission, 2) or user_has_permission(permission, 1):
 | 
					 | 
				
			||||||
            filtered.append(guild)
 | 
					 | 
				
			||||||
    filtered = sorted(filtered, key=lambda guild: guild['name'])
 | 
					 | 
				
			||||||
    return filtered
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def get_user_managed_servers_safe():
 | 
					 | 
				
			||||||
    guilds = get_user_managed_servers()
 | 
					 | 
				
			||||||
    if guilds:
 | 
					 | 
				
			||||||
        return guilds
 | 
					 | 
				
			||||||
    return []
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def get_user_managed_servers_id():
 | 
					 | 
				
			||||||
    guilds = get_user_managed_servers_safe()
 | 
					 | 
				
			||||||
    ids=[]
 | 
					 | 
				
			||||||
    for guild in guilds:
 | 
					 | 
				
			||||||
        ids.append(guild['id'])
 | 
					 | 
				
			||||||
    return ids
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def check_user_can_administrate_guild(guild_id):
 | 
					 | 
				
			||||||
    guilds = get_user_managed_servers_id()
 | 
					 | 
				
			||||||
    return guild_id in guilds
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def check_user_permission(guild_id, id):
 | 
					 | 
				
			||||||
    guilds = get_user_managed_servers_safe()
 | 
					 | 
				
			||||||
    for guild in guilds:
 | 
					 | 
				
			||||||
        if guild['id'] == guild_id:
 | 
					 | 
				
			||||||
            return user_has_permission(guild['permissions'], id) or guild['owner']
 | 
					 | 
				
			||||||
    return False
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def generate_avatar_url(id, av):
 | 
					 | 
				
			||||||
    return avatar_base_url + str(id) + '/' + str(av) + '.jpg'
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def generate_guild_icon_url(id, hash):
 | 
					 | 
				
			||||||
    return guild_icon_url + str(id) + "/" + str(hash) + ".jpg"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def generate_bot_invite_url(guild_id):
 | 
					 | 
				
			||||||
    url = "https://discordapp.com/oauth2/authorize?&client_id={}&scope=bot&permissions={}&guild_id={}&response_type=code&redirect_uri={}".format(config['client-id'], '536083583', guild_id, url_for("user.dashboard", _external=True))
 | 
					 | 
				
			||||||
    return url
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@user.route("/login_authenticated", methods=["GET"])
 | 
					@user.route("/login_authenticated", methods=["GET"])
 | 
				
			||||||
def login_authenticated():
 | 
					def login_authenticated():
 | 
				
			||||||
    session["redirect"] = request.args.get("redirect")
 | 
					    session["redirect"] = request.args.get("redirect")
 | 
				
			||||||
@@ -281,8 +189,8 @@ def unban_unauthenticated_user():
 | 
				
			|||||||
@user.route("/revoke", methods=["POST"])
 | 
					@user.route("/revoke", methods=["POST"])
 | 
				
			||||||
@discord_users_only(api=True)
 | 
					@discord_users_only(api=True)
 | 
				
			||||||
def revoke_unauthenticated_user():
 | 
					def revoke_unauthenticated_user():
 | 
				
			||||||
    guild_id = request.args.get("guild_id", None)
 | 
					    guild_id = request.form.get("guild_id", None)
 | 
				
			||||||
    user_id = request.args.get("user_id", None)
 | 
					    user_id = request.form.get("user_id", None)
 | 
				
			||||||
    if not guild_id or not user_id:
 | 
					    if not guild_id or not user_id:
 | 
				
			||||||
        abort(400)
 | 
					        abort(400)
 | 
				
			||||||
    if not check_user_permission(guild_id, 1):
 | 
					    if not check_user_permission(guild_id, 1):
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										96
									
								
								titanembeds/oauth.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										96
									
								
								titanembeds/oauth.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,96 @@
 | 
				
			|||||||
 | 
					from config import config
 | 
				
			||||||
 | 
					from requests_oauthlib import OAuth2Session
 | 
				
			||||||
 | 
					from titanembeds.utils import cache, make_guilds_cache_key
 | 
				
			||||||
 | 
					from flask import session, abort, url_for
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					redirect_url = config['app-base-url'] + "/user/callback"
 | 
				
			||||||
 | 
					authorize_url = "https://discordapp.com/api/oauth2/authorize"
 | 
				
			||||||
 | 
					token_url = "https://discordapp.com/api/oauth2/token"
 | 
				
			||||||
 | 
					avatar_base_url = "https://cdn.discordapp.com/avatars/"
 | 
				
			||||||
 | 
					guild_icon_url = "https://cdn.discordapp.com/icons/"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def update_user_token(discord_token):
 | 
				
			||||||
 | 
					    session['user_keys'] = discord_token
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def make_authenticated_session(token=None, state=None, scope=None):
 | 
				
			||||||
 | 
					    return OAuth2Session(
 | 
				
			||||||
 | 
					        client_id=config['client-id'],
 | 
				
			||||||
 | 
					        token=token,
 | 
				
			||||||
 | 
					        state=state,
 | 
				
			||||||
 | 
					        scope=scope,
 | 
				
			||||||
 | 
					        redirect_uri=url_for("user.callback", _external=True),
 | 
				
			||||||
 | 
					        auto_refresh_kwargs={
 | 
				
			||||||
 | 
					            'client_id': config['client-id'],
 | 
				
			||||||
 | 
					            'client_secret': config['client-secret'],
 | 
				
			||||||
 | 
					        },
 | 
				
			||||||
 | 
					        auto_refresh_url=token_url,
 | 
				
			||||||
 | 
					        token_updater=update_user_token,
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def discordrest_from_user(endpoint):
 | 
				
			||||||
 | 
					    token = session['user_keys']
 | 
				
			||||||
 | 
					    discord = make_authenticated_session(token=token)
 | 
				
			||||||
 | 
					    req = discord.get("https://discordapp.com/api/v6{}".format(endpoint))
 | 
				
			||||||
 | 
					    return req
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def get_current_authenticated_user():
 | 
				
			||||||
 | 
					    req = discordrest_from_user("/users/@me")
 | 
				
			||||||
 | 
					    if req.status_code != 200:
 | 
				
			||||||
 | 
					        abort(req.status_code)
 | 
				
			||||||
 | 
					    user = req.json()
 | 
				
			||||||
 | 
					    return user
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def user_has_permission(permission, index):
 | 
				
			||||||
 | 
					    return bool((int(permission) >> index) & 1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@cache.cached(timeout=120, key_prefix=make_guilds_cache_key)
 | 
				
			||||||
 | 
					def get_user_guilds():
 | 
				
			||||||
 | 
					    req = discordrest_from_user("/users/@me/guilds")
 | 
				
			||||||
 | 
					    return req
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def get_user_managed_servers():
 | 
				
			||||||
 | 
					    guilds = get_user_guilds()
 | 
				
			||||||
 | 
					    if guilds.status_code != 200:
 | 
				
			||||||
 | 
					        abort(guilds.status_code)
 | 
				
			||||||
 | 
					    guilds = guilds.json()
 | 
				
			||||||
 | 
					    filtered = []
 | 
				
			||||||
 | 
					    for guild in guilds:
 | 
				
			||||||
 | 
					        permission = guild['permissions'] # Manage Server, Ban Members, Kick Members
 | 
				
			||||||
 | 
					        if guild['owner'] or user_has_permission(permission, 5) or user_has_permission(permission, 2) or user_has_permission(permission, 1):
 | 
				
			||||||
 | 
					            filtered.append(guild)
 | 
				
			||||||
 | 
					    filtered = sorted(filtered, key=lambda guild: guild['name'])
 | 
				
			||||||
 | 
					    return filtered
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def get_user_managed_servers_safe():
 | 
				
			||||||
 | 
					    guilds = get_user_managed_servers()
 | 
				
			||||||
 | 
					    if guilds:
 | 
				
			||||||
 | 
					        return guilds
 | 
				
			||||||
 | 
					    return []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def get_user_managed_servers_id():
 | 
				
			||||||
 | 
					    guilds = get_user_managed_servers_safe()
 | 
				
			||||||
 | 
					    ids=[]
 | 
				
			||||||
 | 
					    for guild in guilds:
 | 
				
			||||||
 | 
					        ids.append(guild['id'])
 | 
				
			||||||
 | 
					    return ids
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def check_user_can_administrate_guild(guild_id):
 | 
				
			||||||
 | 
					    guilds = get_user_managed_servers_id()
 | 
				
			||||||
 | 
					    return guild_id in guilds
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def check_user_permission(guild_id, id):
 | 
				
			||||||
 | 
					    guilds = get_user_managed_servers_safe()
 | 
				
			||||||
 | 
					    for guild in guilds:
 | 
				
			||||||
 | 
					        if guild['id'] == guild_id:
 | 
				
			||||||
 | 
					            return user_has_permission(guild['permissions'], id) or guild['owner']
 | 
				
			||||||
 | 
					    return False
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def generate_avatar_url(id, av):
 | 
				
			||||||
 | 
					    return avatar_base_url + str(id) + '/' + str(av) + '.jpg'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def generate_guild_icon_url(id, hash):
 | 
				
			||||||
 | 
					    return guild_icon_url + str(id) + "/" + str(hash) + ".jpg"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def generate_bot_invite_url(guild_id):
 | 
				
			||||||
 | 
					    url = "https://discordapp.com/oauth2/authorize?&client_id={}&scope=bot&permissions={}&guild_id={}&response_type=code&redirect_uri={}".format(config['client-id'], '536083583', guild_id, url_for("user.dashboard", _external=True))
 | 
				
			||||||
 | 
					    return url
 | 
				
			||||||
@@ -74,7 +74,7 @@
 | 
				
			|||||||
              <tbody>
 | 
					              <tbody>
 | 
				
			||||||
                {% for member in members %}
 | 
					                {% for member in members %}
 | 
				
			||||||
                  <tr>
 | 
					                  <tr>
 | 
				
			||||||
                    <td><a class="waves-effect waves-light btn orange"  {% if "Kick Members" not in permissions or member["kicked"] %}disabled{% endif %}   >Kick</a></td>
 | 
					                    <td><a class="waves-effect waves-light btn orange"  {% if "Kick Members" not in permissions or member["kicked"] %}disabled{% endif %} onclick='revoke_user( "{{ guild['id'] }}" , {{ member['id'] }} )'  >Kick</a></td>
 | 
				
			||||||
                    {% if not member["banned"] %}
 | 
					                    {% if not member["banned"] %}
 | 
				
			||||||
                    <td><a class="waves-effect waves-light btn red"   {% if "Ban Members" not in permissions %}disabled{% endif %}  {% if "Ban Members" in permissions %}  onclick='initiate_ban( "{{ guild['id'] }}" , {{ member['id'] }} )'  {% endif %}  >Ban</a></td>
 | 
					                    <td><a class="waves-effect waves-light btn red"   {% if "Ban Members" not in permissions %}disabled{% endif %}  {% if "Ban Members" in permissions %}  onclick='initiate_ban( "{{ guild['id'] }}" , {{ member['id'] }} )'  {% endif %}  >Ban</a></td>
 | 
				
			||||||
                    {% else %}
 | 
					                    {% else %}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user