76efb6fc68
This implements a very small part of the OIDC session management as described in http://openid.net/specs/openid-connect-session-1_0-17.html#rfc.section.5. It does not implement the full session management (using iframes) and does not implement the registration and verification of logout redirect uri's.
180 lines
5.3 KiB
Python
180 lines
5.3 KiB
Python
import logging
|
|
|
|
from Crypto.PublicKey import RSA
|
|
from django.contrib.auth.views import redirect_to_login, logout
|
|
from django.http import HttpResponse, HttpResponseRedirect, JsonResponse
|
|
from django.shortcuts import render
|
|
from django.template.loader import render_to_string
|
|
from django.views.decorators.http import require_http_methods
|
|
from django.views.generic import View
|
|
from hashlib import md5
|
|
from jwkest import long_to_base64
|
|
|
|
from oidc_provider.lib.endpoints.authorize import *
|
|
from oidc_provider.lib.endpoints.discovery import *
|
|
from oidc_provider.lib.endpoints.token import *
|
|
from oidc_provider.lib.endpoints.userinfo import *
|
|
from oidc_provider.lib.errors import *
|
|
from oidc_provider.lib.utils.common import get_rsa_key
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AuthorizeView(View):
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
|
|
authorize = AuthorizeEndpoint(request)
|
|
|
|
try:
|
|
authorize.validate_params()
|
|
|
|
if request.user.is_authenticated():
|
|
# Check if there's a hook setted.
|
|
hook_resp = settings.get('OIDC_AFTER_USERLOGIN_HOOK')(
|
|
request=request, user=request.user,
|
|
client=authorize.client)
|
|
if hook_resp:
|
|
return hook_resp
|
|
|
|
if settings.get('OIDC_SKIP_CONSENT_ENABLE'):
|
|
# Check if user previously give consent.
|
|
if authorize.client_has_user_consent():
|
|
uri = authorize.create_response_uri()
|
|
return HttpResponseRedirect(uri)
|
|
|
|
# Generate hidden inputs for the form.
|
|
context = {
|
|
'params': authorize.params,
|
|
}
|
|
hidden_inputs = render_to_string(
|
|
'oidc_provider/hidden_inputs.html', context)
|
|
|
|
# Remove `openid` from scope list
|
|
# since we don't need to print it.
|
|
authorize.params.scope.remove('openid')
|
|
|
|
context = {
|
|
'client': authorize.client,
|
|
'hidden_inputs': hidden_inputs,
|
|
'params': authorize.params,
|
|
}
|
|
|
|
return render(request, 'oidc_provider/authorize.html', context)
|
|
else:
|
|
path = request.get_full_path()
|
|
return redirect_to_login(path)
|
|
|
|
except (ClientIdError, RedirectUriError) as error:
|
|
context = {
|
|
'error': error.error,
|
|
'description': error.description,
|
|
}
|
|
|
|
return render(request, 'oidc_provider/error.html', context)
|
|
|
|
except (AuthorizeError) as error:
|
|
uri = error.create_uri(
|
|
authorize.params.redirect_uri,
|
|
authorize.params.state)
|
|
|
|
return HttpResponseRedirect(uri)
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
|
|
authorize = AuthorizeEndpoint(request)
|
|
|
|
allow = True if request.POST.get('allow') else False
|
|
|
|
try:
|
|
authorize.validate_params()
|
|
|
|
if not allow:
|
|
raise AuthorizeError(authorize.params.redirect_uri,
|
|
'access_denied',
|
|
authorize.grant_type)
|
|
|
|
# Save the user consent given to the client.
|
|
authorize.set_client_user_consent()
|
|
|
|
uri = authorize.create_response_uri()
|
|
return HttpResponseRedirect(uri)
|
|
|
|
except (AuthorizeError) as error:
|
|
uri = error.create_uri(
|
|
authorize.params.redirect_uri,
|
|
authorize.params.state)
|
|
|
|
return HttpResponseRedirect(uri)
|
|
|
|
|
|
class TokenView(View):
|
|
|
|
def post(self, request, *args, **kwargs):
|
|
|
|
token = TokenEndpoint(request)
|
|
|
|
try:
|
|
token.validate_params()
|
|
|
|
dic = token.create_response_dic()
|
|
|
|
return TokenEndpoint.response(dic)
|
|
|
|
except (TokenError) as error:
|
|
return TokenEndpoint.response(error.create_dict(), status=400)
|
|
|
|
|
|
@require_http_methods(['GET', 'POST'])
|
|
def userinfo(request):
|
|
|
|
userinfo = UserInfoEndpoint(request)
|
|
|
|
try:
|
|
userinfo.validate_params()
|
|
|
|
dic = userinfo.create_response_dic()
|
|
|
|
return UserInfoEndpoint.response(dic)
|
|
|
|
except (UserInfoError) as error:
|
|
return UserInfoEndpoint.error_response(
|
|
error.code,
|
|
error.description,
|
|
error.status)
|
|
|
|
|
|
class ProviderInfoView(View):
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
|
|
dic = ProviderInfoEndpoint.create_response_dic()
|
|
|
|
return JsonResponse(dic)
|
|
|
|
|
|
class JwksView(View):
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
dic = dict(keys=[])
|
|
|
|
key = get_rsa_key()
|
|
public_key = RSA.importKey(key).publickey()
|
|
|
|
dic['keys'].append({
|
|
'kty': 'RSA',
|
|
'alg': 'RS256',
|
|
'use': 'sig',
|
|
'kid': md5(key).hexdigest(),
|
|
'n': long_to_base64(public_key.n),
|
|
'e': long_to_base64(public_key.e),
|
|
})
|
|
|
|
return JsonResponse(dic)
|
|
|
|
|
|
class LogoutView(View):
|
|
def get(self, request, *args, **kwargs):
|
|
# We should actually verify if the requested redirect URI is safe
|
|
return logout(request, next_page=request.GET.get('post_logout_redirect_uri'))
|