Client credentials implementation.

This commit is contained in:
Juan Ignacio Fiorentino 2018-04-08 17:43:24 -03:00
parent ff3d6ebe1a
commit dbed87aa78
5 changed files with 142 additions and 73 deletions

View file

@ -56,7 +56,7 @@ class ClientAdmin(admin.ModelAdmin):
'require_consent', 'reuse_consent'),
}],
[_(u'Credentials'), {
'fields': ('client_id', 'client_secret'),
'fields': ('client_id', 'client_secret', '_scope'),
}],
[_(u'Information'), {
'fields': ('contact_email', 'website_url', 'terms_url', 'logo', 'date_created'),

View file

@ -141,7 +141,10 @@ class TokenEndpoint(object):
logger.debug(
'[Token] Refresh token does not exist: %s', self.params['refresh_token'])
raise TokenError('invalid_grant')
elif self.params['grant_type'] == 'client_credentials':
if not self.client._scope:
logger.debug('[Token] Client using client credentials with empty scope')
raise TokenError('invalid_scope')
else:
logger.debug('[Token] Invalid grant type: %s', self.params['grant_type'])
raise TokenError('unsupported_grant_type')
@ -153,34 +156,8 @@ class TokenEndpoint(object):
return self.create_refresh_response_dic()
elif self.params['grant_type'] == 'password':
return self.create_access_token_response_dic()
def create_access_token_response_dic(self):
# See https://tools.ietf.org/html/rfc6749#section-4.3
token = create_token(
self.user,
self.client,
self.params['scope'].split(' '))
id_token_dic = create_id_token(
user=self.user,
aud=self.client.client_id,
nonce='self.code.nonce',
at_hash=token.at_hash,
request=self.request,
scope=token.scope,
)
token.id_token = id_token_dic
token.save()
return {
'access_token': token.access_token,
'refresh_token': token.refresh_token,
'expires_in': settings.get('OIDC_TOKEN_EXPIRE'),
'token_type': 'bearer',
'id_token': encode_id_token(id_token_dic, token.client),
}
elif self.params['grant_type'] == 'client_credentials':
return self.create_client_credentials_response_dic()
def create_code_response_dic(self):
# See https://tools.ietf.org/html/rfc6749#section-4.1
@ -263,6 +240,51 @@ class TokenEndpoint(object):
return dic
def create_access_token_response_dic(self):
# See https://tools.ietf.org/html/rfc6749#section-4.3
token = create_token(
self.user,
self.client,
self.params['scope'].split(' '))
id_token_dic = create_id_token(
user=self.user,
aud=self.client.client_id,
nonce='self.code.nonce',
at_hash=token.at_hash,
request=self.request,
scope=token.scope,
)
token.id_token = id_token_dic
token.save()
return {
'access_token': token.access_token,
'refresh_token': token.refresh_token,
'expires_in': settings.get('OIDC_TOKEN_EXPIRE'),
'token_type': 'bearer',
'id_token': encode_id_token(id_token_dic, token.client),
}
def create_client_credentials_response_dic(self):
# See https://tools.ietf.org/html/rfc6749#section-4.4.3
token = create_token(
user=None,
client=self.client,
scope=self.client.scope)
token.save()
return {
'access_token': token.access_token,
'expires_in': settings.get('OIDC_TOKEN_EXPIRE'),
'token_type': 'bearer',
'scope': self.client._scope,
}
@classmethod
def response(cls, dic, status=200):
"""

View file

@ -0,0 +1,25 @@
# Generated by Django 2.0.3 on 2018-04-07 21:51
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('oidc_provider', '0024_auto_20180327_1959'),
]
operations = [
migrations.AddField(
model_name='client',
name='_scope',
field=models.TextField(blank=True, default='', help_text='Specifies the authorized scope values for the client app.', verbose_name='Scopes'),
),
migrations.AlterField(
model_name='token',
name='user',
field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='User'),
),
]

View file

@ -75,32 +75,19 @@ class Client(models.Model):
default=True,
verbose_name=_('Require Consent?'),
help_text=_('If disabled, the Server will NEVER ask the user for consent.'))
_redirect_uris = models.TextField(
default='', verbose_name=_(u'Redirect URIs'),
help_text=_(u'Enter each URI on a new line.'))
@property
def redirect_uris(self):
return self._redirect_uris.splitlines()
@redirect_uris.setter
def redirect_uris(self, value):
self._redirect_uris = '\n'.join(value)
_post_logout_redirect_uris = models.TextField(
blank=True,
default='',
verbose_name=_(u'Post Logout Redirect URIs'),
help_text=_(u'Enter each URI on a new line.'))
@property
def post_logout_redirect_uris(self):
return self._post_logout_redirect_uris.splitlines()
@post_logout_redirect_uris.setter
def post_logout_redirect_uris(self, value):
self._post_logout_redirect_uris = '\n'.join(value)
_scope = models.TextField(
blank=True,
default='',
verbose_name=_(u'Scopes'),
help_text=_('Specifies the authorized scope values for the client app.'))
class Meta:
verbose_name = _(u'Client')
@ -112,6 +99,30 @@ class Client(models.Model):
def __unicode__(self):
return self.__str__()
@property
def redirect_uris(self):
return self._redirect_uris.splitlines()
@redirect_uris.setter
def redirect_uris(self, value):
self._redirect_uris = '\n'.join(value)
@property
def post_logout_redirect_uris(self):
return self._post_logout_redirect_uris.splitlines()
@post_logout_redirect_uris.setter
def post_logout_redirect_uris(self, value):
self._post_logout_redirect_uris = '\n'.join(value)
@property
def scope(self):
return self._scope.split()
@scope.setter
def scope(self, value):
self._scope = ' '.join(value)
@property
def default_redirect_uri(self):
return self.redirect_uris[0] if self.redirect_uris else ''
@ -125,6 +136,9 @@ class BaseCodeTokenModel(models.Model):
expires_at = models.DateTimeField(verbose_name=_(u'Expiration Date'))
_scope = models.TextField(default='', verbose_name=_(u'Scopes'))
class Meta:
abstract = True
@property
def scope(self):
return self._scope.split()
@ -142,9 +156,6 @@ class BaseCodeTokenModel(models.Model):
def __unicode__(self):
return self.__str__()
class Meta:
abstract = True
class Code(BaseCodeTokenModel):
@ -162,6 +173,8 @@ class Code(BaseCodeTokenModel):
class Token(BaseCodeTokenModel):
user = models.ForeignKey(
settings.AUTH_USER_MODEL, null=True, verbose_name=_(u'User'), on_delete=models.CASCADE)
access_token = models.CharField(max_length=255, unique=True, verbose_name=_(u'Access Token'))
refresh_token = models.CharField(max_length=255, unique=True, verbose_name=_(u'Refresh Token'))
_id_token = models.TextField(verbose_name=_(u'ID Token'))

View file

@ -152,28 +152,6 @@ class TokenTestCase(TestCase):
auth_header = {'HTTP_AUTHORIZATION': auth.decode('utf-8')}
return auth_header
# Resource Owner Password Credentials Grant
# requirements to satisfy in all test_password_grant methods
# https://tools.ietf.org/html/rfc6749#section-4.3.2
#
# grant_type
# REQUIRED. Value MUST be set to "password".
# username
# REQUIRED. The resource owner username.
# password
# REQUIRED. The resource owner password.
# scope
# OPTIONAL. The scope of the access request as described by
# Section 3.3.
#
# The authorization server MUST:
# o require client authentication for confidential clients or for any
# client that was issued client credentials (or with other
# authentication requirements),
# o authenticate the client if client authentication is included, and
# o validate the resource owner password credentials using its
# existing password validation algorithm.
def test_default_setting_does_not_allow_grant_type_password(self):
post_data = self._password_grant_post_data()
@ -744,3 +722,34 @@ class TokenTestCase(TestCase):
response = self._post_request(post_data)
json.loads(response.content.decode('utf-8'))
def test_client_credentials_grant_type(self):
fake_scopes_list = ['scopeone', 'scopetwo']
# Add scope for this client.
self.client.scope = fake_scopes_list
self.client.save()
post_data = {
'client_id': self.client.client_id,
'client_secret': self.client.client_secret,
'grant_type': 'client_credentials',
}
response = self._post_request(post_data)
response_dict = json.loads(response.content.decode('utf-8'))
# Ensure access token exists in the response, also check if scopes are
# the ones we registered previously.
self.assertTrue('access_token' in response_dict)
self.assertEqual(' '.join(fake_scopes_list), response_dict['scope'])
# Clean scopes for this client.
self.client.scope = ''
self.client.save()
response = self._post_request(post_data)
response_dict = json.loads(response.content.decode('utf-8'))
# It should fail when client does not have any scope added.
self.assertEqual(400, response.status_code)
self.assertEqual('invalid_scope', response_dict['error'])