django-oidc-provider/openid_provider/lib/grants/authorization_code.py

260 lines
7.3 KiB
Python
Raw Normal View History

2014-12-19 15:27:43 +00:00
from datetime import timedelta
from django.http import HttpResponse, HttpResponseRedirect, JsonResponse
from django.utils import timezone
import urllib
import uuid
import json
import jwt
import random
import re
import time
from openid_provider.models import *
from openid_provider.lib.errors import *
from openid_provider.lib.scopes import *
class AuthorizeEndpoint(object):
def __init__(self, request):
self.request = request
self.extract_params()
def extract_params(self):
query_dict = self.request.POST if self.request.method == 'POST' else self.request.GET
class Params(object): pass
Params.client_id = query_dict.get('client_id', '')
Params.redirect_uri = query_dict.get('redirect_uri', '')
Params.response_type = query_dict.get('response_type', '')
Params.scope = query_dict.get('scope', '')
Params.state = query_dict.get('state', '')
self.params = Params
def validate_params(self):
if not self.params.redirect_uri:
raise RedirectUriError()
if not ('openid' in self.params.scope.split()):
raise AuthorizeError(self.params.redirect_uri, 'invalid_scope')
try:
self.client = Client.objects.get(client_id=self.params.client_id)
if not (self.params.redirect_uri in self.client.redirect_uris):
raise RedirectUriError()
if not (self.params.response_type == 'code'):
raise AuthorizeError(self.params.redirect_uri, 'unsupported_response_type')
except Client.DoesNotExist:
raise ClientIdError()
def create_response_uri(self, allow):
if not allow:
raise AuthorizeError(self.params.redirect_uri, 'access_denied')
try:
self.validate_params()
code = Code()
code.user = self.request.user
code.client = self.client
code.code = uuid.uuid4().hex
code.expires_at = timezone.now() + timedelta(seconds=60*10)
code.scope = self.params.scope
code.save()
except:
raise AuthorizeError(self.params.redirect_uri, 'server_error')
uri = self.params.redirect_uri + '?code={0}'.format(code.code)
# Add state if present.
uri = uri + ('&state={0}'.format(self.params.state) if self.params.state else '')
return uri
class TokenEndpoint(object):
def __init__(self, request):
self.request = request
self.extract_params()
def extract_params(self):
query_dict = self.request.POST
class Params(object): pass
Params.client_id = query_dict.get('client_id', '')
Params.client_secret = query_dict.get('client_secret', '')
Params.redirect_uri = urllib.unquote(query_dict.get('redirect_uri', ''))
Params.grant_type = query_dict.get('grant_type', '')
Params.code = query_dict.get('code', '')
Params.state = query_dict.get('state', '')
self.params = Params
def validate_params(self):
if not (self.params.grant_type == 'authorization_code'):
raise TokenError('unsupported_grant_type')
try:
self.client = Client.objects.get(client_id=self.params.client_id)
if not (self.client.client_secret == self.params.client_secret):
raise TokenError('invalid_client')
if not (self.params.redirect_uri in self.client.redirect_uris):
raise TokenError('invalid_client')
self.code = Code.objects.get(code=self.params.code)
if not (self.code.client == self.client) and not self.code.has_expired():
raise TokenError('invalid_grant')
except Client.DoesNotExist:
raise TokenError('invalid_client')
except Code.DoesNotExist:
raise TokenError('invalid_grant')
def create_response_dic(self):
expires_in = 60*60 # TODO: Probably add into settings
token = Token()
token.user = self.code.user
token.client = self.code.client
token.access_token = uuid.uuid4().hex
id_token_dic = self.generate_id_token_dic()
token.id_token = id_token_dic
token.refresh_token = uuid.uuid4().hex
token.expires_at = timezone.now() + timedelta(seconds=expires_in)
token.scope = self.code.scope
token.save()
self.code.delete()
id_token = jwt.encode(id_token_dic, self.client.client_secret)
dic = {
'access_token': token.access_token,
'token_type': 'bearer',
'expires_in': expires_in,
'id_token': id_token,
# TODO: 'refresh_token': token.refresh_token,
}
return dic
def generate_id_token_dic(self):
expires_in = 60*10
now = timezone.now()
# Convert datetimes into timestamps.
iat_time = time.mktime(now.timetuple())
exp_time = time.mktime((now + timedelta(seconds=expires_in)).timetuple())
user_auth_time = time.mktime(self.code.user.last_login.timetuple())
dic = {
'iss': 'https://localhost:8000', # TODO: this should not be hardcoded.
'sub': self.code.user.id,
'aud': self.client.client_id,
'exp': exp_time,
'iat': iat_time,
'auth_time': user_auth_time,
}
return dic
@classmethod
def response(self, dic, status=200):
response = JsonResponse(dic, status=status)
response['Cache-Control'] = 'no-store'
response['Pragma'] = 'no-cache'
return response
class UserInfoEndpoint(object):
def __init__(self, request):
self.request = request
self.extract_params()
def extract_params(self):
# TODO: Add other ways of passing access token
# http://tools.ietf.org/html/rfc6750#section-2
class Params(object): pass
Params.access_token = self._get_access_token()
self.params = Params
def validate_params(self):
try:
self.token = Token.objects.get(access_token=self.params.access_token)
except Token.DoesNotExist:
raise UserInfoError('invalid_token')
def _get_access_token(self):
# Using Authorization Request Header Field
# http://tools.ietf.org/html/rfc6750#section-2.1
auth_header = self.request.META.get('HTTP_AUTHORIZATION', '')
if re.compile('^Bearer\s{1}.+$').match(auth_header):
access_token = auth_header.split()[1]
else:
access_token = ''
return access_token
def create_response_dic(self):
dic = {
'sub': self.token.id_token.get('sub'),
}
standard_claims = StandardClaims(self.token.user, self.token.scope.split())
dic.update(standard_claims.response_dic)
return dic
@classmethod
def response(self, dic):
response = JsonResponse(dic, status=200)
response['Cache-Control'] = 'no-store'
response['Pragma'] = 'no-cache'
return response
@classmethod
def error_response(self, code, description, status):
response = HttpResponse(status=status)
response['WWW-Authenticate'] = 'error="{0}", error_description="{1}"'.format(code, description)
return response