django-oidc-provider/oidc_provider/lib/endpoints/token.py
2018-04-20 12:00:38 -03:00

278 lines
9.6 KiB
Python

from base64 import urlsafe_b64encode
import hashlib
import logging

from django.contrib.auth import authenticate
from django.http import JsonResponse
from django.utils.crypto import constant_time_compare

from oidc_provider.lib.errors import (
    TokenError,
    UserAuthError,
)
from oidc_provider.lib.utils.oauth2 import extract_client_auth
from oidc_provider.lib.utils.token import (
    create_id_token,
    create_token,
    encode_id_token,
)
from oidc_provider.models import (
    Client,
    Code,
    Token,
)
from oidc_provider import settings
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class TokenEndpoint(object):
def __init__(self, request):
self.request = request
self.params = {}
self.user = None
self._extract_params()
def _extract_params(self):
client_id, client_secret = extract_client_auth(self.request)
self.params['client_id'] = client_id
self.params['client_secret'] = client_secret
self.params['redirect_uri'] = self.request.POST.get('redirect_uri', '')
self.params['grant_type'] = self.request.POST.get('grant_type', '')
self.params['code'] = self.request.POST.get('code', '')
self.params['state'] = self.request.POST.get('state', '')
self.params['scope'] = self.request.POST.get('scope', '')
self.params['refresh_token'] = self.request.POST.get('refresh_token', '')
# PKCE parameter.
self.params['code_verifier'] = self.request.POST.get('code_verifier')
self.params['username'] = self.request.POST.get('username', '')
self.params['password'] = self.request.POST.get('password', '')
def validate_params(self):
try:
self.client = Client.objects.get(client_id=self.params['client_id'])
except Client.DoesNotExist:
logger.debug('[Token] Client does not exist: %s', self.params['client_id'])
raise TokenError('invalid_client')
if self.client.client_type == 'confidential':
if not (self.client.client_secret == self.params['client_secret']):
logger.debug('[Token] Invalid client secret: client %s do not have secret %s',
self.client.client_id, self.client.client_secret)
raise TokenError('invalid_client')
if self.params['grant_type'] == 'authorization_code':
if not (self.params['redirect_uri'] in self.client.redirect_uris):
logger.debug('[Token] Invalid redirect uri: %s', self.params['redirect_uri'])
raise TokenError('invalid_client')
try:
self.code = Code.objects.get(code=self.params['code'])
except Code.DoesNotExist:
logger.debug('[Token] Code does not exist: %s', self.params['code'])
raise TokenError('invalid_grant')
if not (self.code.client == self.client) \
or self.code.has_expired():
logger.debug('[Token] Invalid code: invalid client or code has expired')
raise TokenError('invalid_grant')
# Validate PKCE parameters.
if self.params['code_verifier']:
if self.code.code_challenge_method == 'S256':
new_code_challenge = urlsafe_b64encode(
hashlib.sha256(self.params['code_verifier'].encode('ascii')).digest()
).decode('utf-8').replace('=', '')
else:
new_code_challenge = self.params['code_verifier']
# TODO: We should explain the error.
if not (new_code_challenge == self.code.code_challenge):
raise TokenError('invalid_grant')
elif self.params['grant_type'] == 'password':
if not settings.get('OIDC_GRANT_TYPE_PASSWORD_ENABLE'):
raise TokenError('unsupported_grant_type')
user = authenticate(
username=self.params['username'],
password=self.params['password']
)
if not user:
raise UserAuthError()
self.user = user
elif self.params['grant_type'] == 'refresh_token':
if not self.params['refresh_token']:
logger.debug('[Token] Missing refresh token')
raise TokenError('invalid_grant')
try:
self.token = Token.objects.get(refresh_token=self.params['refresh_token'],
client=self.client)
except Token.DoesNotExist:
logger.debug(
'[Token] Refresh token does not exist: %s', self.params['refresh_token'])
raise TokenError('invalid_grant')
elif self.params['grant_type'] == 'client_credentials':
if not self.client._scope:
logger.debug('[Token] Client using client credentials with empty scope')
raise TokenError('invalid_scope')
else:
logger.debug('[Token] Invalid grant type: %s', self.params['grant_type'])
raise TokenError('unsupported_grant_type')
def create_response_dic(self):
if self.params['grant_type'] == 'authorization_code':
return self.create_code_response_dic()
elif self.params['grant_type'] == 'refresh_token':
return self.create_refresh_response_dic()
elif self.params['grant_type'] == 'password':
return self.create_access_token_response_dic()
elif self.params['grant_type'] == 'client_credentials':
return self.create_client_credentials_response_dic()
def create_code_response_dic(self):
# See https://tools.ietf.org/html/rfc6749#section-4.1
token = create_token(
user=self.code.user,
client=self.code.client,
scope=self.code.scope)
if self.code.is_authentication:
id_token_dic = create_id_token(
user=self.code.user,
aud=self.client.client_id,
token=token,
nonce=self.code.nonce,
at_hash=token.at_hash,
request=self.request,
scope=token.scope,
)
else:
id_token_dic = {}
token.id_token = id_token_dic
# Store the token.
token.save()
# We don't need to store the code anymore.
self.code.delete()
dic = {
'access_token': token.access_token,
'refresh_token': token.refresh_token,
'token_type': 'bearer',
'expires_in': settings.get('OIDC_TOKEN_EXPIRE'),
'id_token': encode_id_token(id_token_dic, token.client),
}
return dic
def create_refresh_response_dic(self):
# See https://tools.ietf.org/html/rfc6749#section-6
scope_param = self.params['scope']
scope = (scope_param.split(' ') if scope_param else self.token.scope)
unauthorized_scopes = set(scope) - set(self.token.scope)
if unauthorized_scopes:
raise TokenError('invalid_scope')
token = create_token(
user=self.token.user,
client=self.token.client,
scope=scope)
# If the Token has an id_token it's an Authentication request.
if self.token.id_token:
id_token_dic = create_id_token(
user=self.token.user,
aud=self.client.client_id,
token=token,
nonce=None,
at_hash=token.at_hash,
request=self.request,
scope=token.scope,
)
else:
id_token_dic = {}
token.id_token = id_token_dic
# Store the token.
token.save()
# Forget the old token.
self.token.delete()
dic = {
'access_token': token.access_token,
'refresh_token': token.refresh_token,
'token_type': 'bearer',
'expires_in': settings.get('OIDC_TOKEN_EXPIRE'),
'id_token': encode_id_token(id_token_dic, self.token.client),
}
return dic
def create_access_token_response_dic(self):
# See https://tools.ietf.org/html/rfc6749#section-4.3
token = create_token(
self.user,
self.client,
self.params['scope'].split(' '))
id_token_dic = create_id_token(
token=token,
user=self.user,
aud=self.client.client_id,
nonce='self.code.nonce',
at_hash=token.at_hash,
request=self.request,
scope=token.scope,
)
token.id_token = id_token_dic
token.save()
return {
'access_token': token.access_token,
'refresh_token': token.refresh_token,
'expires_in': settings.get('OIDC_TOKEN_EXPIRE'),
'token_type': 'bearer',
'id_token': encode_id_token(id_token_dic, token.client),
}
def create_client_credentials_response_dic(self):
# See https://tools.ietf.org/html/rfc6749#section-4.4.3
token = create_token(
user=None,
client=self.client,
scope=self.client.scope)
token.save()
return {
'access_token': token.access_token,
'expires_in': settings.get('OIDC_TOKEN_EXPIRE'),
'token_type': 'bearer',
'scope': self.client._scope,
}
@classmethod
def response(cls, dic, status=200):
"""
Create and return a response object.
"""
response = JsonResponse(dic, status=status)
response['Cache-Control'] = 'no-store'
response['Pragma'] = 'no-cache'
return response