Merge pull request #4 from juanifioren/v0.5.x

Fetch latest changes
This commit is contained in:
Wojciech Bartosiak 2017-07-11 07:35:08 +02:00 committed by GitHub
commit 45717a82f2
8 changed files with 340 additions and 106 deletions

View file

@ -78,7 +78,7 @@ Let's say that you want add your custom ``foo`` scope for your OAuth2/OpenID pro
Somewhere in your Django ``settings.py``::
OIDC_USERINFO = 'yourproject.oidc_provider_settings.CustomScopeClaims'
OIDC_EXTRA_SCOPE_CLAIMS = 'yourproject.oidc_provider_settings.CustomScopeClaims'
Inside your oidc_provider_settings.py file add the following class::

View file

@ -30,12 +30,13 @@ from oidc_provider.models import (
UserConsent,
)
from oidc_provider import settings
from oidc_provider.lib.utils.common import cleanup_url_from_query_string, get_browser_state_or_default
from oidc_provider.lib.utils.common import get_browser_state_or_default
logger = logging.getLogger(__name__)
class AuthorizeEndpoint(object):
_allowed_prompt_params = {'none', 'login', 'consent', 'select_account'}
def __init__(self, request):
self.request = request
@ -74,7 +75,9 @@ class AuthorizeEndpoint(object):
self.params['scope'] = query_dict.get('scope', '').split()
self.params['state'] = query_dict.get('state', '')
self.params['nonce'] = query_dict.get('nonce', '')
self.params['prompt'] = query_dict.get('prompt', '')
self.params['prompt'] = self._allowed_prompt_params.intersection(set(query_dict.get('prompt', '').split()))
self.params['code_challenge'] = query_dict.get('code_challenge', '')
self.params['code_challenge_method'] = query_dict.get('code_challenge_method', '')
@ -90,8 +93,7 @@ class AuthorizeEndpoint(object):
if self.is_authentication and not self.params['redirect_uri']:
logger.debug('[Authorize] Missing redirect uri.')
raise RedirectUriError()
clean_redirect_uri = cleanup_url_from_query_string(self.params['redirect_uri'])
if not (clean_redirect_uri in self.client.redirect_uris):
if not (self.params['redirect_uri'] in self.client.redirect_uris):
logger.debug('[Authorize] Invalid redirect uri: %s', self.params['redirect_uri'])
raise RedirectUriError()

View file

@ -3,7 +3,6 @@ import hashlib
import logging
import re
from django.contrib.auth import authenticate
from oidc_provider.lib.utils.common import cleanup_url_from_query_string
try:
from urllib.parse import unquote
@ -43,8 +42,7 @@ class TokenEndpoint(object):
self.params['client_id'] = client_id
self.params['client_secret'] = client_secret
self.params['redirect_uri'] = unquote(
self.request.POST.get('redirect_uri', '').split('?', 1)[0])
self.params['redirect_uri'] = self.request.POST.get('redirect_uri', '')
self.params['grant_type'] = self.request.POST.get('grant_type', '')
self.params['code'] = self.request.POST.get('code', '')
self.params['state'] = self.request.POST.get('state', '')
@ -93,8 +91,7 @@ class TokenEndpoint(object):
raise TokenError('invalid_client')
if self.params['grant_type'] == 'authorization_code':
clean_redirect_uri = cleanup_url_from_query_string(self.params['redirect_uri'])
if not (clean_redirect_uri in self.client.redirect_uris):
if not (self.params['redirect_uri'] in self.client.redirect_uris):
logger.debug('[Token] Invalid redirect uri: %s', self.params['redirect_uri'])
raise TokenError('invalid_client')
@ -162,6 +159,8 @@ class TokenEndpoint(object):
return self.create_access_token_response_dic()
def create_access_token_response_dic(self):
# See https://tools.ietf.org/html/rfc6749#section-4.3
token = create_token(
self.user,
self.client,
@ -173,7 +172,7 @@ class TokenEndpoint(object):
nonce='self.code.nonce',
at_hash=token.at_hash,
request=self.request,
scope=self.params['scope'],
scope=token.scope,
)
token.id_token = id_token_dic
@ -188,6 +187,8 @@ class TokenEndpoint(object):
}
def create_code_response_dic(self):
# See https://tools.ietf.org/html/rfc6749#section-4.1
token = create_token(
user=self.code.user,
client=self.code.client,
@ -200,7 +201,7 @@ class TokenEndpoint(object):
nonce=self.code.nonce,
at_hash=token.at_hash,
request=self.request,
scope=self.params['scope'],
scope=token.scope,
)
else:
id_token_dic = {}
@ -223,10 +224,18 @@ class TokenEndpoint(object):
return dic
def create_refresh_response_dic(self):
# See https://tools.ietf.org/html/rfc6749#section-6
scope_param = self.params['scope']
scope = (scope_param.split(' ') if scope_param else self.token.scope)
unauthorized_scopes = set(scope) - set(self.token.scope)
if unauthorized_scopes:
raise TokenError('invalid_scope')
token = create_token(
user=self.token.user,
client=self.token.client,
scope=self.token.scope)
scope=scope)
# If the Token has an id_token it's an Authentication request.
if self.token.id_token:
@ -236,7 +245,7 @@ class TokenEndpoint(object):
nonce=None,
at_hash=token.at_hash,
request=self.request,
scope=self.params['scope'],
scope=token.scope,
)
else:
id_token_dic = {}

View file

@ -11,19 +11,6 @@ except ImportError:
from urllib.parse import urlsplit, urlunsplit
def cleanup_url_from_query_string(uri):
"""
Function used to clean up the uri from any query string, used i.e. by endpoints to validate redirect_uri
:param uri: URI to clean from query string
:type uri: str
:return: cleaned URI without query string
"""
clean_uri = urlsplit(uri)
clean_uri = urlunsplit(clean_uri._replace(query=''))
return clean_uri
def redirect(uri):
"""
Custom Response object for redirecting to a Non-HTTP url scheme.

View file

@ -29,6 +29,8 @@ def create_fake_user():
user = User()
user.username = 'johndoe'
user.email = 'johndoe@example.com'
user.first_name = 'John'
user.last_name = 'Doe'
user.set_password('1234')
user.save()

View file

@ -1,3 +1,5 @@
from oidc_provider.lib.errors import RedirectUriError
try:
from urllib.parse import urlencode
except ImportError:
@ -249,9 +251,13 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
self.assertEqual(is_code_ok, True, msg='Code returned is invalid or missing.')
def test_response_uri_is_properly_constructed(self):
"""
Check that the redirect_uri matches the one configured for the client.
Only 'state' and 'code' should be appended.
"""
data = {
'client_id': self.client.client_id,
'redirect_uri': self.client.default_redirect_uri + "?redirect_state=xyz",
'redirect_uri': self.client.default_redirect_uri,
'response_type': 'code',
'scope': 'openid email',
'state': self.state,
@ -260,11 +266,55 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
response = self._auth_request('post', data, is_user_authenticated=True)
# TODO
parsed = urlsplit(response['Location'])
params = parse_qs(parsed.query or parsed.fragment)
state = params['state'][0]
self.assertEquals(self.state, state, msg="State returned is invalid or missing")
is_code_ok = is_code_valid(url=response['Location'],
user=self.user,
client=self.client)
self.assertTrue(is_code_ok, msg='Code returned is invalid or missing')
self.assertEquals(set(params.keys()), set(['state', 'code']), msg='More than state or code appended as query params')
self.assertTrue(response['Location'].startswith(self.client.default_redirect_uri), msg='Different redirect_uri returned')
def test_unknown_redirect_uris_are_rejected(self):
"""
If a redirect_uri is not registered with the client the request must be rejected.
See http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest.
"""
data = {
'client_id': self.client.client_id,
'response_type': 'code',
'redirect_uri': 'http://neverseenthis.com',
'scope': 'openid email',
'state': self.state,
}
response = self._auth_request('get', data)
self.assertIn(RedirectUriError.error, response.content.decode('utf-8'), msg='No redirect_uri error')
def test_manipulated_redirect_uris_are_rejected(self):
"""
If a redirect_uri does not exactly match the registered uri it must be rejected.
See http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest.
"""
data = {
'client_id': self.client.client_id,
'response_type': 'code',
'redirect_uri': self.client.default_redirect_uri + "?some=query",
'scope': 'openid email',
'state': self.state,
}
response = self._auth_request('get', data)
self.assertIn(RedirectUriError.error, response.content.decode('utf-8'), msg='No redirect_uri error')
def test_public_client_auto_approval(self):
"""
It's recommended not auto-approving requests for non-confidential clients.
It's recommended not auto-approving requests for non-confidential clients using Authorization Code.
"""
data = {
'client_id': self.client_public_with_no_consent.client_id,
@ -278,7 +328,7 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
self.assertIn('Request for Permission', response.content.decode('utf-8'))
def test_prompt_parameter(self):
def test_prompt_none_parameter(self):
"""
Specifies whether the Authorization Server prompts the End-User for reauthentication and consent.
See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
@ -289,10 +339,9 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
'redirect_uri': self.client.default_redirect_uri,
'scope': 'openid email',
'state': self.state,
'prompt': 'none'
}
data['prompt'] = 'none'
response = self._auth_request('get', data)
# An error is returned if an End-User is not already authenticated.
@ -301,7 +350,92 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
response = self._auth_request('get', data, is_user_authenticated=True)
# An error is returned if the Client does not have pre-configured consent for the requested Claims.
self.assertIn('interaction_required', response['Location'])
self.assertIn('consent_required', response['Location'])
@patch('oidc_provider.views.django_user_logout')
def test_prompt_login_parameter(self, logout_function):
"""
Specifies whether the Authorization Server prompts the End-User for reauthentication and consent.
See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
"""
data = {
'client_id': self.client.client_id,
'response_type': self.client.response_type,
'redirect_uri': self.client.default_redirect_uri,
'scope': 'openid email',
'state': self.state,
'prompt': 'login'
}
response = self._auth_request('get', data)
self.assertIn(settings.get('OIDC_LOGIN_URL'), response['Location'])
response = self._auth_request('get', data, is_user_authenticated=True)
self.assertIn(settings.get('OIDC_LOGIN_URL'), response['Location'])
self.assertTrue(logout_function.called_once())
def test_prompt_login_none_parameter(self):
"""
Specifies whether the Authorization Server prompts the End-User for reauthentication and consent.
See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
"""
data = {
'client_id': self.client.client_id,
'response_type': self.client.response_type,
'redirect_uri': self.client.default_redirect_uri,
'scope': 'openid email',
'state': self.state,
'prompt': 'login none'
}
response = self._auth_request('get', data)
self.assertIn('login_required', response['Location'])
response = self._auth_request('get', data, is_user_authenticated=True)
self.assertIn('login_required', response['Location'])
@patch('oidc_provider.views.render')
def test_prompt_consent_parameter(self, render_patched):
"""
Specifies whether the Authorization Server prompts the End-User for reauthentication and consent.
See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
"""
data = {
'client_id': self.client.client_id,
'response_type': self.client.response_type,
'redirect_uri': self.client.default_redirect_uri,
'scope': 'openid email',
'state': self.state,
'prompt': 'consent'
}
response = self._auth_request('get', data)
self.assertIn(settings.get('OIDC_LOGIN_URL'), response['Location'])
response = self._auth_request('get', data, is_user_authenticated=True)
render_patched.assert_called_once()
self.assertTrue(render_patched.call_args[0][1], settings.get('OIDC_TEMPLATES')['authorize'])
def test_prompt_consent_none_parameter(self):
"""
Specifies whether the Authorization Server prompts the End-User for reauthentication and consent.
See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
"""
data = {
'client_id': self.client.client_id,
'response_type': self.client.response_type,
'redirect_uri': self.client.default_redirect_uri,
'scope': 'openid email',
'state': self.state,
'prompt': 'consent none'
}
response = self._auth_request('get', data)
self.assertIn('login_required', response['Location'])
response = self._auth_request('get', data, is_user_authenticated=True)
self.assertIn('consent_required', response['Location'])
class AuthorizationImplicitFlowTestCase(TestCase, AuthorizeEndpointMixin):
@ -315,6 +449,9 @@ class AuthorizationImplicitFlowTestCase(TestCase, AuthorizeEndpointMixin):
self.user = create_fake_user()
self.client = create_fake_client(response_type='id_token token')
self.client_public = create_fake_client(response_type='id_token token', is_public=True)
self.client_public_no_consent = create_fake_client(
response_type='id_token token', is_public=True,
require_consent=False)
self.client_no_access = create_fake_client(response_type='id_token')
self.client_public_no_access = create_fake_client(response_type='id_token', is_public=True)
self.state = uuid.uuid4().hex
@ -448,6 +585,28 @@ class AuthorizationImplicitFlowTestCase(TestCase, AuthorizeEndpointMixin):
self.assertNotIn('at_hash', id_token)
def test_public_client_implicit_auto_approval(self):
"""
Public clients using Implicit Flow should be able to reuse consent.
"""
data = {
'client_id': self.client_public_no_consent.client_id,
'response_type': self.client_public_no_consent.response_type,
'redirect_uri': self.client_public_no_consent.default_redirect_uri,
'scope': 'openid email',
'state': self.state,
'nonce': self.nonce,
}
response = self._auth_request('get', data, is_user_authenticated=True)
response_text = response.content.decode('utf-8')
self.assertEquals(response_text, '')
components = urlsplit(response['Location'])
fragment = parse_qs(components[4])
self.assertIn('access_token', fragment)
self.assertIn('id_token', fragment)
self.assertIn('expires_in', fragment)
class AuthorizationHybridFlowTestCase(TestCase, AuthorizeEndpointMixin):
"""

View file

@ -50,15 +50,18 @@ class TokenTestCase(TestCase):
self.user = create_fake_user()
self.client = create_fake_client(response_type='code')
def _password_grant_post_data(self):
return {
def _password_grant_post_data(self, scope=None):
result = {
'username': 'johndoe',
'password': '1234',
'grant_type': 'password',
'scope': 'openid email',
}
if scope is not None:
result['scope'] = ' '.join(scope)
return result
def _auth_code_post_data(self, code):
def _auth_code_post_data(self, code, scope=None):
"""
All the data that will be POSTed to the Token Endpoint.
"""
@ -70,10 +73,12 @@ class TokenTestCase(TestCase):
'code': code,
'state': uuid.uuid4().hex,
}
if scope is not None:
post_data['scope'] = ' '.join(scope)
return post_data
def _refresh_token_post_data(self, refresh_token):
def _refresh_token_post_data(self, refresh_token, scope=None):
"""
All the data that will be POSTed to the Token Endpoint.
"""
@ -83,6 +88,8 @@ class TokenTestCase(TestCase):
'grant_type': 'refresh_token',
'refresh_token': refresh_token,
}
if scope is not None:
post_data['scope'] = ' '.join(scope)
return post_data
@ -103,14 +110,14 @@ class TokenTestCase(TestCase):
return response
def _create_code(self):
def _create_code(self, scope=None):
"""
Generate a valid grant code.
"""
code = create_code(
user=self.user,
client=self.client,
scope=['openid', 'email'],
scope=(scope if scope else ['openid', 'email']),
nonce=FAKE_NONCE,
is_authentication=True)
code.save()
@ -228,30 +235,41 @@ class TokenTestCase(TestCase):
self.assertEqual(400, response.status_code)
self.assertEqual('invalid_client', response_dict['error'])
@patch('oidc_provider.lib.utils.token.uuid')
def test_password_grant_full_response(self):
self.check_password_grant(scope=['openid', 'email'])
def test_password_grant_scope(self):
self.check_password_grant(scope=['openid', 'profile'])
@override_settings(OIDC_TOKEN_EXPIRE=120,
OIDC_GRANT_TYPE_PASSWORD_ENABLE=True)
def test_password_grant_full_response(self, mock_uuid):
test_hex = 'fake_token'
mock_uuid4 = Mock(spec=uuid.uuid4)
mock_uuid4.hex = test_hex
mock_uuid.uuid4.return_value = mock_uuid4
def check_password_grant(self, scope):
response = self._post_request(
post_data=self._password_grant_post_data(),
post_data=self._password_grant_post_data(scope),
extras=self._password_grant_auth_header()
)
response_dict = json.loads(response.content.decode('utf-8'))
id_token = JWS().verify_compact(response_dict['id_token'].encode('utf-8'), self._get_keys())
self.assertEqual(response_dict['access_token'], 'fake_token')
self.assertEqual(response_dict['refresh_token'], 'fake_token')
token = Token.objects.get(user=self.user)
self.assertEqual(response_dict['access_token'], token.access_token)
self.assertEqual(response_dict['refresh_token'], token.refresh_token)
self.assertEqual(response_dict['expires_in'], 120)
self.assertEqual(response_dict['token_type'], 'bearer')
self.assertEqual(id_token['sub'], str(self.user.id))
self.assertEqual(id_token['aud'], self.client.client_id)
# Check the scope is honored by checking the claims in the userinfo
userinfo_response = self._get_userinfo(response_dict['access_token'])
userinfo = json.loads(userinfo_response.content.decode('utf-8'))
for (scope_param, claim) in [('email', 'email'), ('profile', 'name')]:
if scope_param in scope:
self.assertIn(claim, userinfo)
else:
self.assertNotIn(claim, userinfo)
@override_settings(OIDC_TOKEN_EXPIRE=720)
def test_authorization_code(self):
"""
@ -277,16 +295,64 @@ class TokenTestCase(TestCase):
self.assertEqual(id_token['sub'], str(self.user.id))
self.assertEqual(id_token['aud'], self.client.client_id)
@override_settings(OIDC_TOKEN_EXPIRE=720)
def test_scope_is_ignored_for_auth_code(self):
"""
Scope is ignored for token respones to auth code grant type.
"""
SIGKEYS = self._get_keys()
for code_scope in [['openid'], ['openid', 'email']]:
code = self._create_code(code_scope)
post_data = self._auth_code_post_data(
code=code.code, scope=['openid', 'profile'])
response = self._post_request(post_data)
response_dic = json.loads(response.content.decode('utf-8'))
self.assertEqual(response.status_code, 200)
id_token = JWS().verify_compact(response_dic['id_token'].encode('utf-8'), SIGKEYS)
if 'email' in code_scope:
self.assertIn('email', id_token)
else:
self.assertNotIn('email', id_token)
def test_refresh_token(self):
"""
A request to the Token Endpoint can also use a Refresh Token
by using the grant_type value refresh_token, as described in
Section 6 of OAuth 2.0 [RFC6749].
"""
self.do_refresh_token_check()
def test_refresh_token_invalid_scope(self):
"""
Extending scope in refresh token is not allowed.
Try to get a refresh token with "profile" in the scope even
though the original authorized scope in the authorization code
request is only ['openid', 'email'].
"""
self.do_refresh_token_check(scope=['openid', 'profile'])
def test_refresh_token_narrowed_scope(self):
"""
Narrowing scope in refresh token is allowed.
Try to get a refresh token with just "openid" in the scope even
though the original authorized scope in the authorization code
request is ['openid', 'email'].
"""
self.do_refresh_token_check(scope=['openid'])
def do_refresh_token_check(self, scope=None):
SIGKEYS = self._get_keys()
# Retrieve refresh token
code = self._create_code()
self.assertEqual(code.scope, ['openid', 'email'])
post_data = self._auth_code_post_data(code=code.code)
start_time = time.time()
with patch('oidc_provider.lib.utils.token.time.time') as time_func:
@ -297,14 +363,29 @@ class TokenTestCase(TestCase):
id_token1 = JWS().verify_compact(response_dic1['id_token'].encode('utf-8'), SIGKEYS)
# Use refresh token to obtain new token
post_data = self._refresh_token_post_data(response_dic1['refresh_token'])
post_data = self._refresh_token_post_data(
response_dic1['refresh_token'], scope)
with patch('oidc_provider.lib.utils.token.time.time') as time_func:
time_func.return_value = start_time + 600
response = self._post_request(post_data)
response_dic2 = json.loads(response.content.decode('utf-8'))
if scope and set(scope) - set(code.scope): # too broad scope
self.assertEqual(response.status_code, 400) # Bad Request
self.assertIn('error', response_dic2)
self.assertEqual(response_dic2['error'], 'invalid_scope')
return # No more checks
id_token2 = JWS().verify_compact(response_dic2['id_token'].encode('utf-8'), SIGKEYS)
if scope and 'email' not in scope: # narrowed scope The auth
# The auth code request had email in scope, so it should be
# in the first id token
self.assertIn('email', id_token1)
# but the refresh request had no email in scope
self.assertNotIn('email', id_token2, 'email was not requested')
self.assertNotEqual(response_dic1['id_token'], response_dic2['id_token'])
self.assertNotEqual(response_dic1['access_token'], response_dic2['access_token'])
self.assertNotEqual(response_dic1['refresh_token'], response_dic2['refresh_token'])
@ -339,12 +420,12 @@ class TokenTestCase(TestCase):
response = self._post_request(post_data)
self.assertIn('invalid_grant', response.content.decode('utf-8'))
def test_client_redirect_url(self):
def test_client_redirect_uri(self):
"""
Validate that client redirect URIs with query strings match registered
URIs, and that unregistered URIs are rejected.
source: https://github.com/jerrykan/django-oidc-provider/blob/2f54e537666c689dd8448f8bbc6a3a0244b01a97/oidc_provider/tests/test_token_endpoint.py
Validate that client redirect URIs exactly match registered
URIs, and that unregistered URIs or URIs with query parameters are rejected.
See http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest and
http://openid.net/specs/openid-connect-core-1_0.html#HybridTokenRequest.
"""
SIGKEYS = self._get_keys()
code = self._create_code()
@ -354,15 +435,19 @@ class TokenTestCase(TestCase):
post_data['redirect_uri'] = 'http://invalid.example.org'
response = self._post_request(post_data)
self.assertIn('invalid_client', response.content.decode('utf-8'))
self.assertIn('invalid_client', response.content.decode('utf-8')),
# Registered URI contained a query string
post_data['redirect_uri'] = 'http://example.com/?client=OidcClient'
# Registered URI, but with query string appended
post_data['redirect_uri'] = self.client.default_redirect_uri + '?foo=bar'
response = self._post_request(post_data)
self.assertIn('invalid_client', response.content.decode('utf-8'))
self.assertNotIn('invalid_client', response.content.decode('utf-8')),
# Registered URI
post_data['redirect_uri'] = self.client.default_redirect_uri
response = self._post_request(post_data)
self.assertNotIn('invalid_client', response.content.decode('utf-8'))
def test_request_methods(self):
"""
@ -440,29 +525,6 @@ class TokenTestCase(TestCase):
False,
msg='Client authentication fails using HTTP Basic Auth.')
def test_client_redirect_url(self):
"""
Validate that client redirect URIs with query strings match registered
URIs, and that unregistered URIs are rejected.
"""
SIGKEYS = self._get_keys()
code = self._create_code()
post_data = self._auth_code_post_data(code=code.code)
# Unregistered URI
post_data['redirect_uri'] = 'http://invalid.example.org'
response = self._post_request(post_data)
self.assertIn('invalid_client', response.content.decode('utf-8')),
# Registered URI contained a query string
post_data['redirect_uri'] = 'http://example.com/?client=OidcClient'
response = self._post_request(post_data)
self.assertNotIn('invalid_client', response.content.decode('utf-8')),
def test_access_token_contains_nonce(self):
"""
If present in the Authentication Request, Authorization Servers MUST

