Merge branch 'brosner-fix-id-token-at-hash' into v0.3.x

This commit is contained in:
Ignacio Fiorentino 2016-08-10 18:21:01 -03:00
commit cec9dc6b11
11 changed files with 381 additions and 86 deletions

2
.gitignore vendored
View file

@ -10,3 +10,5 @@ src/
.venv
.idea
docs/_build/
.eggs/
.python-version

View file

@ -121,35 +121,41 @@ class AuthorizeEndpoint(object):
query_params['state'] = self.params.state if self.params.state else ''
elif self.grant_type == 'implicit':
# We don't need id_token if it's an OAuth2 request.
if self.is_authentication:
id_token_dic = create_id_token(
user=self.request.user,
aud=self.client.client_id,
nonce=self.params.nonce,
request=self.request)
query_fragment['id_token'] = encode_id_token(id_token_dic, self.client)
else:
id_token_dic = {}
token = create_token(
user=self.request.user,
client=self.client,
id_token_dic=id_token_dic,
scope=self.params.scope)
# Store the token.
token.save()
query_fragment['token_type'] = 'bearer'
# TODO: Create setting 'OIDC_TOKEN_EXPIRE'.
query_fragment['expires_in'] = 60 * 10
# Check if response_type is an OpenID request with value 'id_token token'
# or it's an OAuth2 Implicit Flow request.
if self.params.response_type in ['id_token token', 'token']:
query_fragment['access_token'] = token.access_token
# We don't need id_token if it's an OAuth2 request.
if self.is_authentication:
kwargs = {
"user": self.request.user,
"aud": self.client.client_id,
"nonce": self.params.nonce,
"request": self.request
}
# Include at_hash when access_token is being returned.
if 'access_token' in query_fragment:
kwargs['at_hash'] = token.at_hash
id_token_dic = create_id_token(**kwargs)
query_fragment['id_token'] = encode_id_token(id_token_dic, self.client)
token.id_token = id_token_dic
else:
id_token_dic = {}
# Store the token.
token.id_token = id_token_dic
token.save()
query_fragment['token_type'] = 'bearer'
# TODO: Create setting 'OIDC_TOKEN_EXPIRE'.
query_fragment['expires_in'] = 60 * 10
query_fragment['state'] = self.params.state if self.params.state else ''
except Exception as error:

View file

@ -131,21 +131,22 @@ class TokenEndpoint(object):
return self.create_refresh_response_dic()
def create_code_response_dic(self):
token = create_token(
user=self.code.user,
client=self.code.client,
scope=self.code.scope)
if self.code.is_authentication:
id_token_dic = create_id_token(
user=self.code.user,
aud=self.client.client_id,
nonce=self.code.nonce,
at_hash=token.at_hash,
request=self.request,
)
else:
id_token_dic = {}
token = create_token(
user=self.code.user,
client=self.code.client,
id_token_dic=id_token_dic,
scope=self.code.scope)
token.id_token = id_token_dic
# Store the token.
token.save()
@ -164,22 +165,23 @@ class TokenEndpoint(object):
return dic
def create_refresh_response_dic(self):
token = create_token(
user=self.token.user,
client=self.token.client,
scope=self.token.scope)
# If the Token has an id_token it's an Authentication request.
if self.token.id_token:
id_token_dic = create_id_token(
user=self.token.user,
aud=self.client.client_id,
nonce=None,
at_hash=token.at_hash,
request=self.request,
)
else:
id_token_dic = {}
token = create_token(
user=self.token.user,
client=self.token.client,
id_token_dic=id_token_dic,
scope=self.token.scope)
token.id_token = id_token_dic
# Store the token.
token.save()

View file

@ -13,7 +13,7 @@ from oidc_provider.models import *
from oidc_provider import settings
def create_id_token(user, aud, nonce, request=None):
def create_id_token(user, aud, nonce, at_hash=None, request=None):
"""
Receives a user object and aud (audience).
Then creates the id_token dictionary.
@ -44,6 +44,9 @@ def create_id_token(user, aud, nonce, request=None):
if nonce:
dic['nonce'] = str(nonce)
if at_hash:
dic['at_hash'] = at_hash
processing_hook = settings.get('OIDC_IDTOKEN_PROCESSING_HOOK')
if isinstance(processing_hook, (list, tuple)):
@ -73,13 +76,13 @@ def encode_id_token(payload, client):
keys = [SYMKey(key=client.client_secret, alg=alg)]
else:
raise Exception('Unsupported key algorithm.')
_jws = JWS(payload, alg=alg)
return _jws.sign_compact(keys)
def create_token(user, client, id_token_dic, scope):
def create_token(user, client, scope, id_token_dic=None):
"""
Create and populate a Token object.
@ -90,7 +93,8 @@ def create_token(user, client, id_token_dic, scope):
token.client = client
token.access_token = uuid.uuid4().hex
token.id_token = id_token_dic
if id_token_dic is not None:
token.id_token = id_token_dic
token.refresh_token = uuid.uuid4().hex
token.expires_at = timezone.now() + timedelta(
@ -112,7 +116,7 @@ def create_code(user, client, scope, nonce, is_authentication,
code.client = client
code.code = uuid.uuid4().hex
if code_challenge and code_challenge_method:
code.code_challenge = code_challenge
code.code_challenge_method = code_challenge_method

View file

@ -1,5 +1,7 @@
# -*- coding: utf-8 -*-
from hashlib import md5
import base64
import binascii
from hashlib import md5, sha256
import json
from django.db import models
@ -24,6 +26,7 @@ JWT_ALGS = [
('RS256', 'RS256'),
]
class Client(models.Model):
name = models.CharField(max_length=100, default='', verbose_name=_(u'Name'))
@ -49,8 +52,10 @@ class Client(models.Model):
def redirect_uris():
def fget(self):
return self._redirect_uris.splitlines()
def fset(self, value):
self._redirect_uris = '\n'.join(value)
return locals()
redirect_uris = property(**redirect_uris())
@ -69,8 +74,10 @@ class BaseCodeTokenModel(models.Model):
def scope():
def fget(self):
return self._scope.split()
def fset(self, value):
self._scope = ' '.join(value)
return locals()
scope = property(**scope())
@ -105,11 +112,15 @@ class Token(BaseCodeTokenModel):
access_token = models.CharField(max_length=255, unique=True, verbose_name=_(u'Access Token'))
refresh_token = models.CharField(max_length=255, unique=True, null=True, verbose_name=_(u'Refresh Token'))
_id_token = models.TextField(verbose_name=_(u'ID Token'))
def id_token():
def fget(self):
return json.loads(self._id_token)
def fset(self, value):
self._id_token = json.dumps(value)
return locals()
id_token = property(**id_token())
@ -117,6 +128,18 @@ class Token(BaseCodeTokenModel):
verbose_name = _(u'Token')
verbose_name_plural = _(u'Tokens')
@property
def at_hash(self):
# @@@ d-o-p only supports 256 bits (change this if that changes)
hashed_access_token = sha256(
self.access_token.encode('ascii')
).hexdigest().encode('ascii')
return base64.urlsafe_b64encode(
binascii.unhexlify(
hashed_access_token[:len(hashed_access_token) // 2]
)
).rstrip(b'=').decode('ascii')
class UserConsent(BaseCodeTokenModel):
@ -142,4 +165,4 @@ class RSAKey(models.Model):
@property
def kid(self):
return u'{0}'.format(md5(self.key.encode('utf-8')).hexdigest() if self.key else '')
return u'{0}'.format(md5(self.key.encode('utf-8')).hexdigest() if self.key else '')

View file

View file

@ -1,15 +1,19 @@
try:
from urllib.parse import unquote, urlencode
from urllib.parse import urlencode
except ImportError:
from urllib import unquote, urlencode
from urllib import urlencode
try:
from urllib.parse import parse_qs, urlsplit
except ImportError:
from urlparse import parse_qs, urlsplit
import uuid
from django.contrib.auth import REDIRECT_FIELD_NAME
from django.contrib.auth.models import AnonymousUser
from django.core.management import call_command
from django.core.urlresolvers import reverse
from django.test import RequestFactory
from django.test import TestCase
from jwkest.jwt import JWT
from oidc_provider import settings
from oidc_provider.models import *
@ -19,7 +23,7 @@ from oidc_provider.views import *
class AuthorizationCodeFlowTestCase(TestCase):
"""
Test cases for Authorize Endpoint using Authorization Code Flow.
Test cases for Authorize Endpoint using Code Flow.
"""
def setUp(self):
@ -28,7 +32,6 @@ class AuthorizationCodeFlowTestCase(TestCase):
self.user = create_fake_user()
self.client = create_fake_client(response_type='code')
self.client_public = create_fake_client(response_type='code', is_public=True)
self.client_implicit = create_fake_client(response_type='id_token token')
self.state = uuid.uuid4().hex
self.nonce = uuid.uuid4().hex
@ -52,7 +55,6 @@ class AuthorizationCodeFlowTestCase(TestCase):
return response
def test_missing_parameters(self):
"""
If the request fails due to a missing, invalid, or mismatching
@ -148,7 +150,7 @@ class AuthorizationCodeFlowTestCase(TestCase):
for key, value in iter(to_check.items()):
is_input_ok = input_html.format(key, value) in response.content.decode('utf-8')
self.assertEqual(is_input_ok, True,
msg='Hidden input for "'+key+'" fails.')
msg='Hidden input for "' + key + '" fails.')
def test_user_consent_response(self):
"""
@ -183,7 +185,7 @@ class AuthorizationCodeFlowTestCase(TestCase):
msg='"access_denied" code is missing in query.')
# Simulate user authorization.
data['allow'] = 'Accept' # Will be the value of the button.
data['allow'] = 'Accept' # Will be the value of the button.
response = self._auth_request('post', data, is_user_authenticated=True)
@ -270,43 +272,6 @@ class AuthorizationCodeFlowTestCase(TestCase):
self.assertEqual('Request for Permission' in response.content.decode('utf-8'), True)
def test_implicit_missing_nonce(self):
"""
The `nonce` parameter is REQUIRED if you use the Implicit Flow.
"""
data = {
'client_id': self.client_implicit.client_id,
'response_type': self.client_implicit.response_type,
'redirect_uri': self.client_implicit.default_redirect_uri,
'scope': 'openid email',
'state': self.state,
}
response = self._auth_request('get', data, is_user_authenticated=True)
self.assertEqual('#error=invalid_request' in response['Location'], True)
def test_implicit_access_token_response(self):
"""
Unlike the Authorization Code flow, in which the client makes
separate requests for authorization and for an access token, the client
receives the access token as the result of the authorization request.
"""
data = {
'client_id': self.client_implicit.client_id,
'redirect_uri': self.client_implicit.default_redirect_uri,
'response_type': self.client_implicit.response_type,
'scope': 'openid email',
'state': self.state,
'nonce': self.nonce,
'allow': 'Accept',
}
response = self._auth_request('post', data, is_user_authenticated=True)
self.assertEqual('access_token' in response['Location'], True)
def test_prompt_parameter(self):
"""
Specifies whether the Authorization Server prompts the End-User for reauthentication and consent.
@ -331,3 +296,168 @@ class AuthorizationCodeFlowTestCase(TestCase):
# An error is returned if the Client does not have pre-configured consent for the requested Claims.
self.assertEqual('interaction_required' in response['Location'], True)
class AuthorizationImplicitFlowTestCase(TestCase):
"""
Test cases for Authorization Endpoint using Implicit Flow.
"""
def setUp(self):
call_command('creatersakey')
self.factory = RequestFactory()
self.user = create_fake_user()
self.client = create_fake_client(response_type='id_token token')
self.client_public = create_fake_client(response_type='id_token token', is_public=True)
self.client_no_access = create_fake_client(response_type='id_token')
self.client_public_no_access = create_fake_client(response_type='id_token', is_public=True)
self.state = uuid.uuid4().hex
self.nonce = uuid.uuid4().hex
def _auth_request(self, method, data={}, is_user_authenticated=False):
url = reverse('oidc_provider:authorize')
if method.lower() == 'get':
query_str = urlencode(data).replace('+', '%20')
if query_str:
url += '?' + query_str
request = self.factory.get(url)
elif method.lower() == 'post':
request = self.factory.post(url, data=data)
else:
raise Exception('Method unsupported for an Authorization Request.')
# Simulate that the user is logged.
request.user = self.user if is_user_authenticated else AnonymousUser()
response = AuthorizeView.as_view()(request)
return response
def test_missing_nonce(self):
"""
The `nonce` parameter is REQUIRED if you use the Implicit Flow.
"""
data = {
'client_id': self.client.client_id,
'response_type': self.client.response_type,
'redirect_uri': self.client.default_redirect_uri,
'scope': 'openid email',
'state': self.state,
}
response = self._auth_request('get', data, is_user_authenticated=True)
self.assertEqual('#error=invalid_request' in response['Location'], True)
def test_id_token_token_response(self):
"""
Implicit client requesting `id_token token` receives both id token
and access token as the result of the authorization request.
"""
data = {
'client_id': self.client.client_id,
'redirect_uri': self.client.default_redirect_uri,
'response_type': self.client.response_type,
'scope': 'openid email',
'state': self.state,
'nonce': self.nonce,
'allow': 'Accept',
}
response = self._auth_request('post', data, is_user_authenticated=True)
self.assertIn('access_token', response['Location'])
self.assertIn('id_token', response['Location'])
# same for public client
data['client_id'] = self.client_public.client_id,
data['redirect_uri'] = self.client_public.default_redirect_uri,
data['response_type'] = self.client_public.response_type,
response = self._auth_request('post', data, is_user_authenticated=True)
self.assertIn('access_token', response['Location'])
self.assertIn('id_token', response['Location'])
def test_id_token_response(self):
"""
Implicit client requesting `id_token` receives
only an id token as the result of the authorization request.
"""
data = {
'client_id': self.client_no_access.client_id,
'redirect_uri': self.client_no_access.default_redirect_uri,
'response_type': self.client_no_access.response_type,
'scope': 'openid email',
'state': self.state,
'nonce': self.nonce,
'allow': 'Accept',
}
response = self._auth_request('post', data, is_user_authenticated=True)
self.assertNotIn('access_token', response['Location'])
self.assertIn('id_token', response['Location'])
# same for public client
data['client_id'] = self.client_public_no_access.client_id,
data['redirect_uri'] = self.client_public_no_access.default_redirect_uri,
data['response_type'] = self.client_public_no_access.response_type,
response = self._auth_request('post', data, is_user_authenticated=True)
self.assertNotIn('access_token', response['Location'])
self.assertIn('id_token', response['Location'])
def test_id_token_token_at_hash(self):
"""
Implicit client requesting `id_token token` receives
`at_hash` in `id_token`.
"""
data = {
'client_id': self.client.client_id,
'redirect_uri': self.client.default_redirect_uri,
'response_type': self.client.response_type,
'scope': 'openid email',
'state': self.state,
'nonce': self.nonce,
'allow': 'Accept',
}
response = self._auth_request('post', data, is_user_authenticated=True)
self.assertIn('id_token', response['Location'])
# obtain `id_token` portion of Location
components = urlsplit(response['Location'])
fragment = parse_qs(components[4])
id_token = JWT().unpack(fragment["id_token"][0].encode('utf-8')).payload()
self.assertIn('at_hash', id_token)
def test_id_token_at_hash(self):
"""
Implicit client requesting `id_token` should not receive
`at_hash` in `id_token`.
"""
data = {
'client_id': self.client_no_access.client_id,
'redirect_uri': self.client_no_access.default_redirect_uri,
'response_type': self.client_no_access.response_type,
'scope': 'openid email',
'state': self.state,
'nonce': self.nonce,
'allow': 'Accept',
}
response = self._auth_request('post', data, is_user_authenticated=True)
self.assertIn('id_token', response['Location'])
# obtain `id_token` portion of Location
components = urlsplit(response['Location'])
fragment = parse_qs(components[4])
id_token = JWT().unpack(fragment["id_token"][0].encode('utf-8')).payload()
self.assertNotIn('at_hash', id_token)

View file

@ -304,6 +304,21 @@ class TokenTestCase(TestCase):
self.assertEqual(id_token.get('nonce'), None)
def test_id_token_contains_at_hash(self):
"""
If access_token is included, the id_token SHOULD contain an at_hash.
"""
code = self._create_code()
post_data = self._auth_code_post_data(code=code.code)
response = self._post_request(post_data)
response_dic = json.loads(response.content.decode('utf-8'))
id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()
self.assertTrue(id_token.get('at_hash'))
def test_idtoken_sign_validation(self):
"""
We MUST validate the signature of the ID Token according to JWS

View file

@ -193,8 +193,8 @@ class ProviderInfoView(View):
# See: http://openid.net/specs/openid-connect-core-1_0.html#SubjectIDTypes
dic['subject_types_supported'] = ['public']
dic['token_endpoint_auth_methods_supported'] = [ 'client_secret_post',
'client_secret_basic' ]
dic['token_endpoint_auth_methods_supported'] = ['client_secret_post',
'client_secret_basic']
return JsonResponse(dic)
@ -205,7 +205,7 @@ class JwksView(View):
dic = dict(keys=[])
for rsakey in RSAKey.objects.all():
public_key = RSA.importKey(rsakey.key).publickey()
public_key = RSA.importKey(rsakey.key).publickey()
dic['keys'].append({
'kty': 'RSA',
'alg': 'RS256',

112
runtests.py Normal file
View file

@ -0,0 +1,112 @@
#!/usr/bin/env python
import os
import sys
import django
from django.conf import settings
DEFAULT_SETTINGS = dict(
DEBUG = False,
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': ':memory:',
}
},
SITE_ID = 1,
MIDDLEWARE_CLASSES = [
'django.middleware.common.CommonMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
],
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
],
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'console': {
'class': 'logging.StreamHandler',
},
},
'loggers': {
'oidc_provider': {
'handlers': ['console'],
'level': os.getenv('DJANGO_LOG_LEVEL', 'DEBUG'),
},
},
},
INSTALLED_APPS = [
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.sites',
'django.contrib.messages',
'django.contrib.admin',
'oidc_provider',
],
SECRET_KEY = 'this-should-be-top-secret',
ROOT_URLCONF = 'oidc_provider.tests.app.urls',
TEMPLATE_DIRS = [
'oidc_provider/tests/templates',
],
USE_TZ = True,
# OIDC Provider settings.
SITE_URL = 'http://localhost:8000',
OIDC_USERINFO = 'oidc_provider.tests.app.utils.userinfo',
)
def runtests(*test_args):
if not settings.configured:
settings.configure(**DEFAULT_SETTINGS)
django.setup()
parent = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, parent)
try:
from django.test.runner import DiscoverRunner
runner_class = DiscoverRunner
test_args = ["oidc_provider.tests"]
except ImportError:
from django.test.simple import DjangoTestSuiteRunner
runner_class = DjangoTestSuiteRunner
test_args = ["tests"]
failures = runner_class(verbosity=1, interactive=True, failfast=False).run_tests(test_args)
sys.exit(failures)
if __name__ == "__main__":
runtests(*sys.argv[1:])

View file

@ -34,6 +34,7 @@ setup(
'Topic :: Internet :: WWW/HTTP',
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
],
test_suite="runtests.runtests",
tests_require=[
'pyjwkest==1.1.0',
'mock==2.0.0',