Add test for Hybrid flow. Plus refactoring.
This commit is contained in:
parent
f8dbfa5c04
commit
439774aeeb
2 changed files with 77 additions and 58 deletions
|
@ -60,15 +60,14 @@ def create_fake_client(response_type, is_public=False):
|
|||
|
||||
def is_code_valid(url, user, client):
|
||||
"""
|
||||
Check if the code inside the url is valid.
|
||||
Check if the code inside the url is valid. Supporting both query string and fragment.
|
||||
"""
|
||||
try:
|
||||
parsed = urlsplit(url)
|
||||
params = parse_qs(parsed.query)
|
||||
params = parse_qs(parsed.query or parsed.fragment)
|
||||
code = params['code'][0]
|
||||
code = Code.objects.get(code=code)
|
||||
is_code_ok = (code.client == client) and \
|
||||
(code.user == user)
|
||||
is_code_ok = (code.client == client) and (code.user == user)
|
||||
except:
|
||||
is_code_ok = False
|
||||
|
||||
|
|
|
@ -25,19 +25,7 @@ from oidc_provider.tests.app.utils import (
|
|||
from oidc_provider.views import AuthorizeView
|
||||
|
||||
|
||||
class AuthorizationCodeFlowTestCase(TestCase):
|
||||
"""
|
||||
Test cases for Authorize Endpoint using Code Flow.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
call_command('creatersakey')
|
||||
self.factory = RequestFactory()
|
||||
self.user = create_fake_user()
|
||||
self.client = create_fake_client(response_type='code')
|
||||
self.client_public = create_fake_client(response_type='code', is_public=True)
|
||||
self.state = uuid.uuid4().hex
|
||||
self.nonce = uuid.uuid4().hex
|
||||
class AuthorizeEndpointMixin(object):
|
||||
|
||||
def _auth_request(self, method, data={}, is_user_authenticated=False):
|
||||
url = reverse('oidc_provider:authorize')
|
||||
|
@ -59,6 +47,21 @@ class AuthorizationCodeFlowTestCase(TestCase):
|
|||
|
||||
return response
|
||||
|
||||
|
||||
class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
|
||||
"""
|
||||
Test cases for Authorize Endpoint using Code Flow.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
call_command('creatersakey')
|
||||
self.factory = RequestFactory()
|
||||
self.user = create_fake_user()
|
||||
self.client = create_fake_client(response_type='code')
|
||||
self.client_public = create_fake_client(response_type='code', is_public=True)
|
||||
self.state = uuid.uuid4().hex
|
||||
self.nonce = uuid.uuid4().hex
|
||||
|
||||
def test_missing_parameters(self):
|
||||
"""
|
||||
If the request fails due to a missing, invalid, or mismatching
|
||||
|
@ -94,8 +97,7 @@ class AuthorizationCodeFlowTestCase(TestCase):
|
|||
self.assertEqual(response.has_header('Location'), True)
|
||||
|
||||
# Should be an 'error' component in query.
|
||||
query_exists = 'error=' in response['Location']
|
||||
self.assertEqual(query_exists, True)
|
||||
self.assertIn('error=', response['Location'])
|
||||
|
||||
def test_user_not_logged(self):
|
||||
"""
|
||||
|
@ -115,8 +117,7 @@ class AuthorizationCodeFlowTestCase(TestCase):
|
|||
response = self._auth_request('get', data)
|
||||
|
||||
# Check if user was redirected to the login view.
|
||||
login_url_exists = settings.get('LOGIN_URL') in response['Location']
|
||||
self.assertEqual(login_url_exists, True)
|
||||
self.assertIn(settings.get('LOGIN_URL'), response['Location'])
|
||||
|
||||
def test_user_consent_inputs(self):
|
||||
"""
|
||||
|
@ -183,10 +184,8 @@ class AuthorizationCodeFlowTestCase(TestCase):
|
|||
|
||||
# Because user doesn't allow app, SHOULD exists an error parameter
|
||||
# in the query.
|
||||
self.assertEqual('error=' in response['Location'], True,
|
||||
msg='error param is missing in query.')
|
||||
self.assertEqual('access_denied' in response['Location'], True,
|
||||
msg='"access_denied" code is missing in query.')
|
||||
self.assertIn('error=', response['Location'], msg='error param is missing in query.')
|
||||
self.assertIn('access_denied', response['Location'], msg='"access_denied" code is missing in query.')
|
||||
|
||||
# Simulate user authorization.
|
||||
data['allow'] = 'Accept' # Will be the value of the button.
|
||||
|
@ -201,8 +200,7 @@ class AuthorizationCodeFlowTestCase(TestCase):
|
|||
|
||||
# Check if the state is returned.
|
||||
state = (response['Location'].split('state='))[1].split('&')[0]
|
||||
self.assertEqual(state == self.state, True,
|
||||
msg='State change or is missing.')
|
||||
self.assertEqual(state, self.state, msg='State change or is missing.')
|
||||
|
||||
def test_user_consent_skipped(self):
|
||||
"""
|
||||
|
@ -227,8 +225,7 @@ class AuthorizationCodeFlowTestCase(TestCase):
|
|||
with self.settings(OIDC_SKIP_CONSENT_ALWAYS=True):
|
||||
response = self._auth_request('post', data, is_user_authenticated=True)
|
||||
|
||||
self.assertEqual('code' in response['Location'], True,
|
||||
msg='Code is missing in the returned url.')
|
||||
self.assertIn('code', response['Location'], msg='Code is missing in the returned url.')
|
||||
|
||||
response = self._auth_request('post', data, is_user_authenticated=True)
|
||||
|
||||
|
@ -274,7 +271,7 @@ class AuthorizationCodeFlowTestCase(TestCase):
|
|||
with self.settings(OIDC_SKIP_CONSENT_ALWAYS=True):
|
||||
response = self._auth_request('get', data, is_user_authenticated=True)
|
||||
|
||||
self.assertEqual('Request for Permission' in response.content.decode('utf-8'), True)
|
||||
self.assertIn('Request for Permission', response.content.decode('utf-8'))
|
||||
|
||||
def test_prompt_parameter(self):
|
||||
"""
|
||||
|
@ -294,15 +291,15 @@ class AuthorizationCodeFlowTestCase(TestCase):
|
|||
response = self._auth_request('get', data)
|
||||
|
||||
# An error is returned if an End-User is not already authenticated.
|
||||
self.assertEqual('login_required' in response['Location'], True)
|
||||
self.assertIn('login_required', response['Location'])
|
||||
|
||||
response = self._auth_request('get', data, is_user_authenticated=True)
|
||||
|
||||
# An error is returned if the Client does not have pre-configured consent for the requested Claims.
|
||||
self.assertEqual('interaction_required' in response['Location'], True)
|
||||
self.assertIn('interaction_required', response['Location'])
|
||||
|
||||
|
||||
class AuthorizationImplicitFlowTestCase(TestCase):
|
||||
class AuthorizationImplicitFlowTestCase(TestCase, AuthorizeEndpointMixin):
|
||||
"""
|
||||
Test cases for Authorization Endpoint using Implicit Flow.
|
||||
"""
|
||||
|
@ -318,26 +315,6 @@ class AuthorizationImplicitFlowTestCase(TestCase):
|
|||
self.state = uuid.uuid4().hex
|
||||
self.nonce = uuid.uuid4().hex
|
||||
|
||||
def _auth_request(self, method, data={}, is_user_authenticated=False):
|
||||
url = reverse('oidc_provider:authorize')
|
||||
|
||||
if method.lower() == 'get':
|
||||
query_str = urlencode(data).replace('+', '%20')
|
||||
if query_str:
|
||||
url += '?' + query_str
|
||||
request = self.factory.get(url)
|
||||
elif method.lower() == 'post':
|
||||
request = self.factory.post(url, data=data)
|
||||
else:
|
||||
raise Exception('Method unsupported for an Authorization Request.')
|
||||
|
||||
# Simulate that the user is logged.
|
||||
request.user = self.user if is_user_authenticated else AnonymousUser()
|
||||
|
||||
response = AuthorizeView.as_view()(request)
|
||||
|
||||
return response
|
||||
|
||||
def test_missing_nonce(self):
|
||||
"""
|
||||
The `nonce` parameter is REQUIRED if you use the Implicit Flow.
|
||||
|
@ -352,9 +329,9 @@ class AuthorizationImplicitFlowTestCase(TestCase):
|
|||
|
||||
response = self._auth_request('get', data, is_user_authenticated=True)
|
||||
|
||||
self.assertEqual('#error=invalid_request' in response['Location'], True)
|
||||
self.assertIn('#error=invalid_request', response['Location'])
|
||||
|
||||
def test_id_token_token_response(self):
|
||||
def test_idtoken_token_response(self):
|
||||
"""
|
||||
Implicit client requesting `id_token token` receives both id token
|
||||
and access token as the result of the authorization request.
|
||||
|
@ -384,7 +361,7 @@ class AuthorizationImplicitFlowTestCase(TestCase):
|
|||
self.assertIn('access_token', response['Location'])
|
||||
self.assertIn('id_token', response['Location'])
|
||||
|
||||
def test_id_token_response(self):
|
||||
def test_idtoken_response(self):
|
||||
"""
|
||||
Implicit client requesting `id_token` receives
|
||||
only an id token as the result of the authorization request.
|
||||
|
@ -414,7 +391,7 @@ class AuthorizationImplicitFlowTestCase(TestCase):
|
|||
self.assertNotIn('access_token', response['Location'])
|
||||
self.assertIn('id_token', response['Location'])
|
||||
|
||||
def test_id_token_token_at_hash(self):
|
||||
def test_idtoken_token_at_hash(self):
|
||||
"""
|
||||
Implicit client requesting `id_token token` receives
|
||||
`at_hash` in `id_token`.
|
||||
|
@ -440,7 +417,7 @@ class AuthorizationImplicitFlowTestCase(TestCase):
|
|||
|
||||
self.assertIn('at_hash', id_token)
|
||||
|
||||
def test_id_token_at_hash(self):
|
||||
def test_idtoken_at_hash(self):
|
||||
"""
|
||||
Implicit client requesting `id_token` should not receive
|
||||
`at_hash` in `id_token`.
|
||||
|
@ -465,3 +442,46 @@ class AuthorizationImplicitFlowTestCase(TestCase):
|
|||
id_token = JWT().unpack(fragment["id_token"][0].encode('utf-8')).payload()
|
||||
|
||||
self.assertNotIn('at_hash', id_token)
|
||||
|
||||
|
||||
class AuthorizationHybridFlowTestCase(TestCase, AuthorizeEndpointMixin):
|
||||
"""
|
||||
Test cases for Authorization Endpoint using Hybrid Flow.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
call_command('creatersakey')
|
||||
self.factory = RequestFactory()
|
||||
self.user = create_fake_user()
|
||||
self.client_code_idtoken_token = create_fake_client(response_type='code id_token token', is_public=True)
|
||||
self.state = uuid.uuid4().hex
|
||||
self.nonce = uuid.uuid4().hex
|
||||
|
||||
def test_code_idtoken_token_response(self):
|
||||
"""
|
||||
Implicit client requesting `id_token token` receives both id token
|
||||
and access token as the result of the authorization request.
|
||||
"""
|
||||
data = {
|
||||
'client_id': self.client_code_idtoken_token.client_id,
|
||||
'redirect_uri': self.client_code_idtoken_token.default_redirect_uri,
|
||||
'response_type': self.client_code_idtoken_token.response_type,
|
||||
'scope': 'openid email',
|
||||
'state': self.state,
|
||||
'nonce': self.nonce,
|
||||
'allow': 'Accept',
|
||||
}
|
||||
|
||||
response = self._auth_request('post', data, is_user_authenticated=True)
|
||||
|
||||
self.assertIn('#', response['Location'])
|
||||
self.assertIn('access_token', response['Location'])
|
||||
self.assertIn('id_token', response['Location'])
|
||||
self.assertIn('state', response['Location'])
|
||||
self.assertIn('code', response['Location'])
|
||||
|
||||
# Validate code.
|
||||
is_code_ok = is_code_valid(url=response['Location'],
|
||||
user=self.user,
|
||||
client=self.client_code_idtoken_token)
|
||||
self.assertEqual(is_code_ok, True, msg='Code returned is invalid.')
|
||||
|
|
Loading…
Reference in a new issue