Fix PEP8. New migration.

This commit is contained in:
Juan Ignacio Fiorentino 2018-03-27 17:15:06 -03:00
parent e85b47bf11
commit 582587f337
3 changed files with 72 additions and 29 deletions

View file

@ -49,7 +49,8 @@ class AuthorizeEndpoint(object):
self.grant_type = 'authorization_code' self.grant_type = 'authorization_code'
elif self.params['response_type'] in ['id_token', 'id_token token', 'token']: elif self.params['response_type'] in ['id_token', 'id_token token', 'token']:
self.grant_type = 'implicit' self.grant_type = 'implicit'
elif self.params['response_type'] in ['code token', 'code id_token', 'code id_token token']: elif self.params['response_type'] in [
'code token', 'code id_token', 'code id_token token']:
self.grant_type = 'hybrid' self.grant_type = 'hybrid'
else: else:
self.grant_type = None self.grant_type = None
@ -76,7 +77,8 @@ class AuthorizeEndpoint(object):
self.params['state'] = query_dict.get('state', '') self.params['state'] = query_dict.get('state', '')
self.params['nonce'] = query_dict.get('nonce', '') self.params['nonce'] = query_dict.get('nonce', '')
self.params['prompt'] = self._allowed_prompt_params.intersection(set(query_dict.get('prompt', '').split())) self.params['prompt'] = self._allowed_prompt_params.intersection(
set(query_dict.get('prompt', '').split()))
self.params['code_challenge'] = query_dict.get('code_challenge', '') self.params['code_challenge'] = query_dict.get('code_challenge', '')
self.params['code_challenge_method'] = query_dict.get('code_challenge_method', '') self.params['code_challenge_method'] = query_dict.get('code_challenge_method', '')
@ -100,10 +102,11 @@ class AuthorizeEndpoint(object):
# Grant type validation. # Grant type validation.
if not self.grant_type: if not self.grant_type:
logger.debug('[Authorize] Invalid response type: %s', self.params['response_type']) logger.debug('[Authorize] Invalid response type: %s', self.params['response_type'])
raise AuthorizeError(self.params['redirect_uri'], 'unsupported_response_type', self.grant_type) raise AuthorizeError(
self.params['redirect_uri'], 'unsupported_response_type', self.grant_type)
if (not self.is_authentication and if (not self.is_authentication and (self.grant_type == 'hybrid' or
(self.grant_type == 'hybrid' or self.params['response_type'] in ['id_token', 'id_token token'])): self.params['response_type'] in ['id_token', 'id_token token'])):
logger.debug('[Authorize] Missing openid scope.') logger.debug('[Authorize] Missing openid scope.')
raise AuthorizeError(self.params['redirect_uri'], 'invalid_scope', self.grant_type) raise AuthorizeError(self.params['redirect_uri'], 'invalid_scope', self.grant_type)
@ -118,7 +121,8 @@ class AuthorizeEndpoint(object):
# PKCE validation of the transformation method. # PKCE validation of the transformation method.
if self.params['code_challenge']: if self.params['code_challenge']:
if not (self.params['code_challenge_method'] in ['plain', 'S256']): if not (self.params['code_challenge_method'] in ['plain', 'S256']):
raise AuthorizeError(self.params['redirect_uri'], 'invalid_request', self.grant_type) raise AuthorizeError(
self.params['redirect_uri'], 'invalid_request', self.grant_type)
def create_response_uri(self): def create_response_uri(self):
uri = urlsplit(self.params['redirect_uri']) uri = urlsplit(self.params['redirect_uri'])
@ -147,7 +151,8 @@ class AuthorizeEndpoint(object):
scope=self.params['scope']) scope=self.params['scope'])
# Check if response_type must include access_token in the response. # Check if response_type must include access_token in the response.
if self.params['response_type'] in ['id_token token', 'token', 'code token', 'code id_token token']: if (self.params['response_type'] in
['id_token token', 'token', 'code token', 'code id_token token']):
query_fragment['access_token'] = token.access_token query_fragment['access_token'] = token.access_token
# We don't need id_token if it's an OAuth2 request. # We don't need id_token if it's an OAuth2 request.
@ -188,7 +193,8 @@ class AuthorizeEndpoint(object):
if settings.get('OIDC_SESSION_MANAGEMENT_ENABLE'): if settings.get('OIDC_SESSION_MANAGEMENT_ENABLE'):
# Generate client origin URI from the redirect_uri param. # Generate client origin URI from the redirect_uri param.
redirect_uri_parsed = urlsplit(self.params['redirect_uri']) redirect_uri_parsed = urlsplit(self.params['redirect_uri'])
client_origin = '{0}://{1}'.format(redirect_uri_parsed.scheme, redirect_uri_parsed.netloc) client_origin = '{0}://{1}'.format(
redirect_uri_parsed.scheme, redirect_uri_parsed.netloc)
# Create random salt. # Create random salt.
salt = md5(uuid4().hex.encode()).hexdigest() salt = md5(uuid4().hex.encode()).hexdigest()
@ -213,7 +219,8 @@ class AuthorizeEndpoint(object):
raise AuthorizeError(self.params['redirect_uri'], 'server_error', self.grant_type) raise AuthorizeError(self.params['redirect_uri'], 'server_error', self.grant_type)
uri = uri._replace( uri = uri._replace(
query=urlencode(query_params, doseq=True), fragment=uri.fragment + urlencode(query_fragment, doseq=True)) query=urlencode(query_params, doseq=True),
fragment=uri.fragment + urlencode(query_fragment, doseq=True))
return urlunsplit(uri) return urlunsplit(uri)
@ -266,8 +273,8 @@ class AuthorizeEndpoint(object):
""" """
scopes = StandardScopeClaims.get_scopes_info(self.params['scope']) scopes = StandardScopeClaims.get_scopes_info(self.params['scope'])
if settings.get('OIDC_EXTRA_SCOPE_CLAIMS'): if settings.get('OIDC_EXTRA_SCOPE_CLAIMS'):
scopes_extra = settings.get('OIDC_EXTRA_SCOPE_CLAIMS', import_str=True).get_scopes_info( scopes_extra = settings.get(
self.params['scope']) 'OIDC_EXTRA_SCOPE_CLAIMS', import_str=True).get_scopes_info(self.params['scope'])
for index_extra, scope_extra in enumerate(scopes_extra): for index_extra, scope_extra in enumerate(scopes_extra):
for index, scope in enumerate(scopes[:]): for index, scope in enumerate(scopes[:]):
if scope_extra['scope'] == scope['scope']: if scope_extra['scope'] == scope['scope']:

View file

@ -0,0 +1,18 @@
# Generated by Django 2.0.3 on 2018-03-27 19:59
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('oidc_provider', '0023_client_owner'),
]
operations = [
migrations.AlterField(
model_name='client',
name='reuse_consent',
field=models.BooleanField(default=True, help_text="If enabled, server will save the user consent given to a specific client, so that user won't be prompted for the same authorization multiple times.", verbose_name='Reuse Consent?'),
),
]

View file

@ -70,7 +70,8 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
self.factory = RequestFactory() self.factory = RequestFactory()
self.user = create_fake_user() self.user = create_fake_user()
self.client = create_fake_client(response_type='code') self.client = create_fake_client(response_type='code')
self.client_with_no_consent = create_fake_client(response_type='code', require_consent=False) self.client_with_no_consent = create_fake_client(
response_type='code', require_consent=False)
self.client_public = create_fake_client(response_type='code', is_public=True) self.client_public = create_fake_client(response_type='code', is_public=True)
self.client_public_with_no_consent = create_fake_client( self.client_public_with_no_consent = create_fake_client(
response_type='code', is_public=True, require_consent=False) response_type='code', is_public=True, require_consent=False)
@ -199,7 +200,8 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
# Because user doesn't allow app, SHOULD exists an error parameter # Because user doesn't allow app, SHOULD exists an error parameter
# in the query. # in the query.
self.assertIn('error=', response['Location'], msg='error param is missing in query.') self.assertIn('error=', response['Location'], msg='error param is missing in query.')
self.assertIn('access_denied', response['Location'], msg='"access_denied" code is missing in query.') self.assertIn(
'access_denied', response['Location'], msg='"access_denied" code is missing in query.')
# Simulate user authorization. # Simulate user authorization.
data['allow'] = 'Accept' # Will be the value of the button. data['allow'] = 'Accept' # Will be the value of the button.
@ -280,10 +282,13 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
client=self.client) client=self.client)
self.assertTrue(is_code_ok, msg='Code returned is invalid or missing') self.assertTrue(is_code_ok, msg='Code returned is invalid or missing')
self.assertEquals(set(params.keys()), {'state', 'code'}, msg='More than state or code appended as query params') self.assertEquals(
set(params.keys()), {'state', 'code'},
msg='More than state or code appended as query params')
self.assertTrue( self.assertTrue(
response['Location'].startswith(self.client.default_redirect_uri), msg='Different redirect_uri returned') response['Location'].startswith(self.client.default_redirect_uri),
msg='Different redirect_uri returned')
def test_unknown_redirect_uris_are_rejected(self): def test_unknown_redirect_uris_are_rejected(self):
""" """
@ -299,7 +304,8 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
} }
response = self._auth_request('get', data) response = self._auth_request('get', data)
self.assertIn(RedirectUriError.error, response.content.decode('utf-8'), msg='No redirect_uri error') self.assertIn(
RedirectUriError.error, response.content.decode('utf-8'), msg='No redirect_uri error')
def test_manipulated_redirect_uris_are_rejected(self): def test_manipulated_redirect_uris_are_rejected(self):
""" """
@ -315,11 +321,13 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
} }
response = self._auth_request('get', data) response = self._auth_request('get', data)
self.assertIn(RedirectUriError.error, response.content.decode('utf-8'), msg='No redirect_uri error') self.assertIn(
RedirectUriError.error, response.content.decode('utf-8'), msg='No redirect_uri error')
def test_public_client_auto_approval(self): def test_public_client_auto_approval(self):
""" """
It's recommended not auto-approving requests for non-confidential clients using Authorization Code. It's recommended not auto-approving requests for non-confidential
clients using Authorization Code.
""" """
data = { data = {
'client_id': self.client_public_with_no_consent.client_id, 'client_id': self.client_public_with_no_consent.client_id,
@ -335,7 +343,8 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
def test_prompt_none_parameter(self): def test_prompt_none_parameter(self):
""" """
Specifies whether the Authorization Server prompts the End-User for reauthentication and consent. Specifies whether the Authorization Server prompts the End-User for
reauthentication and consent.
See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
""" """
data = { data = {
@ -354,13 +363,15 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
response = self._auth_request('get', data, is_user_authenticated=True) response = self._auth_request('get', data, is_user_authenticated=True)
# An error is returned if the Client does not have pre-configured consent for the requested Claims. # An error is returned if the Client does not have pre-configured
# consent for the requested Claims.
self.assertIn('consent_required', response['Location']) self.assertIn('consent_required', response['Location'])
@patch('oidc_provider.views.django_user_logout') @patch('oidc_provider.views.django_user_logout')
def test_prompt_login_parameter(self, logout_function): def test_prompt_login_parameter(self, logout_function):
""" """
Specifies whether the Authorization Server prompts the End-User for reauthentication and consent. Specifies whether the Authorization Server prompts the End-User for
reauthentication and consent.
See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
""" """
data = { data = {
@ -393,7 +404,8 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
def test_prompt_login_none_parameter(self): def test_prompt_login_none_parameter(self):
""" """
Specifies whether the Authorization Server prompts the End-User for reauthentication and consent. Specifies whether the Authorization Server prompts the End-User for
reauthentication and consent.
See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
""" """
data = { data = {
@ -414,7 +426,8 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
@patch('oidc_provider.views.render') @patch('oidc_provider.views.render')
def test_prompt_consent_parameter(self, render_patched): def test_prompt_consent_parameter(self, render_patched):
""" """
Specifies whether the Authorization Server prompts the End-User for reauthentication and consent. Specifies whether the Authorization Server prompts the End-User for
reauthentication and consent.
See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
""" """
data = { data = {
@ -431,11 +444,13 @@ class AuthorizationCodeFlowTestCase(TestCase, AuthorizeEndpointMixin):
response = self._auth_request('get', data, is_user_authenticated=True) response = self._auth_request('get', data, is_user_authenticated=True)
render_patched.assert_called_once() render_patched.assert_called_once()
self.assertTrue(render_patched.call_args[0][1], settings.get('OIDC_TEMPLATES')['authorize']) self.assertTrue(
render_patched.call_args[0][1], settings.get('OIDC_TEMPLATES')['authorize'])
def test_prompt_consent_none_parameter(self): def test_prompt_consent_none_parameter(self):
""" """
Specifies whether the Authorization Server prompts the End-User for reauthentication and consent. Specifies whether the Authorization Server prompts the End-User for
reauthentication and consent.
See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
""" """
data = { data = {
@ -633,7 +648,8 @@ class AuthorizationHybridFlowTestCase(TestCase, AuthorizeEndpointMixin):
call_command('creatersakey') call_command('creatersakey')
self.factory = RequestFactory() self.factory = RequestFactory()
self.user = create_fake_user() self.user = create_fake_user()
self.client_code_idtoken_token = create_fake_client(response_type='code id_token token', is_public=True) self.client_code_idtoken_token = create_fake_client(
response_type='code id_token token', is_public=True)
self.state = uuid.uuid4().hex self.state = uuid.uuid4().hex
self.nonce = uuid.uuid4().hex self.nonce = uuid.uuid4().hex
@ -698,8 +714,9 @@ class TestCreateResponseURI(TestCase):
@patch('oidc_provider.lib.endpoints.authorize.logger.exception') @patch('oidc_provider.lib.endpoints.authorize.logger.exception')
def test_create_response_uri_logs_to_error(self, log_exception, create_code): def test_create_response_uri_logs_to_error(self, log_exception, create_code):
""" """
A lot can go wrong when creating a response uri and this is caught with a general Exception error. The A lot can go wrong when creating a response uri and this is caught
information contained within this error should show up in the error log so production servers have something with a general Exception error. The information contained within this
error should show up in the error log so production servers have something
to work with when things don't work as expected. to work with when things don't work as expected.
""" """
exception = Exception("Something went wrong!") exception = Exception("Something went wrong!")
@ -711,7 +728,8 @@ class TestCreateResponseURI(TestCase):
with self.assertRaises(Exception): with self.assertRaises(Exception):
authorization_endpoint.create_response_uri() authorization_endpoint.create_response_uri()
log_exception.assert_called_once_with('[Authorize] Error when trying to create response uri: %s', exception) log_exception.assert_called_once_with(
'[Authorize] Error when trying to create response uri: %s', exception)
@override_settings(OIDC_SESSION_MANAGEMENT_ENABLE=True) @override_settings(OIDC_SESSION_MANAGEMENT_ENABLE=True)
def test_create_response_uri_generates_session_state_if_session_management_enabled(self): def test_create_response_uri_generates_session_state_if_session_management_enabled(self):