View file

@ -1,4 +1,5 @@
import logging
try:
from urllib import urlencode
from urlparse import urlsplit, parse_qs, urlunsplit
@ -10,6 +11,7 @@ from django.contrib.auth.views import (
redirect_to_login,
logout,
)
from django.contrib.auth import logout as django_user_logout
from django.core.urlresolvers import reverse
from django.http import JsonResponse
from django.shortcuts import render
@ -44,14 +46,12 @@ from oidc_provider.models import (
from oidc_provider import settings
from oidc_provider import signals
logger = logging.getLogger(__name__)
OIDC_TEMPLATES = settings.get('OIDC_TEMPLATES')
class AuthorizeView(View):
def get(self, request, *args, **kwargs):
authorize = AuthorizeEndpoint(request)
@ -66,26 +66,44 @@ class AuthorizeView(View):
client=authorize.client)
if hook_resp:
return hook_resp
if 'login' in authorize.params['prompt']:
if 'none' in authorize.params['prompt']:
raise AuthorizeError(authorize.params['redirect_uri'], 'login_required', authorize.grant_type)
else:
django_user_logout(request)
return redirect_to_login(request.get_full_path(), settings.get('OIDC_LOGIN_URL'))
if not authorize.client.require_consent and not (authorize.client.client_type == 'public') \
and not (authorize.params['prompt'] == 'consent'):
if 'select_account' in authorize.params['prompt']:
# TODO: see how we can support multiple accounts for the end-user.
if 'none' in authorize.params['prompt']:
raise AuthorizeError(authorize.params['redirect_uri'], 'account_selection_required', authorize.grant_type)
else:
django_user_logout(request)
return redirect_to_login(request.get_full_path(), settings.get('OIDC_LOGIN_URL'))
if {'none', 'consent'}.issubset(authorize.params['prompt']):
raise AuthorizeError(authorize.params['redirect_uri'], 'consent_required', authorize.grant_type)
implicit_flow_resp_types = set(['id_token', 'id_token token'])
allow_skipping_consent = (
authorize.client.client_type != 'public' or
authorize.client.response_type in implicit_flow_resp_types)
if not authorize.client.require_consent and (
allow_skipping_consent and
'consent' not in authorize.params['prompt']):
return redirect(authorize.create_response_uri())
if authorize.client.reuse_consent:
# Check if user previously give consent.
if authorize.client_has_user_consent() and not (authorize.client.client_type == 'public') \
and not (authorize.params['prompt'] == 'consent'):
if authorize.client_has_user_consent() and (
allow_skipping_consent and
'consent' not in authorize.params['prompt']):
return redirect(authorize.create_response_uri())
if authorize.params['prompt'] == 'none':
raise AuthorizeError(authorize.params['redirect_uri'], 'interaction_required', authorize.grant_type)
if authorize.params['prompt'] == 'login':
return redirect_to_login(request.get_full_path(), settings.get('OIDC_LOGIN_URL'))
if authorize.params['prompt'] == 'select_account':
# TODO: see how we can support multiple accounts for the end-user.
raise AuthorizeError(authorize.params['redirect_uri'], 'account_selection_required', authorize.grant_type)
if 'none' in authorize.params['prompt']:
raise AuthorizeError(authorize.params['redirect_uri'], 'consent_required', authorize.grant_type)
# Generate hidden inputs for the form.
context = {
@ -107,7 +125,7 @@ class AuthorizeView(View):
return render(request, OIDC_TEMPLATES['authorize'], context)
else:
if authorize.params['prompt'] == 'none':
if 'none' in authorize.params['prompt']:
raise AuthorizeError(authorize.params['redirect_uri'], 'login_required', authorize.grant_type)
return redirect_to_login(request.get_full_path(), settings.get('OIDC_LOGIN_URL'))
@ -120,7 +138,7 @@ class AuthorizeView(View):
return render(request, OIDC_TEMPLATES['error'], context)
except (AuthorizeError) as error:
except AuthorizeError as error:
uri = error.create_uri(
authorize.params['redirect_uri'],
authorize.params['state'])
@ -158,7 +176,6 @@ class AuthorizeView(View):
class TokenView(View):
def post(self, request, *args, **kwargs):
token = TokenEndpoint(request)
@ -206,7 +223,6 @@ def userinfo(request, *args, **kwargs):
class ProviderInfoView(View):
def get(self, request, *args, **kwargs):
dic = dict()
@ -241,7 +257,6 @@ class ProviderInfoView(View):
class JwksView(View):
def get(self, request, *args, **kwargs):
dic = dict(keys=[])
@ -263,7 +278,6 @@ class JwksView(View):
class EndSessionView(View):
def get(self, request, *args, **kwargs):
id_token_hint = request.GET.get('id_token_hint', '')
post_logout_redirect_uri = request.GET.get('post_logout_redirect_uri', '')
@ -302,7 +316,6 @@ class EndSessionView(View):
class CheckSessionIframeView(View):
@method_decorator(xframe_options_exempt)
def dispatch(self, request, *args, **kwargs):
return super(CheckSessionIframeView, self).dispatch(request, *args, **kwargs)