Better tests for errors, disable grant type password by default
This commit is contained in:
parent
fc6241d3d0
commit
39111a8388
|
@ -14,6 +14,7 @@ from django.http import JsonResponse
|
||||||
|
|
||||||
from oidc_provider.lib.errors import (
|
from oidc_provider.lib.errors import (
|
||||||
TokenError,
|
TokenError,
|
||||||
|
UserAuthError,
|
||||||
)
|
)
|
||||||
from oidc_provider.lib.utils.token import (
|
from oidc_provider.lib.utils.token import (
|
||||||
create_id_token,
|
create_id_token,
|
||||||
|
@ -123,13 +124,16 @@ class TokenEndpoint(object):
|
||||||
raise TokenError('invalid_grant')
|
raise TokenError('invalid_grant')
|
||||||
|
|
||||||
elif self.params['grant_type'] == 'password':
|
elif self.params['grant_type'] == 'password':
|
||||||
|
if not settings.get('OIDC_GRANT_TYPE_PASSWORD_ENABLE'):
|
||||||
|
raise TokenError('unsupported_grant_type')
|
||||||
|
|
||||||
user = authenticate(
|
user = authenticate(
|
||||||
username=self.params['username'],
|
username=self.params['username'],
|
||||||
password=self.params['password']
|
password=self.params['password']
|
||||||
)
|
)
|
||||||
|
|
||||||
if not user:
|
if not user:
|
||||||
raise TokenError('Invalid user credentials')
|
raise UserAuthError()
|
||||||
|
|
||||||
self.user = user
|
self.user = user
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,21 @@ class ClientIdError(Exception):
|
||||||
description = 'The client identifier (client_id) is missing or invalid.'
|
description = 'The client identifier (client_id) is missing or invalid.'
|
||||||
|
|
||||||
|
|
||||||
|
class UserAuthError(Exception):
|
||||||
|
"""
|
||||||
|
Specific to the Resource Owner Password Credentials flow when
|
||||||
|
the Resource Owners credentials are not valid.
|
||||||
|
"""
|
||||||
|
error = 'access_denied'
|
||||||
|
description = 'The resource owner or authorization server denied ' \
|
||||||
|
'the request'
|
||||||
|
|
||||||
|
def create_dict(self):
|
||||||
|
return {
|
||||||
|
'error': self.error,
|
||||||
|
'error_description': self.description,
|
||||||
|
}
|
||||||
|
|
||||||
class AuthorizeError(Exception):
|
class AuthorizeError(Exception):
|
||||||
|
|
||||||
_errors = {
|
_errors = {
|
||||||
|
|
|
@ -115,6 +115,22 @@ class DefaultSettings(object):
|
||||||
"""
|
"""
|
||||||
return 'oidc_provider.lib.utils.common.default_idtoken_processing_hook'
|
return 'oidc_provider.lib.utils.common.default_idtoken_processing_hook'
|
||||||
|
|
||||||
|
@property
|
||||||
|
def OIDC_GRANT_TYPE_PASSWORD_ENABLE(self):
|
||||||
|
"""
|
||||||
|
OPTIONAL. A boolean to set whether to allow the Resource Owner Password
|
||||||
|
Credentials Grant. https://tools.ietf.org/html/rfc6749#section-4.3
|
||||||
|
|
||||||
|
From the specification:
|
||||||
|
Since this access token request utilizes the resource owner's
|
||||||
|
password, the authorization server MUST protect the endpoint
|
||||||
|
against brute force attacks (e.g., using rate-limitation or
|
||||||
|
generating alerts).
|
||||||
|
|
||||||
|
How you do this, is up to you.
|
||||||
|
"""
|
||||||
|
return False
|
||||||
|
|
||||||
default_settings = DefaultSettings()
|
default_settings = DefaultSettings()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -164,6 +164,20 @@ class TokenTestCase(TestCase):
|
||||||
# o validate the resource owner password credentials using its
|
# o validate the resource owner password credentials using its
|
||||||
# existing password validation algorithm.
|
# existing password validation algorithm.
|
||||||
|
|
||||||
|
def test_default_setting_does_not_allow_grant_type_password(self):
|
||||||
|
post_data = self._password_grant_post_data()
|
||||||
|
|
||||||
|
response = self._post_request(
|
||||||
|
post_data=post_data,
|
||||||
|
extras=self._auth_header()
|
||||||
|
)
|
||||||
|
|
||||||
|
response_dict = json.loads(response.content.decode('utf-8'))
|
||||||
|
|
||||||
|
self.assertEqual(400, response.status_code)
|
||||||
|
self.assertEqual('unsupported_grant_type', response_dict['error'])
|
||||||
|
|
||||||
|
@override_settings(OIDC_GRANT_TYPE_PASSWORD_ENABLE=True)
|
||||||
def test_password_grant_get_access_token_without_scope(self):
|
def test_password_grant_get_access_token_without_scope(self):
|
||||||
post_data = self._password_grant_post_data()
|
post_data = self._password_grant_post_data()
|
||||||
del (post_data['scope'])
|
del (post_data['scope'])
|
||||||
|
@ -176,6 +190,7 @@ class TokenTestCase(TestCase):
|
||||||
response_dict = json.loads(response.content.decode('utf-8'))
|
response_dict = json.loads(response.content.decode('utf-8'))
|
||||||
self.assertIn('access_token', response_dict)
|
self.assertIn('access_token', response_dict)
|
||||||
|
|
||||||
|
@override_settings(OIDC_GRANT_TYPE_PASSWORD_ENABLE=True)
|
||||||
def test_password_grant_get_access_token_with_scope(self):
|
def test_password_grant_get_access_token_with_scope(self):
|
||||||
response = self._post_request(
|
response = self._post_request(
|
||||||
post_data=self._password_grant_post_data(),
|
post_data=self._password_grant_post_data(),
|
||||||
|
@ -185,6 +200,7 @@ class TokenTestCase(TestCase):
|
||||||
response_dict = json.loads(response.content.decode('utf-8'))
|
response_dict = json.loads(response.content.decode('utf-8'))
|
||||||
self.assertIn('access_token', response_dict)
|
self.assertIn('access_token', response_dict)
|
||||||
|
|
||||||
|
@override_settings(OIDC_GRANT_TYPE_PASSWORD_ENABLE=True)
|
||||||
def test_password_grant_get_access_token_invalid_user_credentials(self):
|
def test_password_grant_get_access_token_invalid_user_credentials(self):
|
||||||
invalid_post = self._password_grant_post_data()
|
invalid_post = self._password_grant_post_data()
|
||||||
invalid_post['password'] = 'wrong!'
|
invalid_post['password'] = 'wrong!'
|
||||||
|
@ -194,7 +210,11 @@ class TokenTestCase(TestCase):
|
||||||
extras=self._auth_header()
|
extras=self._auth_header()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
response_dict = json.loads(response.content.decode('utf-8'))
|
||||||
|
print(response_dict)
|
||||||
|
|
||||||
self.assertEqual(400, response.status_code)
|
self.assertEqual(400, response.status_code)
|
||||||
|
self.assertEqual('access_denied', response_dict['error'])
|
||||||
|
|
||||||
def test_password_grant_get_access_token_invalid_client_credentials(self):
|
def test_password_grant_get_access_token_invalid_client_credentials(self):
|
||||||
self.client.client_id = 'foo'
|
self.client.client_id = 'foo'
|
||||||
|
@ -205,10 +225,14 @@ class TokenTestCase(TestCase):
|
||||||
extras=self._auth_header()
|
extras=self._auth_header()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
response_dict = json.loads(response.content.decode('utf-8'))
|
||||||
|
|
||||||
self.assertEqual(400, response.status_code)
|
self.assertEqual(400, response.status_code)
|
||||||
|
self.assertEqual('invalid_client', response_dict['error'])
|
||||||
|
|
||||||
@patch('oidc_provider.lib.utils.token.uuid')
|
@patch('oidc_provider.lib.utils.token.uuid')
|
||||||
@override_settings(OIDC_TOKEN_EXPIRE=120)
|
@override_settings(OIDC_TOKEN_EXPIRE=120,
|
||||||
|
OIDC_GRANT_TYPE_PASSWORD_ENABLE=True)
|
||||||
def test_password_grant_full_response(self, mock_uuid):
|
def test_password_grant_full_response(self, mock_uuid):
|
||||||
test_hex = 'fake_token'
|
test_hex = 'fake_token'
|
||||||
mock_uuid4 = Mock(spec=uuid.uuid4)
|
mock_uuid4 = Mock(spec=uuid.uuid4)
|
||||||
|
|
|
@ -28,7 +28,7 @@ from oidc_provider.lib.errors import (
|
||||||
ClientIdError,
|
ClientIdError,
|
||||||
RedirectUriError,
|
RedirectUriError,
|
||||||
TokenError,
|
TokenError,
|
||||||
)
|
UserAuthError)
|
||||||
from oidc_provider.lib.utils.common import (
|
from oidc_provider.lib.utils.common import (
|
||||||
redirect,
|
redirect,
|
||||||
get_site_url,
|
get_site_url,
|
||||||
|
@ -167,7 +167,7 @@ class TokenView(View):
|
||||||
|
|
||||||
return TokenEndpoint.response(dic)
|
return TokenEndpoint.response(dic)
|
||||||
|
|
||||||
except (TokenError) as error:
|
except (TokenError, UserAuthError) as error:
|
||||||
return TokenEndpoint.response(error.create_dict(), status=400)
|
return TokenEndpoint.response(error.create_dict(), status=400)
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue