Add tests for at_hash
Ensure at_hash is present in id_token when warranted.
This commit is contained in:
parent
86fbfdba60
commit
ffddb69f80
8 changed files with 357 additions and 46 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -10,3 +10,5 @@ src/
|
||||||
.venv
|
.venv
|
||||||
.idea
|
.idea
|
||||||
docs/_build/
|
docs/_build/
|
||||||
|
.eggs/
|
||||||
|
.python-version
|
||||||
|
|
|
@ -146,9 +146,9 @@ class TokenEndpoint(object):
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
id_token_dic = {}
|
id_token_dic = {}
|
||||||
|
token.id_token = id_token_dic
|
||||||
|
|
||||||
# Store the token.
|
# Store the token.
|
||||||
token.id_token = id_token_dic
|
|
||||||
token.save()
|
token.save()
|
||||||
|
|
||||||
# We don't need to store the code anymore.
|
# We don't need to store the code anymore.
|
||||||
|
@ -181,9 +181,9 @@ class TokenEndpoint(object):
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
id_token_dic = {}
|
id_token_dic = {}
|
||||||
|
token.id_token = id_token_dic
|
||||||
|
|
||||||
# Store the token.
|
# Store the token.
|
||||||
token.id_token = id_token_dic
|
|
||||||
token.save()
|
token.save()
|
||||||
|
|
||||||
# Forget the old token.
|
# Forget the old token.
|
||||||
|
|
0
oidc_provider/runtests.py
Normal file
0
oidc_provider/runtests.py
Normal file
|
@ -1,10 +1,9 @@
|
||||||
try:
|
try:
|
||||||
from urllib.parse import unquote, urlencode
|
from urllib.parse import urlencode
|
||||||
except ImportError:
|
except ImportError:
|
||||||
from urllib import unquote, urlencode
|
from urllib import urlencode
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from django.contrib.auth import REDIRECT_FIELD_NAME
|
|
||||||
from django.contrib.auth.models import AnonymousUser
|
from django.contrib.auth.models import AnonymousUser
|
||||||
from django.core.management import call_command
|
from django.core.management import call_command
|
||||||
from django.core.urlresolvers import reverse
|
from django.core.urlresolvers import reverse
|
||||||
|
@ -28,7 +27,6 @@ class AuthorizationCodeFlowTestCase(TestCase):
|
||||||
self.user = create_fake_user()
|
self.user = create_fake_user()
|
||||||
self.client = create_fake_client(response_type='code')
|
self.client = create_fake_client(response_type='code')
|
||||||
self.client_public = create_fake_client(response_type='code', is_public=True)
|
self.client_public = create_fake_client(response_type='code', is_public=True)
|
||||||
self.client_implicit = create_fake_client(response_type='id_token token')
|
|
||||||
self.state = uuid.uuid4().hex
|
self.state = uuid.uuid4().hex
|
||||||
self.nonce = uuid.uuid4().hex
|
self.nonce = uuid.uuid4().hex
|
||||||
|
|
||||||
|
@ -52,7 +50,6 @@ class AuthorizationCodeFlowTestCase(TestCase):
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
def test_missing_parameters(self):
|
def test_missing_parameters(self):
|
||||||
"""
|
"""
|
||||||
If the request fails due to a missing, invalid, or mismatching
|
If the request fails due to a missing, invalid, or mismatching
|
||||||
|
@ -148,7 +145,7 @@ class AuthorizationCodeFlowTestCase(TestCase):
|
||||||
for key, value in iter(to_check.items()):
|
for key, value in iter(to_check.items()):
|
||||||
is_input_ok = input_html.format(key, value) in response.content.decode('utf-8')
|
is_input_ok = input_html.format(key, value) in response.content.decode('utf-8')
|
||||||
self.assertEqual(is_input_ok, True,
|
self.assertEqual(is_input_ok, True,
|
||||||
msg='Hidden input for "'+key+'" fails.')
|
msg='Hidden input for "' + key + '" fails.')
|
||||||
|
|
||||||
def test_user_consent_response(self):
|
def test_user_consent_response(self):
|
||||||
"""
|
"""
|
||||||
|
@ -270,43 +267,6 @@ class AuthorizationCodeFlowTestCase(TestCase):
|
||||||
|
|
||||||
self.assertEqual('Request for Permission' in response.content.decode('utf-8'), True)
|
self.assertEqual('Request for Permission' in response.content.decode('utf-8'), True)
|
||||||
|
|
||||||
def test_implicit_missing_nonce(self):
|
|
||||||
"""
|
|
||||||
The `nonce` parameter is REQUIRED if you use the Implicit Flow.
|
|
||||||
"""
|
|
||||||
data = {
|
|
||||||
'client_id': self.client_implicit.client_id,
|
|
||||||
'response_type': self.client_implicit.response_type,
|
|
||||||
'redirect_uri': self.client_implicit.default_redirect_uri,
|
|
||||||
'scope': 'openid email',
|
|
||||||
'state': self.state,
|
|
||||||
}
|
|
||||||
|
|
||||||
response = self._auth_request('get', data, is_user_authenticated=True)
|
|
||||||
|
|
||||||
self.assertEqual('#error=invalid_request' in response['Location'], True)
|
|
||||||
|
|
||||||
def test_implicit_access_token_response(self):
|
|
||||||
"""
|
|
||||||
Unlike the Authorization Code flow, in which the client makes
|
|
||||||
separate requests for authorization and for an access token, the client
|
|
||||||
receives the access token as the result of the authorization request.
|
|
||||||
"""
|
|
||||||
data = {
|
|
||||||
'client_id': self.client_implicit.client_id,
|
|
||||||
'redirect_uri': self.client_implicit.default_redirect_uri,
|
|
||||||
'response_type': self.client_implicit.response_type,
|
|
||||||
'scope': 'openid email',
|
|
||||||
'state': self.state,
|
|
||||||
'nonce': self.nonce,
|
|
||||||
'allow': 'Accept',
|
|
||||||
}
|
|
||||||
|
|
||||||
response = self._auth_request('post', data, is_user_authenticated=True)
|
|
||||||
|
|
||||||
self.assertEqual('access_token' in response['Location'], True)
|
|
||||||
|
|
||||||
|
|
||||||
def test_prompt_parameter(self):
|
def test_prompt_parameter(self):
|
||||||
"""
|
"""
|
||||||
Specifies whether the Authorization Server prompts the End-User for reauthentication and consent.
|
Specifies whether the Authorization Server prompts the End-User for reauthentication and consent.
|
||||||
|
|
51
oidc_provider/tests/test_code_flow.py
Normal file
51
oidc_provider/tests/test_code_flow.py
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
try:
|
||||||
|
from urllib.parse import unquote, urlencode
|
||||||
|
except ImportError:
|
||||||
|
from urllib import unquote, urlencode
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from django.contrib.auth import REDIRECT_FIELD_NAME
|
||||||
|
from django.contrib.auth.models import AnonymousUser
|
||||||
|
from django.core.management import call_command
|
||||||
|
from django.core.urlresolvers import reverse
|
||||||
|
from django.test import RequestFactory
|
||||||
|
from django.test import TestCase
|
||||||
|
|
||||||
|
from oidc_provider.models import *
|
||||||
|
from oidc_provider.tests.app.utils import *
|
||||||
|
from oidc_provider.views import *
|
||||||
|
|
||||||
|
|
||||||
|
class CodeFlowTestCase(TestCase):
|
||||||
|
"""
|
||||||
|
Test cases for Authorization Code Flow.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
call_command('creatersakey')
|
||||||
|
self.factory = RequestFactory()
|
||||||
|
self.user = create_fake_user()
|
||||||
|
self.client = create_fake_client(response_type='code')
|
||||||
|
self.client_public = create_fake_client(response_type='code', is_public=True)
|
||||||
|
self.state = uuid.uuid4().hex
|
||||||
|
self.nonce = uuid.uuid4().hex
|
||||||
|
|
||||||
|
def _auth_request(self, method, data={}, is_user_authenticated=False):
|
||||||
|
url = reverse('oidc_provider:authorize')
|
||||||
|
|
||||||
|
if method.lower() == 'get':
|
||||||
|
query_str = urlencode(data).replace('+', '%20')
|
||||||
|
if query_str:
|
||||||
|
url += '?' + query_str
|
||||||
|
request = self.factory.get(url)
|
||||||
|
elif method.lower() == 'post':
|
||||||
|
request = self.factory.post(url, data=data)
|
||||||
|
else:
|
||||||
|
raise Exception('Method unsupported for an Authorization Request.')
|
||||||
|
|
||||||
|
# Simulate that the user is logged.
|
||||||
|
request.user = self.user if is_user_authenticated else AnonymousUser()
|
||||||
|
|
||||||
|
response = AuthorizeView.as_view()(request)
|
||||||
|
|
||||||
|
return response
|
185
oidc_provider/tests/test_implicit_flow.py
Normal file
185
oidc_provider/tests/test_implicit_flow.py
Normal file
|
@ -0,0 +1,185 @@
|
||||||
|
try:
|
||||||
|
from urllib.parse import urlencode
|
||||||
|
except ImportError:
|
||||||
|
from urllib import urlencode
|
||||||
|
try:
|
||||||
|
from urllib.parse import urlparse, parse_qs
|
||||||
|
except ImportError:
|
||||||
|
from urlparse import urlparse, parse_qs
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from django.contrib.auth.models import AnonymousUser
|
||||||
|
from django.core.management import call_command
|
||||||
|
from django.core.urlresolvers import reverse
|
||||||
|
from django.test import RequestFactory
|
||||||
|
from django.test import TestCase
|
||||||
|
from jwkest.jwt import JWT
|
||||||
|
|
||||||
|
from oidc_provider.models import *
|
||||||
|
from oidc_provider.tests.app.utils import *
|
||||||
|
from oidc_provider.views import *
|
||||||
|
|
||||||
|
|
||||||
|
class ImplicitFlowTestCase(TestCase):
|
||||||
|
"""
|
||||||
|
Test cases for Authorization Implicit Flow.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
call_command('creatersakey')
|
||||||
|
self.factory = RequestFactory()
|
||||||
|
self.user = create_fake_user()
|
||||||
|
self.client = create_fake_client(response_type='id_token token')
|
||||||
|
self.client_public = create_fake_client(response_type='id_token token', is_public=True)
|
||||||
|
self.client_no_access = create_fake_client(response_type='id_token')
|
||||||
|
self.client_public_no_access = create_fake_client(response_type='id_token', is_public=True)
|
||||||
|
self.state = uuid.uuid4().hex
|
||||||
|
self.nonce = uuid.uuid4().hex
|
||||||
|
|
||||||
|
def _auth_request(self, method, data={}, is_user_authenticated=False):
|
||||||
|
url = reverse('oidc_provider:authorize')
|
||||||
|
|
||||||
|
if method.lower() == 'get':
|
||||||
|
query_str = urlencode(data).replace('+', '%20')
|
||||||
|
if query_str:
|
||||||
|
url += '?' + query_str
|
||||||
|
request = self.factory.get(url)
|
||||||
|
elif method.lower() == 'post':
|
||||||
|
request = self.factory.post(url, data=data)
|
||||||
|
else:
|
||||||
|
raise Exception('Method unsupported for an Authorization Request.')
|
||||||
|
|
||||||
|
# Simulate that the user is logged.
|
||||||
|
request.user = self.user if is_user_authenticated else AnonymousUser()
|
||||||
|
|
||||||
|
response = AuthorizeView.as_view()(request)
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
def test_missing_nonce(self):
|
||||||
|
"""
|
||||||
|
The `nonce` parameter is REQUIRED if you use the Implicit Flow.
|
||||||
|
"""
|
||||||
|
data = {
|
||||||
|
'client_id': self.client.client_id,
|
||||||
|
'response_type': self.client.response_type,
|
||||||
|
'redirect_uri': self.client.default_redirect_uri,
|
||||||
|
'scope': 'openid email',
|
||||||
|
'state': self.state,
|
||||||
|
}
|
||||||
|
|
||||||
|
response = self._auth_request('get', data, is_user_authenticated=True)
|
||||||
|
|
||||||
|
self.assertEqual('#error=invalid_request' in response['Location'], True)
|
||||||
|
|
||||||
|
def test_id_token_token_response(self):
|
||||||
|
"""
|
||||||
|
Implicit client requesting `id_token token` receives both id token
|
||||||
|
and access token as the result of the authorization request.
|
||||||
|
"""
|
||||||
|
data = {
|
||||||
|
'client_id': self.client.client_id,
|
||||||
|
'redirect_uri': self.client.default_redirect_uri,
|
||||||
|
'response_type': self.client.response_type,
|
||||||
|
'scope': 'openid email',
|
||||||
|
'state': self.state,
|
||||||
|
'nonce': self.nonce,
|
||||||
|
'allow': 'Accept',
|
||||||
|
}
|
||||||
|
|
||||||
|
response = self._auth_request('post', data, is_user_authenticated=True)
|
||||||
|
|
||||||
|
self.assertIn('access_token', response['Location'])
|
||||||
|
self.assertIn('id_token', response['Location'])
|
||||||
|
|
||||||
|
# same for public client
|
||||||
|
data['client_id'] = self.client_public.client_id,
|
||||||
|
data['redirect_uri'] = self.client_public.default_redirect_uri,
|
||||||
|
data['response_type'] = self.client_public.response_type,
|
||||||
|
|
||||||
|
response = self._auth_request('post', data, is_user_authenticated=True)
|
||||||
|
|
||||||
|
self.assertIn('access_token', response['Location'])
|
||||||
|
self.assertIn('id_token', response['Location'])
|
||||||
|
|
||||||
|
def test_id_token_response(self):
|
||||||
|
"""
|
||||||
|
Implicit client requesting `id_token` receives
|
||||||
|
only an id token as the result of the authorization request.
|
||||||
|
"""
|
||||||
|
data = {
|
||||||
|
'client_id': self.client_no_access.client_id,
|
||||||
|
'redirect_uri': self.client_no_access.default_redirect_uri,
|
||||||
|
'response_type': self.client_no_access.response_type,
|
||||||
|
'scope': 'openid email',
|
||||||
|
'state': self.state,
|
||||||
|
'nonce': self.nonce,
|
||||||
|
'allow': 'Accept',
|
||||||
|
}
|
||||||
|
|
||||||
|
response = self._auth_request('post', data, is_user_authenticated=True)
|
||||||
|
|
||||||
|
self.assertNotIn('access_token', response['Location'])
|
||||||
|
self.assertIn('id_token', response['Location'])
|
||||||
|
|
||||||
|
# same for public client
|
||||||
|
data['client_id'] = self.client_public_no_access.client_id,
|
||||||
|
data['redirect_uri'] = self.client_public_no_access.default_redirect_uri,
|
||||||
|
data['response_type'] = self.client_public_no_access.response_type,
|
||||||
|
|
||||||
|
response = self._auth_request('post', data, is_user_authenticated=True)
|
||||||
|
|
||||||
|
self.assertNotIn('access_token', response['Location'])
|
||||||
|
self.assertIn('id_token', response['Location'])
|
||||||
|
|
||||||
|
def test_id_token_token_at_hash(self):
|
||||||
|
"""
|
||||||
|
Implicit client requesting `id_token token` receives
|
||||||
|
`at_hash` in `id_token`.
|
||||||
|
"""
|
||||||
|
data = {
|
||||||
|
'client_id': self.client.client_id,
|
||||||
|
'redirect_uri': self.client.default_redirect_uri,
|
||||||
|
'response_type': self.client.response_type,
|
||||||
|
'scope': 'openid email',
|
||||||
|
'state': self.state,
|
||||||
|
'nonce': self.nonce,
|
||||||
|
'allow': 'Accept',
|
||||||
|
}
|
||||||
|
|
||||||
|
response = self._auth_request('post', data, is_user_authenticated=True)
|
||||||
|
|
||||||
|
self.assertIn('id_token', response['Location'])
|
||||||
|
|
||||||
|
# obtain `id_token` portion of Location
|
||||||
|
components = urlsplit(response['Location'])
|
||||||
|
fragment = parse_qs(components[4])
|
||||||
|
id_token = JWT().unpack(fragment["id_token"][0].encode('utf-8')).payload()
|
||||||
|
|
||||||
|
self.assertIn('at_hash', id_token)
|
||||||
|
|
||||||
|
def test_id_token_at_hash(self):
|
||||||
|
"""
|
||||||
|
Implicit client requesting `id_token` should not receive
|
||||||
|
`at_hash` in `id_token`.
|
||||||
|
"""
|
||||||
|
data = {
|
||||||
|
'client_id': self.client_no_access.client_id,
|
||||||
|
'redirect_uri': self.client_no_access.default_redirect_uri,
|
||||||
|
'response_type': self.client_no_access.response_type,
|
||||||
|
'scope': 'openid email',
|
||||||
|
'state': self.state,
|
||||||
|
'nonce': self.nonce,
|
||||||
|
'allow': 'Accept',
|
||||||
|
}
|
||||||
|
|
||||||
|
response = self._auth_request('post', data, is_user_authenticated=True)
|
||||||
|
|
||||||
|
self.assertIn('id_token', response['Location'])
|
||||||
|
|
||||||
|
# obtain `id_token` portion of Location
|
||||||
|
components = urlsplit(response['Location'])
|
||||||
|
fragment = parse_qs(components[4])
|
||||||
|
id_token = JWT().unpack(fragment["id_token"][0].encode('utf-8')).payload()
|
||||||
|
|
||||||
|
self.assertNotIn('at_hash', id_token)
|
112
runtests.py
Normal file
112
runtests.py
Normal file
|
@ -0,0 +1,112 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import django
|
||||||
|
|
||||||
|
from django.conf import settings
|
||||||
|
|
||||||
|
|
||||||
|
DEFAULT_SETTINGS = dict(
|
||||||
|
|
||||||
|
DEBUG = False,
|
||||||
|
|
||||||
|
DATABASES = {
|
||||||
|
'default': {
|
||||||
|
'ENGINE': 'django.db.backends.sqlite3',
|
||||||
|
'NAME': ':memory:',
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
SITE_ID = 1,
|
||||||
|
|
||||||
|
MIDDLEWARE_CLASSES = [
|
||||||
|
'django.middleware.common.CommonMiddleware',
|
||||||
|
'django.contrib.sessions.middleware.SessionMiddleware',
|
||||||
|
'django.contrib.auth.middleware.AuthenticationMiddleware',
|
||||||
|
],
|
||||||
|
|
||||||
|
TEMPLATES = [
|
||||||
|
{
|
||||||
|
'BACKEND': 'django.template.backends.django.DjangoTemplates',
|
||||||
|
'DIRS': [],
|
||||||
|
'APP_DIRS': True,
|
||||||
|
'OPTIONS': {
|
||||||
|
'context_processors': [
|
||||||
|
'django.template.context_processors.debug',
|
||||||
|
'django.template.context_processors.request',
|
||||||
|
'django.contrib.auth.context_processors.auth',
|
||||||
|
'django.contrib.messages.context_processors.messages',
|
||||||
|
],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
|
||||||
|
LOGGING = {
|
||||||
|
'version': 1,
|
||||||
|
'disable_existing_loggers': False,
|
||||||
|
'handlers': {
|
||||||
|
'console': {
|
||||||
|
'class': 'logging.StreamHandler',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
'loggers': {
|
||||||
|
'oidc_provider': {
|
||||||
|
'handlers': ['console'],
|
||||||
|
'level': os.getenv('DJANGO_LOG_LEVEL', 'DEBUG'),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
|
INSTALLED_APPS = [
|
||||||
|
'django.contrib.auth',
|
||||||
|
'django.contrib.contenttypes',
|
||||||
|
'django.contrib.sessions',
|
||||||
|
'django.contrib.sites',
|
||||||
|
'django.contrib.messages',
|
||||||
|
'django.contrib.admin',
|
||||||
|
'oidc_provider',
|
||||||
|
],
|
||||||
|
|
||||||
|
SECRET_KEY = 'this-should-be-top-secret',
|
||||||
|
|
||||||
|
ROOT_URLCONF = 'oidc_provider.tests.app.urls',
|
||||||
|
|
||||||
|
TEMPLATE_DIRS = [
|
||||||
|
'oidc_provider/tests/templates',
|
||||||
|
],
|
||||||
|
|
||||||
|
USE_TZ = True,
|
||||||
|
|
||||||
|
# OIDC Provider settings.
|
||||||
|
|
||||||
|
SITE_URL = 'http://localhost:8000',
|
||||||
|
OIDC_USERINFO = 'oidc_provider.tests.app.utils.userinfo',
|
||||||
|
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def runtests(*test_args):
|
||||||
|
if not settings.configured:
|
||||||
|
settings.configure(**DEFAULT_SETTINGS)
|
||||||
|
|
||||||
|
django.setup()
|
||||||
|
|
||||||
|
parent = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
sys.path.insert(0, parent)
|
||||||
|
|
||||||
|
try:
|
||||||
|
from django.test.runner import DiscoverRunner
|
||||||
|
runner_class = DiscoverRunner
|
||||||
|
test_args = ["oidc_provider.tests"]
|
||||||
|
except ImportError:
|
||||||
|
from django.test.simple import DjangoTestSuiteRunner
|
||||||
|
runner_class = DjangoTestSuiteRunner
|
||||||
|
test_args = ["tests"]
|
||||||
|
|
||||||
|
failures = runner_class(verbosity=1, interactive=True, failfast=False).run_tests(test_args)
|
||||||
|
sys.exit(failures)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
runtests(*sys.argv[1:])
|
1
setup.py
1
setup.py
|
@ -34,6 +34,7 @@ setup(
|
||||||
'Topic :: Internet :: WWW/HTTP',
|
'Topic :: Internet :: WWW/HTTP',
|
||||||
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
|
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
|
||||||
],
|
],
|
||||||
|
test_suite="runtests.runtests",
|
||||||
tests_require=[
|
tests_require=[
|
||||||
'pyjwkest==1.1.0',
|
'pyjwkest==1.1.0',
|
||||||
'mock==2.0.0',
|
'mock==2.0.0',
|
||||||
|
|
Loading…
Reference in a new issue