django-oidc-provider/oidc_provider/lib/endpoints/token.py

212 lines
7.4 KiB
Python
Raw Normal View History

2016-05-25 21:58:58 +00:00
from base64 import b64decode, urlsafe_b64encode
import hashlib
import hmac
import logging
import re
try:
    from urllib.parse import unquote
except ImportError:
    from urllib import unquote

from django.http import JsonResponse

from oidc_provider.lib.errors import *
from oidc_provider.lib.utils.params import *
from oidc_provider.lib.utils.token import *
from oidc_provider.models import *
from oidc_provider import settings
2015-01-08 20:55:24 +00:00
2015-06-19 20:46:00 +00:00
2015-06-08 19:36:49 +00:00
logger = logging.getLogger(__name__)
2015-01-08 20:55:24 +00:00
2015-06-19 20:46:00 +00:00
2015-01-08 20:55:24 +00:00
class TokenEndpoint(object):
    """
    Process a request made to the token endpoint.

    The constructor reads the client credentials and grant parameters from
    the request. `validate_params()` then checks them (raising `TokenError`
    on failure), `create_response_dic()` builds the response payload and
    `response()` wraps a payload in a non-cacheable JSON response.
    """

    # Matches an HTTP Basic `Authorization` header value.
    _BASIC_AUTH_RE = re.compile(r'^Basic\s{1}.+$')

    def __init__(self, request):
        """Store `request` and extract the token request parameters."""
        self.request = request
        self.params = Params()
        self._extract_params()

    def _extract_params(self):
        """Copy the client credentials and POST parameters into `self.params`."""
        client_id, client_secret = self._extract_client_auth()

        self.params.client_id = client_id
        self.params.client_secret = client_secret
        self.params.redirect_uri = unquote(self.request.POST.get('redirect_uri', ''))
        self.params.grant_type = self.request.POST.get('grant_type', '')
        self.params.code = self.request.POST.get('code', '')
        self.params.state = self.request.POST.get('state', '')
        self.params.scope = self.request.POST.get('scope', '')
        self.params.refresh_token = self.request.POST.get('refresh_token', '')

        # PKCE parameters (None when the client did not send a verifier).
        self.params.code_verifier = self.request.POST.get('code_verifier')

    def _extract_client_auth(self):
        """
        Get client credentials using HTTP Basic Authentication method.
        Or try getting parameters via POST.
        See: https://tools.ietf.org/html/rfc6749#section-2.3.1

        Return a `(client_id, client_secret)` tuple of strings. Both values
        are empty strings when a Basic header is present but cannot be
        decoded into an `id:secret` pair.
        """
        auth_header = self.request.META.get('HTTP_AUTHORIZATION', '')

        if self._BASIC_AUTH_RE.match(auth_header):
            try:
                # The split lives inside the `try`: a header such as
                # 'Basic  ' matches the pattern but has no second token.
                b64_user_pass = auth_header.split()[1]
                user_pass = b64decode(b64_user_pass).decode('utf-8')
                # Only the first colon separates the id from the secret;
                # the secret itself may legitimately contain colons.
                client_id, client_secret = user_pass.split(':', 1)
            except (IndexError, TypeError, ValueError):
                # ValueError also covers bad base64 (binascii.Error),
                # undecodable bytes and a missing colon.
                client_id = client_secret = ''
        else:
            client_id = self.request.POST.get('client_id', '')
            client_secret = self.request.POST.get('client_secret', '')

        return (client_id, client_secret)

    @staticmethod
    def _secrets_match(expected, provided):
        """
        Return True when both secrets are equal text strings.

        Uses a constant-time comparison so the time taken does not reveal
        how many leading characters of the secret were guessed correctly.
        Values that cannot be encoded as text never match.
        """
        try:
            return hmac.compare_digest(expected.encode('utf-8'),
                                       provided.encode('utf-8'))
        except (AttributeError, UnicodeError):
            return False

    def validate_params(self):
        """
        Validate the client and the grant carried by the request.

        Sets `self.client` and, depending on the grant type, `self.code`
        or `self.token`.

        Raises `TokenError` with 'invalid_client', 'invalid_grant' or
        'unsupported_grant_type'.
        """
        try:
            self.client = Client.objects.get(client_id=self.params.client_id)
        except Client.DoesNotExist:
            logger.debug('[Token] Client does not exist: %s', self.params.client_id)
            raise TokenError('invalid_client')

        if self.client.client_type == 'confidential':
            if not self._secrets_match(self.client.client_secret,
                                       self.params.client_secret):
                # Never write the secret itself to the log.
                logger.debug('[Token] Invalid client secret for client %s',
                             self.client.client_id)
                raise TokenError('invalid_client')

        if self.params.grant_type == 'authorization_code':
            self._validate_authorization_code()
        elif self.params.grant_type == 'refresh_token':
            self._validate_refresh_token()
        else:
            logger.debug('[Token] Invalid grant type: %s', self.params.grant_type)
            raise TokenError('unsupported_grant_type')

    def _validate_authorization_code(self):
        """Check redirect uri, code ownership/expiry and PKCE; set `self.code`."""
        if not (self.params.redirect_uri in self.client.redirect_uris):
            logger.debug('[Token] Invalid redirect uri: %s', self.params.redirect_uri)
            raise TokenError('invalid_client')

        try:
            self.code = Code.objects.get(code=self.params.code)
        except Code.DoesNotExist:
            logger.debug('[Token] Code does not exist: %s', self.params.code)
            raise TokenError('invalid_grant')

        if not (self.code.client == self.client) or self.code.has_expired():
            logger.debug('[Token] Invalid code: invalid client or code has expired')
            raise TokenError('invalid_grant')

        self._verify_pkce()

    def _verify_pkce(self):
        """Check the PKCE `code_verifier` against the code's stored challenge."""
        code_verifier = self.params.code_verifier
        code_challenge = self.code.code_challenge

        # NOTE(review): assumes `code_challenge` is empty/None when the
        # authorization request did not use PKCE -- confirm against the model.
        if not code_verifier and not code_challenge:
            return

        if not code_verifier:
            # The code was bound to a challenge; omitting the verifier must
            # not let the client skip the check.
            logger.debug('[Token] Missing code verifier for a code with a challenge')
            raise TokenError('invalid_grant')

        if self.code.code_challenge_method == 'S256':
            try:
                digest = hashlib.sha256(code_verifier.encode('ascii')).digest()
            except UnicodeError:
                logger.debug('[Token] Code verifier is not ASCII')
                raise TokenError('invalid_grant')
            # Unpadded base64url encoding of the SHA-256 digest.
            new_code_challenge = urlsafe_b64encode(digest).decode('utf-8').replace('=', '')
        else:
            new_code_challenge = code_verifier

        if not (new_code_challenge == code_challenge):
            logger.debug('[Token] Code verifier does not match the code challenge')
            raise TokenError('invalid_grant')

    def _validate_refresh_token(self):
        """Look up the refresh token for this client; set `self.token`."""
        if not self.params.refresh_token:
            logger.debug('[Token] Missing refresh token')
            raise TokenError('invalid_grant')

        try:
            self.token = Token.objects.get(refresh_token=self.params.refresh_token,
                                           client=self.client)
        except Token.DoesNotExist:
            logger.debug('[Token] Refresh token does not exist: %s',
                         self.params.refresh_token)
            raise TokenError('invalid_grant')

    def create_response_dic(self):
        """
        Build the response payload for the validated grant.

        Returns None for any grant type other than 'authorization_code' or
        'refresh_token'.
        """
        if self.params.grant_type == 'authorization_code':
            return self.create_code_response_dic()
        elif self.params.grant_type == 'refresh_token':
            return self.create_refresh_response_dic()

    def _token_response_dic(self, token, id_token_dic):
        """Return the response payload describing `token`."""
        return {
            'access_token': token.access_token,
            'refresh_token': token.refresh_token,
            'token_type': 'bearer',
            'expires_in': settings.get('OIDC_TOKEN_EXPIRE'),
            'id_token': encode_id_token(id_token_dic, token.client),
        }

    def create_code_response_dic(self):
        """Exchange `self.code` for a new token and return the payload."""
        token = create_token(
            user=self.code.user,
            client=self.code.client,
            scope=self.code.scope)

        if self.code.is_authentication:
            id_token_dic = create_id_token(
                user=self.code.user,
                aud=self.client.client_id,
                nonce=self.code.nonce,
                at_hash=token.at_hash,
                request=self.request,
            )
        else:
            id_token_dic = {}

        token.id_token = id_token_dic

        # Store the token.
        token.save()

        # We don't need to store the code anymore.
        self.code.delete()

        return self._token_response_dic(token, id_token_dic)

    def create_refresh_response_dic(self):
        """Replace `self.token` with a new token and return the payload."""
        token = create_token(
            user=self.token.user,
            client=self.token.client,
            scope=self.token.scope)

        # If the Token has an id_token it's an Authentication request.
        if self.token.id_token:
            id_token_dic = create_id_token(
                user=self.token.user,
                aud=self.client.client_id,
                nonce=None,
                at_hash=token.at_hash,
                request=self.request,
            )
        else:
            id_token_dic = {}

        token.id_token = id_token_dic

        # Store the token.
        token.save()

        # Forget the old token.
        self.token.delete()

        # The new token was created with the old token's client.
        return self._token_response_dic(token, id_token_dic)

    @classmethod
    def response(cls, dic, status=200):
        """
        Create and return a response object.

        The payload is serialized as JSON and the response is marked as
        non-cacheable.
        """
        response = JsonResponse(dic, status=status)
        response['Cache-Control'] = 'no-store'
        response['Pragma'] = 'no-cache'

        return response