From afc3a60ee7fed1988978e42c528d990b0a77ff68 Mon Sep 17 00:00:00 2001 From: Brian Rosner Date: Fri, 5 Aug 2016 13:11:01 -0600 Subject: [PATCH 1/4] Added at_hash when access token is present This is required by response type "id_token token", but can be used by other flows if they choose. --- oidc_provider/lib/endpoints/authorize.py | 44 ++++++++++++++---------- oidc_provider/lib/endpoints/token.py | 26 +++++++------- oidc_provider/lib/utils/token.py | 14 +++++--- oidc_provider/models.py | 16 ++++++++- 4 files changed, 63 insertions(+), 37 deletions(-) diff --git a/oidc_provider/lib/endpoints/authorize.py b/oidc_provider/lib/endpoints/authorize.py index d2d1951..b3ea536 100644 --- a/oidc_provider/lib/endpoints/authorize.py +++ b/oidc_provider/lib/endpoints/authorize.py @@ -121,35 +121,41 @@ class AuthorizeEndpoint(object): query_params['state'] = self.params.state if self.params.state else '' elif self.grant_type == 'implicit': - # We don't need id_token if it's an OAuth2 request. - if self.is_authentication: - id_token_dic = create_id_token( - user=self.request.user, - aud=self.client.client_id, - nonce=self.params.nonce, - request=self.request) - query_fragment['id_token'] = encode_id_token(id_token_dic, self.client) - else: - id_token_dic = {} - token = create_token( user=self.request.user, client=self.client, - id_token_dic=id_token_dic, scope=self.params.scope) - # Store the token. - token.save() - - query_fragment['token_type'] = 'bearer' - # TODO: Create setting 'OIDC_TOKEN_EXPIRE'. - query_fragment['expires_in'] = 60 * 10 - # Check if response_type is an OpenID request with value 'id_token token' # or it's an OAuth2 Implicit Flow request. if self.params.response_type in ['id_token token', 'token']: query_fragment['access_token'] = token.access_token + # We don't need id_token if it's an OAuth2 request. + if self.is_authentication: + kwargs = { + "user": self.request.user, + "aud": self.client.client_id, + "nonce": self.params.nonce, + "request": self.request + } + # Include at_hash when access_token is being returned. + if 'access_token' in query_fragment: + kwargs['at_hash'] = token.at_hash + id_token_dic = create_id_token(**kwargs) + query_fragment['id_token'] = encode_id_token(id_token_dic, self.client) + token.id_token = id_token_dic + else: + id_token_dic = {} + + # Store the token. + token.id_token = id_token_dic + token.save() + + query_fragment['token_type'] = 'bearer' + # TODO: Create setting 'OIDC_TOKEN_EXPIRE'. + query_fragment['expires_in'] = 60 * 10 + query_fragment['state'] = self.params.state if self.params.state else '' except Exception as error: diff --git a/oidc_provider/lib/endpoints/token.py b/oidc_provider/lib/endpoints/token.py index 200cb2c..6aa3b83 100644 --- a/oidc_provider/lib/endpoints/token.py +++ b/oidc_provider/lib/endpoints/token.py @@ -131,23 +131,24 @@ class TokenEndpoint(object): return self.create_refresh_response_dic() def create_code_response_dic(self): + token = create_token( + user=self.code.user, + client=self.code.client, + scope=self.code.scope) + if self.code.is_authentication: id_token_dic = create_id_token( user=self.code.user, aud=self.client.client_id, nonce=self.code.nonce, + at_hash=token.at_hash, request=self.request, ) else: id_token_dic = {} - token = create_token( - user=self.code.user, - client=self.code.client, - id_token_dic=id_token_dic, - scope=self.code.scope) - # Store the token. + token.id_token = id_token_dic token.save() # We don't need to store the code anymore. @@ -164,24 +165,25 @@ class TokenEndpoint(object): return dic def create_refresh_response_dic(self): + token = create_token( + user=self.token.user, + client=self.token.client, + scope=self.token.scope) + # If the Token has an id_token it's an Authentication request. if self.token.id_token: id_token_dic = create_id_token( user=self.token.user, aud=self.client.client_id, nonce=None, + at_hash=token.at_hash, request=self.request, ) else: id_token_dic = {} - token = create_token( - user=self.token.user, - client=self.token.client, - id_token_dic=id_token_dic, - scope=self.token.scope) - # Store the token. + token.id_token = id_token_dic token.save() # Forget the old token. diff --git a/oidc_provider/lib/utils/token.py b/oidc_provider/lib/utils/token.py index fc0880d..83291ec 100644 --- a/oidc_provider/lib/utils/token.py +++ b/oidc_provider/lib/utils/token.py @@ -13,7 +13,7 @@ from oidc_provider.models import * from oidc_provider import settings -def create_id_token(user, aud, nonce, request=None): +def create_id_token(user, aud, nonce, at_hash=None, request=None): """ Receives a user object and aud (audience). Then creates the id_token dictionary. @@ -44,6 +44,9 @@ def create_id_token(user, aud, nonce, request=None): if nonce: dic['nonce'] = str(nonce) + if at_hash: + dic['at_hash'] = at_hash + processing_hook = settings.get('OIDC_IDTOKEN_PROCESSING_HOOK') if isinstance(processing_hook, (list, tuple)): @@ -73,13 +76,13 @@ def encode_id_token(payload, client): keys = [SYMKey(key=client.client_secret, alg=alg)] else: raise Exception('Unsupported key algorithm.') - + _jws = JWS(payload, alg=alg) return _jws.sign_compact(keys) -def create_token(user, client, id_token_dic, scope): +def create_token(user, client, scope, id_token_dic=None): """ Create and populate a Token object. @@ -90,7 +93,8 @@ def create_token(user, client, id_token_dic, scope): token.client = client token.access_token = uuid.uuid4().hex - token.id_token = id_token_dic + if id_token_dic is not None: + token.id_token = id_token_dic token.refresh_token = uuid.uuid4().hex token.expires_at = timezone.now() + timedelta( @@ -112,7 +116,7 @@ def create_code(user, client, scope, nonce, is_authentication, code.client = client code.code = uuid.uuid4().hex - + if code_challenge and code_challenge_method: code.code_challenge = code_challenge code.code_challenge_method = code_challenge_method diff --git a/oidc_provider/models.py b/oidc_provider/models.py index 09b36a2..284e858 100644 --- a/oidc_provider/models.py +++ b/oidc_provider/models.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- -from hashlib import md5 +import base64 +import binascii +from hashlib import md5, sha256 import json from django.db import models @@ -117,6 +119,18 @@ class Token(BaseCodeTokenModel): verbose_name = _(u'Token') verbose_name_plural = _(u'Tokens') + @property + def at_hash(self): + # @@@ d-o-p only supports 256 bits (change this if that changes) + hashed_access_token = sha256( + self.access_token.encode('ascii') + ).hexdigest().encode('ascii') + return base64.urlsafe_b64encode( + binascii.unhexlify( + hashed_access_token[:len(hashed_access_token) // 2] + ) + ).rstrip(b'=').decode('ascii') + class UserConsent(BaseCodeTokenModel): From ffddb69f80e49a067ff4841e933f3bccf830cec0 Mon Sep 17 00:00:00 2001 From: Graham Ullrich Date: Mon, 8 Aug 2016 11:24:07 -0600 Subject: [PATCH 2/4] Add tests for at_hash Ensure at_hash is present in id_token when warranted. --- .gitignore | 2 + oidc_provider/lib/endpoints/token.py | 4 +- oidc_provider/runtests.py | 0 .../tests/test_authorize_endpoint.py | 48 +---- oidc_provider/tests/test_code_flow.py | 51 +++++ oidc_provider/tests/test_implicit_flow.py | 185 ++++++++++++++++++ runtests.py | 112 +++++++++++ setup.py | 1 + 8 files changed, 357 insertions(+), 46 deletions(-) create mode 100644 oidc_provider/runtests.py create mode 100644 oidc_provider/tests/test_code_flow.py create mode 100644 oidc_provider/tests/test_implicit_flow.py create mode 100644 runtests.py diff --git a/.gitignore b/.gitignore index fa1d0dc..2c6f875 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ src/ .venv .idea docs/_build/ +.eggs/ +.python-version diff --git a/oidc_provider/lib/endpoints/token.py b/oidc_provider/lib/endpoints/token.py index 6aa3b83..8a7832d 100644 --- a/oidc_provider/lib/endpoints/token.py +++ b/oidc_provider/lib/endpoints/token.py @@ -146,9 +146,9 @@ class TokenEndpoint(object): ) else: id_token_dic = {} + token.id_token = id_token_dic # Store the token. - token.id_token = id_token_dic token.save() # We don't need to store the code anymore. @@ -181,9 +181,9 @@ class TokenEndpoint(object): ) else: id_token_dic = {} + token.id_token = id_token_dic # Store the token. - token.id_token = id_token_dic token.save() # Forget the old token. diff --git a/oidc_provider/runtests.py b/oidc_provider/runtests.py new file mode 100644 index 0000000..e69de29 diff --git a/oidc_provider/tests/test_authorize_endpoint.py b/oidc_provider/tests/test_authorize_endpoint.py index dd22800..cb0712c 100644 --- a/oidc_provider/tests/test_authorize_endpoint.py +++ b/oidc_provider/tests/test_authorize_endpoint.py @@ -1,10 +1,9 @@ try: - from urllib.parse import unquote, urlencode + from urllib.parse import urlencode except ImportError: - from urllib import unquote, urlencode + from urllib import urlencode import uuid -from django.contrib.auth import REDIRECT_FIELD_NAME from django.contrib.auth.models import AnonymousUser from django.core.management import call_command from django.core.urlresolvers import reverse @@ -28,7 +27,6 @@ class AuthorizationCodeFlowTestCase(TestCase): self.user = create_fake_user() self.client = create_fake_client(response_type='code') self.client_public = create_fake_client(response_type='code', is_public=True) - self.client_implicit = create_fake_client(response_type='id_token token') self.state = uuid.uuid4().hex self.nonce = uuid.uuid4().hex @@ -52,7 +50,6 @@ class AuthorizationCodeFlowTestCase(TestCase): return response - def test_missing_parameters(self): """ If the request fails due to a missing, invalid, or mismatching @@ -148,7 +145,7 @@ class AuthorizationCodeFlowTestCase(TestCase): for key, value in iter(to_check.items()): is_input_ok = input_html.format(key, value) in response.content.decode('utf-8') self.assertEqual(is_input_ok, True, - msg='Hidden input for "'+key+'" fails.') + msg='Hidden input for "' + key + '" fails.') def test_user_consent_response(self): """ @@ -183,7 +180,7 @@ class AuthorizationCodeFlowTestCase(TestCase): msg='"access_denied" code is missing in query.') # Simulate user authorization. - data['allow'] = 'Accept' # Will be the value of the button. + data['allow'] = 'Accept' # Will be the value of the button. response = self._auth_request('post', data, is_user_authenticated=True) @@ -270,43 +267,6 @@ class AuthorizationCodeFlowTestCase(TestCase): self.assertEqual('Request for Permission' in response.content.decode('utf-8'), True) - def test_implicit_missing_nonce(self): - """ - The `nonce` parameter is REQUIRED if you use the Implicit Flow. - """ - data = { - 'client_id': self.client_implicit.client_id, - 'response_type': self.client_implicit.response_type, - 'redirect_uri': self.client_implicit.default_redirect_uri, - 'scope': 'openid email', - 'state': self.state, - } - - response = self._auth_request('get', data, is_user_authenticated=True) - - self.assertEqual('#error=invalid_request' in response['Location'], True) - - def test_implicit_access_token_response(self): - """ - Unlike the Authorization Code flow, in which the client makes - separate requests for authorization and for an access token, the client - receives the access token as the result of the authorization request. - """ - data = { - 'client_id': self.client_implicit.client_id, - 'redirect_uri': self.client_implicit.default_redirect_uri, - 'response_type': self.client_implicit.response_type, - 'scope': 'openid email', - 'state': self.state, - 'nonce': self.nonce, - 'allow': 'Accept', - } - - response = self._auth_request('post', data, is_user_authenticated=True) - - self.assertEqual('access_token' in response['Location'], True) - - def test_prompt_parameter(self): """ Specifies whether the Authorization Server prompts the End-User for reauthentication and consent. diff --git a/oidc_provider/tests/test_code_flow.py b/oidc_provider/tests/test_code_flow.py new file mode 100644 index 0000000..60a4e54 --- /dev/null +++ b/oidc_provider/tests/test_code_flow.py @@ -0,0 +1,51 @@ +try: + from urllib.parse import unquote, urlencode +except ImportError: + from urllib import unquote, urlencode +import uuid + +from django.contrib.auth import REDIRECT_FIELD_NAME +from django.contrib.auth.models import AnonymousUser +from django.core.management import call_command +from django.core.urlresolvers import reverse +from django.test import RequestFactory +from django.test import TestCase + +from oidc_provider.models import * +from oidc_provider.tests.app.utils import * +from oidc_provider.views import * + + +class CodeFlowTestCase(TestCase): + """ + Test cases for Authorization Code Flow. + """ + + def setUp(self): + call_command('creatersakey') + self.factory = RequestFactory() + self.user = create_fake_user() + self.client = create_fake_client(response_type='code') + self.client_public = create_fake_client(response_type='code', is_public=True) + self.state = uuid.uuid4().hex + self.nonce = uuid.uuid4().hex + + def _auth_request(self, method, data={}, is_user_authenticated=False): + url = reverse('oidc_provider:authorize') + + if method.lower() == 'get': + query_str = urlencode(data).replace('+', '%20') + if query_str: + url += '?' + query_str + request = self.factory.get(url) + elif method.lower() == 'post': + request = self.factory.post(url, data=data) + else: + raise Exception('Method unsupported for an Authorization Request.') + + # Simulate that the user is logged. + request.user = self.user if is_user_authenticated else AnonymousUser() + + response = AuthorizeView.as_view()(request) + + return response diff --git a/oidc_provider/tests/test_implicit_flow.py b/oidc_provider/tests/test_implicit_flow.py new file mode 100644 index 0000000..1f52579 --- /dev/null +++ b/oidc_provider/tests/test_implicit_flow.py @@ -0,0 +1,185 @@ +try: + from urllib.parse import urlencode +except ImportError: + from urllib import urlencode +try: + from urllib.parse import urlparse, parse_qs +except ImportError: + from urlparse import urlparse, parse_qs +import uuid + +from django.contrib.auth.models import AnonymousUser +from django.core.management import call_command +from django.core.urlresolvers import reverse +from django.test import RequestFactory +from django.test import TestCase +from jwkest.jwt import JWT + +from oidc_provider.models import * +from oidc_provider.tests.app.utils import * +from oidc_provider.views import * + + +class ImplicitFlowTestCase(TestCase): + """ + Test cases for Authorization Implicit Flow. + """ + + def setUp(self): + call_command('creatersakey') + self.factory = RequestFactory() + self.user = create_fake_user() + self.client = create_fake_client(response_type='id_token token') + self.client_public = create_fake_client(response_type='id_token token', is_public=True) + self.client_no_access = create_fake_client(response_type='id_token') + self.client_public_no_access = create_fake_client(response_type='id_token', is_public=True) + self.state = uuid.uuid4().hex + self.nonce = uuid.uuid4().hex + + def _auth_request(self, method, data={}, is_user_authenticated=False): + url = reverse('oidc_provider:authorize') + + if method.lower() == 'get': + query_str = urlencode(data).replace('+', '%20') + if query_str: + url += '?' + query_str + request = self.factory.get(url) + elif method.lower() == 'post': + request = self.factory.post(url, data=data) + else: + raise Exception('Method unsupported for an Authorization Request.') + + # Simulate that the user is logged. + request.user = self.user if is_user_authenticated else AnonymousUser() + + response = AuthorizeView.as_view()(request) + + return response + + def test_missing_nonce(self): + """ + The `nonce` parameter is REQUIRED if you use the Implicit Flow. + """ + data = { + 'client_id': self.client.client_id, + 'response_type': self.client.response_type, + 'redirect_uri': self.client.default_redirect_uri, + 'scope': 'openid email', + 'state': self.state, + } + + response = self._auth_request('get', data, is_user_authenticated=True) + + self.assertEqual('#error=invalid_request' in response['Location'], True) + + def test_id_token_token_response(self): + """ + Implicit client requesting `id_token token` receives both id token + and access token as the result of the authorization request. + """ + data = { + 'client_id': self.client.client_id, + 'redirect_uri': self.client.default_redirect_uri, + 'response_type': self.client.response_type, + 'scope': 'openid email', + 'state': self.state, + 'nonce': self.nonce, + 'allow': 'Accept', + } + + response = self._auth_request('post', data, is_user_authenticated=True) + + self.assertIn('access_token', response['Location']) + self.assertIn('id_token', response['Location']) + + # same for public client + data['client_id'] = self.client_public.client_id, + data['redirect_uri'] = self.client_public.default_redirect_uri, + data['response_type'] = self.client_public.response_type, + + response = self._auth_request('post', data, is_user_authenticated=True) + + self.assertIn('access_token', response['Location']) + self.assertIn('id_token', response['Location']) + + def test_id_token_response(self): + """ + Implicit client requesting `id_token` receives + only an id token as the result of the authorization request. + """ + data = { + 'client_id': self.client_no_access.client_id, + 'redirect_uri': self.client_no_access.default_redirect_uri, + 'response_type': self.client_no_access.response_type, + 'scope': 'openid email', + 'state': self.state, + 'nonce': self.nonce, + 'allow': 'Accept', + } + + response = self._auth_request('post', data, is_user_authenticated=True) + + self.assertNotIn('access_token', response['Location']) + self.assertIn('id_token', response['Location']) + + # same for public client + data['client_id'] = self.client_public_no_access.client_id, + data['redirect_uri'] = self.client_public_no_access.default_redirect_uri, + data['response_type'] = self.client_public_no_access.response_type, + + response = self._auth_request('post', data, is_user_authenticated=True) + + self.assertNotIn('access_token', response['Location']) + self.assertIn('id_token', response['Location']) + + def test_id_token_token_at_hash(self): + """ + Implicit client requesting `id_token token` receives + `at_hash` in `id_token`. + """ + data = { + 'client_id': self.client.client_id, + 'redirect_uri': self.client.default_redirect_uri, + 'response_type': self.client.response_type, + 'scope': 'openid email', + 'state': self.state, + 'nonce': self.nonce, + 'allow': 'Accept', + } + + response = self._auth_request('post', data, is_user_authenticated=True) + + self.assertIn('id_token', response['Location']) + + # obtain `id_token` portion of Location + components = urlsplit(response['Location']) + fragment = parse_qs(components[4]) + id_token = JWT().unpack(fragment["id_token"][0].encode('utf-8')).payload() + + self.assertIn('at_hash', id_token) + + def test_id_token_at_hash(self): + """ + Implicit client requesting `id_token` should not receive + `at_hash` in `id_token`. + """ + data = { + 'client_id': self.client_no_access.client_id, + 'redirect_uri': self.client_no_access.default_redirect_uri, + 'response_type': self.client_no_access.response_type, + 'scope': 'openid email', + 'state': self.state, + 'nonce': self.nonce, + 'allow': 'Accept', + } + + response = self._auth_request('post', data, is_user_authenticated=True) + + self.assertIn('id_token', response['Location']) + + # obtain `id_token` portion of Location + components = urlsplit(response['Location']) + fragment = parse_qs(components[4]) + id_token = JWT().unpack(fragment["id_token"][0].encode('utf-8')).payload() + + self.assertNotIn('at_hash', id_token) diff --git a/runtests.py b/runtests.py new file mode 100644 index 0000000..388ce99 --- /dev/null +++ b/runtests.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python +import os +import sys + +import django + +from django.conf import settings + + +DEFAULT_SETTINGS = dict( + + DEBUG = False, + + DATABASES = { + 'default': { + 'ENGINE': 'django.db.backends.sqlite3', + 'NAME': ':memory:', + } + }, + + SITE_ID = 1, + + MIDDLEWARE_CLASSES = [ + 'django.middleware.common.CommonMiddleware', + 'django.contrib.sessions.middleware.SessionMiddleware', + 'django.contrib.auth.middleware.AuthenticationMiddleware', + ], + + TEMPLATES = [ + { + 'BACKEND': 'django.template.backends.django.DjangoTemplates', + 'DIRS': [], + 'APP_DIRS': True, + 'OPTIONS': { + 'context_processors': [ + 'django.template.context_processors.debug', + 'django.template.context_processors.request', + 'django.contrib.auth.context_processors.auth', + 'django.contrib.messages.context_processors.messages', + ], + }, + }, + ], + + LOGGING = { + 'version': 1, + 'disable_existing_loggers': False, + 'handlers': { + 'console': { + 'class': 'logging.StreamHandler', + }, + }, + 'loggers': { + 'oidc_provider': { + 'handlers': ['console'], + 'level': os.getenv('DJANGO_LOG_LEVEL', 'DEBUG'), + }, + }, + }, + + INSTALLED_APPS = [ + 'django.contrib.auth', + 'django.contrib.contenttypes', + 'django.contrib.sessions', + 'django.contrib.sites', + 'django.contrib.messages', + 'django.contrib.admin', + 'oidc_provider', + ], + + SECRET_KEY = 'this-should-be-top-secret', + + ROOT_URLCONF = 'oidc_provider.tests.app.urls', + + TEMPLATE_DIRS = [ + 'oidc_provider/tests/templates', + ], + + USE_TZ = True, + + # OIDC Provider settings. + + SITE_URL = 'http://localhost:8000', + OIDC_USERINFO = 'oidc_provider.tests.app.utils.userinfo', + +) + + +def runtests(*test_args): + if not settings.configured: + settings.configure(**DEFAULT_SETTINGS) + + django.setup() + + parent = os.path.dirname(os.path.abspath(__file__)) + sys.path.insert(0, parent) + + try: + from django.test.runner import DiscoverRunner + runner_class = DiscoverRunner + test_args = ["oidc_provider.tests"] + except ImportError: + from django.test.simple import DjangoTestSuiteRunner + runner_class = DjangoTestSuiteRunner + test_args = ["tests"] + + failures = runner_class(verbosity=1, interactive=True, failfast=False).run_tests(test_args) + sys.exit(failures) + + +if __name__ == "__main__": + runtests(*sys.argv[1:]) diff --git a/setup.py b/setup.py index 049464d..cfde4c4 100644 --- a/setup.py +++ b/setup.py @@ -34,6 +34,7 @@ setup( 'Topic :: Internet :: WWW/HTTP', 'Topic :: Internet :: WWW/HTTP :: Dynamic Content', ], + test_suite="runtests.runtests", tests_require=[ 'pyjwkest==1.1.0', 'mock==2.0.0', From e04d42fedf82f28bfe6a8fa15601b43c1a0f3ff9 Mon Sep 17 00:00:00 2001 From: Graham Ullrich Date: Mon, 8 Aug 2016 11:54:40 -0600 Subject: [PATCH 3/4] flake8 fixes --- oidc_provider/models.py | 11 ++++++++++- oidc_provider/tests/test_code_flow.py | 5 ++--- oidc_provider/tests/test_implicit_flow.py | 4 ++-- oidc_provider/views.py | 6 +++--- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/oidc_provider/models.py b/oidc_provider/models.py index 284e858..0adda40 100644 --- a/oidc_provider/models.py +++ b/oidc_provider/models.py @@ -26,6 +26,7 @@ JWT_ALGS = [ ('RS256', 'RS256'), ] + class Client(models.Model): name = models.CharField(max_length=100, default='', verbose_name=_(u'Name')) @@ -51,8 +52,10 @@ class Client(models.Model): def redirect_uris(): def fget(self): return self._redirect_uris.splitlines() + def fset(self, value): self._redirect_uris = '\n'.join(value) + return locals() redirect_uris = property(**redirect_uris()) @@ -71,8 +74,10 @@ class BaseCodeTokenModel(models.Model): def scope(): def fget(self): return self._scope.split() + def fset(self, value): self._scope = ' '.join(value) + return locals() scope = property(**scope()) @@ -107,11 +112,15 @@ class Token(BaseCodeTokenModel): access_token = models.CharField(max_length=255, unique=True, verbose_name=_(u'Access Token')) refresh_token = models.CharField(max_length=255, unique=True, null=True, verbose_name=_(u'Refresh Token')) _id_token = models.TextField(verbose_name=_(u'ID Token')) + def id_token(): + def fget(self): return json.loads(self._id_token) + def fset(self, value): self._id_token = json.dumps(value) + return locals() id_token = property(**id_token()) @@ -156,4 +165,4 @@ class RSAKey(models.Model): @property def kid(self): - return u'{0}'.format(md5(self.key.encode('utf-8')).hexdigest() if self.key else '') + return u'{0}'.format(md5(self.key.encode('utf-8')).hexdigest() if self.key else '') diff --git a/oidc_provider/tests/test_code_flow.py b/oidc_provider/tests/test_code_flow.py index 60a4e54..349d02e 100644 --- a/oidc_provider/tests/test_code_flow.py +++ b/oidc_provider/tests/test_code_flow.py @@ -1,10 +1,9 @@ try: - from urllib.parse import unquote, urlencode + from urllib.parse import urlencode except ImportError: - from urllib import unquote, urlencode + from urllib import urlencode import uuid -from django.contrib.auth import REDIRECT_FIELD_NAME from django.contrib.auth.models import AnonymousUser from django.core.management import call_command from django.core.urlresolvers import reverse diff --git a/oidc_provider/tests/test_implicit_flow.py b/oidc_provider/tests/test_implicit_flow.py index 1f52579..121d621 100644 --- a/oidc_provider/tests/test_implicit_flow.py +++ b/oidc_provider/tests/test_implicit_flow.py @@ -3,9 +3,9 @@ try: except ImportError: from urllib import urlencode try: - from urllib.parse import urlparse, parse_qs + from urllib.parse import parse_qs, urlsplit except ImportError: - from urlparse import urlparse, parse_qs + from urlparse import parse_qs, urlsplit import uuid from django.contrib.auth.models import AnonymousUser diff --git a/oidc_provider/views.py b/oidc_provider/views.py index a22ca17..ae4bf69 100644 --- a/oidc_provider/views.py +++ b/oidc_provider/views.py @@ -193,8 +193,8 @@ class ProviderInfoView(View): # See: http://openid.net/specs/openid-connect-core-1_0.html#SubjectIDTypes dic['subject_types_supported'] = ['public'] - dic['token_endpoint_auth_methods_supported'] = [ 'client_secret_post', - 'client_secret_basic' ] + dic['token_endpoint_auth_methods_supported'] = ['client_secret_post', + 'client_secret_basic'] return JsonResponse(dic) @@ -205,7 +205,7 @@ class JwksView(View): dic = dict(keys=[]) for rsakey in RSAKey.objects.all(): - public_key = RSA.importKey(rsakey.key).publickey() + public_key = RSA.importKey(rsakey.key).publickey() dic['keys'].append({ 'kty': 'RSA', 'alg': 'RS256', From e822252b6ea4d6f4dc10d9af5377e930d9c5409c Mon Sep 17 00:00:00 2001 From: Graham Ullrich Date: Mon, 8 Aug 2016 12:20:47 -0600 Subject: [PATCH 4/4] Use original test files --- .../tests/test_authorize_endpoint.py | 172 +++++++++++++++- oidc_provider/tests/test_code_flow.py | 50 ----- oidc_provider/tests/test_implicit_flow.py | 185 ------------------ oidc_provider/tests/test_token_endpoint.py | 15 ++ 4 files changed, 186 insertions(+), 236 deletions(-) delete mode 100644 oidc_provider/tests/test_code_flow.py delete mode 100644 oidc_provider/tests/test_implicit_flow.py diff --git a/oidc_provider/tests/test_authorize_endpoint.py b/oidc_provider/tests/test_authorize_endpoint.py index cb0712c..f892ae2 100644 --- a/oidc_provider/tests/test_authorize_endpoint.py +++ b/oidc_provider/tests/test_authorize_endpoint.py @@ -2,6 +2,10 @@ try: from urllib.parse import urlencode except ImportError: from urllib import urlencode +try: + from urllib.parse import parse_qs, urlsplit +except ImportError: + from urlparse import parse_qs, urlsplit import uuid from django.contrib.auth.models import AnonymousUser @@ -9,6 +13,7 @@ from django.core.management import call_command from django.core.urlresolvers import reverse from django.test import RequestFactory from django.test import TestCase +from jwkest.jwt import JWT from oidc_provider import settings from oidc_provider.models import * @@ -18,7 +23,7 @@ from oidc_provider.views import * class AuthorizationCodeFlowTestCase(TestCase): """ - Test cases for Authorize Endpoint using Authorization Code Flow. + Test cases for Authorize Endpoint using Code Flow. """ def setUp(self): @@ -291,3 +296,168 @@ class AuthorizationCodeFlowTestCase(TestCase): # An error is returned if the Client does not have pre-configured consent for the requested Claims. self.assertEqual('interaction_required' in response['Location'], True) + + +class AuthorizationImplicitFlowTestCase(TestCase): + """ + Test cases for Authorization Endpoint using Implicit Flow. + """ + + def setUp(self): + call_command('creatersakey') + self.factory = RequestFactory() + self.user = create_fake_user() + self.client = create_fake_client(response_type='id_token token') + self.client_public = create_fake_client(response_type='id_token token', is_public=True) + self.client_no_access = create_fake_client(response_type='id_token') + self.client_public_no_access = create_fake_client(response_type='id_token', is_public=True) + self.state = uuid.uuid4().hex + self.nonce = uuid.uuid4().hex + + def _auth_request(self, method, data={}, is_user_authenticated=False): + url = reverse('oidc_provider:authorize') + + if method.lower() == 'get': + query_str = urlencode(data).replace('+', '%20') + if query_str: + url += '?' + query_str + request = self.factory.get(url) + elif method.lower() == 'post': + request = self.factory.post(url, data=data) + else: + raise Exception('Method unsupported for an Authorization Request.') + + # Simulate that the user is logged. + request.user = self.user if is_user_authenticated else AnonymousUser() + + response = AuthorizeView.as_view()(request) + + return response + + def test_missing_nonce(self): + """ + The `nonce` parameter is REQUIRED if you use the Implicit Flow. + """ + data = { + 'client_id': self.client.client_id, + 'response_type': self.client.response_type, + 'redirect_uri': self.client.default_redirect_uri, + 'scope': 'openid email', + 'state': self.state, + } + + response = self._auth_request('get', data, is_user_authenticated=True) + + self.assertEqual('#error=invalid_request' in response['Location'], True) + + def test_id_token_token_response(self): + """ + Implicit client requesting `id_token token` receives both id token + and access token as the result of the authorization request. + """ + data = { + 'client_id': self.client.client_id, + 'redirect_uri': self.client.default_redirect_uri, + 'response_type': self.client.response_type, + 'scope': 'openid email', + 'state': self.state, + 'nonce': self.nonce, + 'allow': 'Accept', + } + + response = self._auth_request('post', data, is_user_authenticated=True) + + self.assertIn('access_token', response['Location']) + self.assertIn('id_token', response['Location']) + + # same for public client + data['client_id'] = self.client_public.client_id, + data['redirect_uri'] = self.client_public.default_redirect_uri, + data['response_type'] = self.client_public.response_type, + + response = self._auth_request('post', data, is_user_authenticated=True) + + self.assertIn('access_token', response['Location']) + self.assertIn('id_token', response['Location']) + + def test_id_token_response(self): + """ + Implicit client requesting `id_token` receives + only an id token as the result of the authorization request. + """ + data = { + 'client_id': self.client_no_access.client_id, + 'redirect_uri': self.client_no_access.default_redirect_uri, + 'response_type': self.client_no_access.response_type, + 'scope': 'openid email', + 'state': self.state, + 'nonce': self.nonce, + 'allow': 'Accept', + } + + response = self._auth_request('post', data, is_user_authenticated=True) + + self.assertNotIn('access_token', response['Location']) + self.assertIn('id_token', response['Location']) + + # same for public client + data['client_id'] = self.client_public_no_access.client_id, + data['redirect_uri'] = self.client_public_no_access.default_redirect_uri, + data['response_type'] = self.client_public_no_access.response_type, + + response = self._auth_request('post', data, is_user_authenticated=True) + + self.assertNotIn('access_token', response['Location']) + self.assertIn('id_token', response['Location']) + + def test_id_token_token_at_hash(self): + """ + Implicit client requesting `id_token token` receives + `at_hash` in `id_token`. + """ + data = { + 'client_id': self.client.client_id, + 'redirect_uri': self.client.default_redirect_uri, + 'response_type': self.client.response_type, + 'scope': 'openid email', + 'state': self.state, + 'nonce': self.nonce, + 'allow': 'Accept', + } + + response = self._auth_request('post', data, is_user_authenticated=True) + + self.assertIn('id_token', response['Location']) + + # obtain `id_token` portion of Location + components = urlsplit(response['Location']) + fragment = parse_qs(components[4]) + id_token = JWT().unpack(fragment["id_token"][0].encode('utf-8')).payload() + + self.assertIn('at_hash', id_token) + + def test_id_token_at_hash(self): + """ + Implicit client requesting `id_token` should not receive + `at_hash` in `id_token`. + """ + data = { + 'client_id': self.client_no_access.client_id, + 'redirect_uri': self.client_no_access.default_redirect_uri, + 'response_type': self.client_no_access.response_type, + 'scope': 'openid email', + 'state': self.state, + 'nonce': self.nonce, + 'allow': 'Accept', + } + + response = self._auth_request('post', data, is_user_authenticated=True) + + self.assertIn('id_token', response['Location']) + + # obtain `id_token` portion of Location + components = urlsplit(response['Location']) + fragment = parse_qs(components[4]) + id_token = JWT().unpack(fragment["id_token"][0].encode('utf-8')).payload() + + self.assertNotIn('at_hash', id_token) diff --git a/oidc_provider/tests/test_code_flow.py b/oidc_provider/tests/test_code_flow.py deleted file mode 100644 index 349d02e..0000000 --- a/oidc_provider/tests/test_code_flow.py +++ /dev/null @@ -1,50 +0,0 @@ -try: - from urllib.parse import urlencode -except ImportError: - from urllib import urlencode -import uuid - -from django.contrib.auth.models import AnonymousUser -from django.core.management import call_command -from django.core.urlresolvers import reverse -from django.test import RequestFactory -from django.test import TestCase - -from oidc_provider.models import * -from oidc_provider.tests.app.utils import * -from oidc_provider.views import * - - -class CodeFlowTestCase(TestCase): - """ - Test cases for Authorization Code Flow. - """ - - def setUp(self): - call_command('creatersakey') - self.factory = RequestFactory() - self.user = create_fake_user() - self.client = create_fake_client(response_type='code') - self.client_public = create_fake_client(response_type='code', is_public=True) - self.state = uuid.uuid4().hex - self.nonce = uuid.uuid4().hex - - def _auth_request(self, method, data={}, is_user_authenticated=False): - url = reverse('oidc_provider:authorize') - - if method.lower() == 'get': - query_str = urlencode(data).replace('+', '%20') - if query_str: - url += '?' + query_str - request = self.factory.get(url) - elif method.lower() == 'post': - request = self.factory.post(url, data=data) - else: - raise Exception('Method unsupported for an Authorization Request.') - - # Simulate that the user is logged. - request.user = self.user if is_user_authenticated else AnonymousUser() - - response = AuthorizeView.as_view()(request) - - return response diff --git a/oidc_provider/tests/test_implicit_flow.py b/oidc_provider/tests/test_implicit_flow.py deleted file mode 100644 index 121d621..0000000 --- a/oidc_provider/tests/test_implicit_flow.py +++ /dev/null @@ -1,185 +0,0 @@ -try: - from urllib.parse import urlencode -except ImportError: - from urllib import urlencode -try: - from urllib.parse import parse_qs, urlsplit -except ImportError: - from urlparse import parse_qs, urlsplit -import uuid - -from django.contrib.auth.models import AnonymousUser -from django.core.management import call_command -from django.core.urlresolvers import reverse -from django.test import RequestFactory -from django.test import TestCase -from jwkest.jwt import JWT - -from oidc_provider.models import * -from oidc_provider.tests.app.utils import * -from oidc_provider.views import * - - -class ImplicitFlowTestCase(TestCase): - """ - Test cases for Authorization Implicit Flow. - """ - - def setUp(self): - call_command('creatersakey') - self.factory = RequestFactory() - self.user = create_fake_user() - self.client = create_fake_client(response_type='id_token token') - self.client_public = create_fake_client(response_type='id_token token', is_public=True) - self.client_no_access = create_fake_client(response_type='id_token') - self.client_public_no_access = create_fake_client(response_type='id_token', is_public=True) - self.state = uuid.uuid4().hex - self.nonce = uuid.uuid4().hex - - def _auth_request(self, method, data={}, is_user_authenticated=False): - url = reverse('oidc_provider:authorize') - - if method.lower() == 'get': - query_str = urlencode(data).replace('+', '%20') - if query_str: - url += '?' + query_str - request = self.factory.get(url) - elif method.lower() == 'post': - request = self.factory.post(url, data=data) - else: - raise Exception('Method unsupported for an Authorization Request.') - - # Simulate that the user is logged. - request.user = self.user if is_user_authenticated else AnonymousUser() - - response = AuthorizeView.as_view()(request) - - return response - - def test_missing_nonce(self): - """ - The `nonce` parameter is REQUIRED if you use the Implicit Flow. - """ - data = { - 'client_id': self.client.client_id, - 'response_type': self.client.response_type, - 'redirect_uri': self.client.default_redirect_uri, - 'scope': 'openid email', - 'state': self.state, - } - - response = self._auth_request('get', data, is_user_authenticated=True) - - self.assertEqual('#error=invalid_request' in response['Location'], True) - - def test_id_token_token_response(self): - """ - Implicit client requesting `id_token token` receives both id token - and access token as the result of the authorization request. - """ - data = { - 'client_id': self.client.client_id, - 'redirect_uri': self.client.default_redirect_uri, - 'response_type': self.client.response_type, - 'scope': 'openid email', - 'state': self.state, - 'nonce': self.nonce, - 'allow': 'Accept', - } - - response = self._auth_request('post', data, is_user_authenticated=True) - - self.assertIn('access_token', response['Location']) - self.assertIn('id_token', response['Location']) - - # same for public client - data['client_id'] = self.client_public.client_id, - data['redirect_uri'] = self.client_public.default_redirect_uri, - data['response_type'] = self.client_public.response_type, - - response = self._auth_request('post', data, is_user_authenticated=True) - - self.assertIn('access_token', response['Location']) - self.assertIn('id_token', response['Location']) - - def test_id_token_response(self): - """ - Implicit client requesting `id_token` receives - only an id token as the result of the authorization request. - """ - data = { - 'client_id': self.client_no_access.client_id, - 'redirect_uri': self.client_no_access.default_redirect_uri, - 'response_type': self.client_no_access.response_type, - 'scope': 'openid email', - 'state': self.state, - 'nonce': self.nonce, - 'allow': 'Accept', - } - - response = self._auth_request('post', data, is_user_authenticated=True) - - self.assertNotIn('access_token', response['Location']) - self.assertIn('id_token', response['Location']) - - # same for public client - data['client_id'] = self.client_public_no_access.client_id, - data['redirect_uri'] = self.client_public_no_access.default_redirect_uri, - data['response_type'] = self.client_public_no_access.response_type, - - response = self._auth_request('post', data, is_user_authenticated=True) - - self.assertNotIn('access_token', response['Location']) - self.assertIn('id_token', response['Location']) - - def test_id_token_token_at_hash(self): - """ - Implicit client requesting `id_token token` receives - `at_hash` in `id_token`. - """ - data = { - 'client_id': self.client.client_id, - 'redirect_uri': self.client.default_redirect_uri, - 'response_type': self.client.response_type, - 'scope': 'openid email', - 'state': self.state, - 'nonce': self.nonce, - 'allow': 'Accept', - } - - response = self._auth_request('post', data, is_user_authenticated=True) - - self.assertIn('id_token', response['Location']) - - # obtain `id_token` portion of Location - components = urlsplit(response['Location']) - fragment = parse_qs(components[4]) - id_token = JWT().unpack(fragment["id_token"][0].encode('utf-8')).payload() - - self.assertIn('at_hash', id_token) - - def test_id_token_at_hash(self): - """ - Implicit client requesting `id_token` should not receive - `at_hash` in `id_token`. - """ - data = { - 'client_id': self.client_no_access.client_id, - 'redirect_uri': self.client_no_access.default_redirect_uri, - 'response_type': self.client_no_access.response_type, - 'scope': 'openid email', - 'state': self.state, - 'nonce': self.nonce, - 'allow': 'Accept', - } - - response = self._auth_request('post', data, is_user_authenticated=True) - - self.assertIn('id_token', response['Location']) - - # obtain `id_token` portion of Location - components = urlsplit(response['Location']) - fragment = parse_qs(components[4]) - id_token = JWT().unpack(fragment["id_token"][0].encode('utf-8')).payload() - - self.assertNotIn('at_hash', id_token) diff --git a/oidc_provider/tests/test_token_endpoint.py b/oidc_provider/tests/test_token_endpoint.py index 0873b23..b993a2a 100644 --- a/oidc_provider/tests/test_token_endpoint.py +++ b/oidc_provider/tests/test_token_endpoint.py @@ -304,6 +304,21 @@ class TokenTestCase(TestCase): self.assertEqual(id_token.get('nonce'), None) + def test_id_token_contains_at_hash(self): + """ + If access_token is included, the id_token SHOULD contain an at_hash. + """ + code = self._create_code() + + post_data = self._auth_code_post_data(code=code.code) + + response = self._post_request(post_data) + + response_dic = json.loads(response.content.decode('utf-8')) + id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload() + + self.assertTrue(id_token.get('at_hash')) + def test_idtoken_sign_validation(self): """ We MUST validate the signature of the ID Token according to JWS