django-oidc-provider/oidc_provider/lib/endpoints/authorize.py

243 lines
9.5 KiB
Python
Raw Normal View History

from datetime import timedelta
2015-06-08 19:36:49 +00:00
import logging
try:
from urllib import urlencode
from urlparse import urlsplit, parse_qs, urlunsplit
except ImportError:
from urllib.parse import urlsplit, parse_qs, urlunsplit, urlencode
2015-07-14 15:44:25 +00:00
from django.utils import timezone
from oidc_provider.lib.claims import StandardScopeClaims
from oidc_provider.lib.errors import (
AuthorizeError,
ClientIdError,
RedirectUriError,
)
from oidc_provider.lib.utils.token import (
create_code,
create_id_token,
create_token,
encode_id_token,
)
from oidc_provider.models import (
Client,
UserConsent,
)
2015-07-21 13:59:23 +00:00
from oidc_provider import settings
2015-01-08 20:55:24 +00:00
2015-06-19 20:46:00 +00:00
2015-06-08 19:36:49 +00:00
logger = logging.getLogger(__name__)
2015-01-08 20:55:24 +00:00
class AuthorizeEndpoint(object):
def __init__(self, request):
self.request = request
self.params = {}
2015-01-08 20:55:24 +00:00
self._extract_params()
# Determine which flow to use.
if self.params['response_type'] in ['code']:
2015-01-08 20:55:24 +00:00
self.grant_type = 'authorization_code'
elif self.params['response_type'] in ['id_token', 'id_token token', 'token']:
2015-01-08 20:55:24 +00:00
self.grant_type = 'implicit'
elif self.params['response_type'] in ['code token', 'code id_token', 'code id_token token']:
self.grant_type = 'hybrid'
2015-01-08 20:55:24 +00:00
else:
self.grant_type = None
2016-02-16 20:33:12 +00:00
# Determine if it's an OpenID Authentication request (or OAuth2).
self.is_authentication = 'openid' in self.params['scope']
2016-02-16 20:33:12 +00:00
2015-01-08 20:55:24 +00:00
def _extract_params(self):
"""
2015-01-08 20:55:24 +00:00
Get all the params used by the Authorization Code Flow
(and also for the Implicit and Hybrid).
2015-01-08 20:55:24 +00:00
See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
"""
2015-07-28 18:55:30 +00:00
# Because in this endpoint we handle both GET
# and POST request.
query_dict = (self.request.POST if self.request.method == 'POST'
else self.request.GET)
self.params['client_id'] = query_dict.get('client_id', '')
self.params['redirect_uri'] = query_dict.get('redirect_uri', '')
self.params['response_type'] = query_dict.get('response_type', '')
self.params['scope'] = query_dict.get('scope', '').split()
self.params['state'] = query_dict.get('state', '')
self.params['nonce'] = query_dict.get('nonce', '')
self.params['prompt'] = query_dict.get('prompt', '')
self.params['code_challenge'] = query_dict.get('code_challenge', '')
self.params['code_challenge_method'] = query_dict.get('code_challenge_method', '')
2015-01-08 20:55:24 +00:00
def validate_params(self):
2016-04-13 20:19:37 +00:00
# Client validation.
2016-02-16 20:33:12 +00:00
try:
self.client = Client.objects.get(client_id=self.params['client_id'])
2016-02-16 20:33:12 +00:00
except Client.DoesNotExist:
logger.debug('[Authorize] Invalid client identifier: %s', self.params['client_id'])
2016-02-16 20:33:12 +00:00
raise ClientIdError()
2016-04-13 20:19:37 +00:00
# Redirect URI validation.
if self.is_authentication and not self.params['redirect_uri']:
2016-03-17 18:31:41 +00:00
logger.debug('[Authorize] Missing redirect uri.')
2015-01-08 20:55:24 +00:00
raise RedirectUriError()
clean_redirect_uri = urlsplit(self.params['redirect_uri'])
2016-04-13 20:19:37 +00:00
clean_redirect_uri = urlunsplit(clean_redirect_uri._replace(query=''))
if not (clean_redirect_uri in self.client.redirect_uris):
logger.debug('[Authorize] Invalid redirect uri: %s', self.params['redirect_uri'])
2016-04-13 20:19:37 +00:00
raise RedirectUriError()
2015-01-08 20:55:24 +00:00
2016-04-13 20:19:37 +00:00
# Grant type validation.
2016-02-16 20:33:12 +00:00
if not self.grant_type:
logger.debug('[Authorize] Invalid response type: %s', self.params['response_type'])
raise AuthorizeError(self.params['redirect_uri'], 'unsupported_response_type', self.grant_type)
2016-04-13 20:19:37 +00:00
# Nonce parameter validation.
if self.is_authentication and self.grant_type == 'implicit' and not self.params['nonce']:
raise AuthorizeError(self.params['redirect_uri'], 'invalid_request', self.grant_type)
2015-01-08 20:55:24 +00:00
2016-04-13 20:19:37 +00:00
# Response type parameter validation.
if self.is_authentication and self.params['response_type'] != self.client.response_type:
raise AuthorizeError(self.params['redirect_uri'], 'invalid_request', self.grant_type)
2016-04-06 21:03:30 +00:00
# PKCE validation of the transformation method.
if self.params['code_challenge']:
if not (self.params['code_challenge_method'] in ['plain', 'S256']):
raise AuthorizeError(self.params['redirect_uri'], 'invalid_request', self.grant_type)
2015-06-15 19:04:44 +00:00
def create_response_uri(self):
uri = urlsplit(self.params['redirect_uri'])
query_params = parse_qs(uri.query)
query_fragment = parse_qs(uri.fragment)
2015-01-08 20:55:24 +00:00
try:
if self.grant_type in ['authorization_code', 'hybrid']:
code = create_code(
user=self.request.user,
client=self.client,
scope=self.params['scope'],
nonce=self.params['nonce'],
2016-04-06 21:03:30 +00:00
is_authentication=self.is_authentication,
code_challenge=self.params['code_challenge'],
code_challenge_method=self.params['code_challenge_method'])
2015-01-08 20:55:24 +00:00
code.save()
if self.grant_type == 'authorization_code':
query_params['code'] = code.code
query_params['state'] = self.params['state'] if self.params['state'] else ''
elif self.grant_type in ['implicit', 'hybrid']:
token = create_token(
user=self.request.user,
client=self.client,
scope=self.params['scope'])
# Check if response_type must include access_token in the response.
if self.params['response_type'] in ['id_token token', 'token', 'code token', 'code id_token token']:
query_fragment['access_token'] = token.access_token
2016-02-16 20:33:12 +00:00
# We don't need id_token if it's an OAuth2 request.
if self.is_authentication:
kwargs = {
'user': self.request.user,
'aud': self.client.client_id,
'nonce': self.params['nonce'],
'request': self.request,
'scope': self.params['scope'],
}
# Include at_hash when access_token is being returned.
if 'access_token' in query_fragment:
kwargs['at_hash'] = token.at_hash
id_token_dic = create_id_token(**kwargs)
# Check if response_type must include id_token in the response.
if self.params['response_type'] in ['id_token', 'id_token token', 'code id_token', 'code id_token token']:
query_fragment['id_token'] = encode_id_token(id_token_dic, self.client)
2016-02-16 20:33:12 +00:00
else:
id_token_dic = {}
2015-01-08 20:55:24 +00:00
# Store the token.
token.id_token = id_token_dic
2015-01-08 20:55:24 +00:00
token.save()
# Code parameter must be present if it's Hybrid Flow.
if self.grant_type == 'hybrid':
query_fragment['code'] = code.code
query_fragment['token_type'] = 'bearer'
query_fragment['expires_in'] = settings.get('OIDC_TOKEN_EXPIRE')
query_fragment['state'] = self.params['state'] if self.params['state'] else ''
2015-06-19 20:46:00 +00:00
except Exception as error:
2016-03-17 18:31:41 +00:00
logger.debug('[Authorize] Error when trying to create response uri: %s', error)
raise AuthorizeError(self.params['redirect_uri'], 'server_error', self.grant_type)
2015-01-08 20:55:24 +00:00
uri = uri._replace(query=urlencode(query_params, doseq=True))
uri = uri._replace(fragment=urlencode(query_fragment, doseq=True))
2015-01-08 20:55:24 +00:00
return urlunsplit(uri)
def set_client_user_consent(self):
"""
Save the user consent given to a specific client.
Return None.
"""
date_given = timezone.now()
expires_at = date_given + timedelta(
2015-06-24 15:40:00 +00:00
days=settings.get('OIDC_SKIP_CONSENT_EXPIRE'))
uc, created = UserConsent.objects.get_or_create(
user=self.request.user,
client=self.client,
defaults={
'expires_at': expires_at,
'date_given': date_given,
}
)
uc.scope = self.params['scope']
# Rewrite expires_at and date_given if object already exists.
if not created:
uc.expires_at = expires_at
uc.date_given = date_given
uc.save()
def client_has_user_consent(self):
"""
Check if already exists user consent for some client.
Return bool.
"""
value = False
try:
uc = UserConsent.objects.get(user=self.request.user, client=self.client)
if (set(self.params['scope']).issubset(uc.scope)) and not (uc.has_expired()):
value = True
except UserConsent.DoesNotExist:
pass
return value
def get_scopes_information(self):
"""
Return a list with the description of all the scopes requested.
"""
scopes = StandardScopeClaims.get_scopes_info(self.params['scope'])
2016-07-07 15:50:27 +00:00
if settings.get('OIDC_EXTRA_SCOPE_CLAIMS'):
scopes_extra = settings.get('OIDC_EXTRA_SCOPE_CLAIMS', import_str=True).get_scopes_info(self.params['scope'])
2016-07-07 15:50:27 +00:00
for index_extra, scope_extra in enumerate(scopes_extra):
for index, scope in enumerate(scopes[:]):
if scope_extra['scope'] == scope['scope']:
del scopes[index]
else:
scopes_extra = []
return scopes + scopes_extra