Merge pull request #256 from q3aiml/client-credentials-introspection

support introspection on client credentials tokens
This commit is contained in:
Juan Ignacio Fiorentino 2018-07-19 11:18:35 -03:00 committed by GitHub
commit 59581f50d9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 61 additions and 19 deletions

View file

@ -59,3 +59,5 @@ A successful access token response will like this::
"expires_in" : 3600,
"scope" : "read_books add_books"
}
Token introspection can be used to validate access tokens requested with client credentials if the ``OIDC_INTROSPECTION_VALIDATE_AUDIENCE_SCOPE`` setting is ``False``.

View file

@ -135,6 +135,8 @@ Here you can add extra dictionary values specific to your valid response value f
The function receives an ``introspection_response`` dictionary, a ``client`` instance and an ``id_token`` dictionary.
If the token is generated by client_credentials grant then ``id_token`` is ``None``.
Default is::
def default_introspection_processing_hook(introspection_response, client, id_token):
@ -149,6 +151,8 @@ OPTIONAL ``bool``
A flag which toggles whether the audience is matched against the client resource scope when calling the introspection endpoint.
Must be ``False`` to support introspecting client_crendentials tokens.
Default is ``True``.
OIDC_SESSION_MANAGEMENT_ENABLE

View file

@ -45,16 +45,6 @@ class TokenIntrospectionEndpoint(object):
if self.token.has_expired():
logger.debug('[Introspection] Token is not valid: %s', self.params['token'])
raise TokenIntrospectionError()
if not self.token.id_token:
logger.debug('[Introspection] Token not an authentication token: %s',
self.params['token'])
raise TokenIntrospectionError()
self.id_token = self.token.id_token
audience = self.id_token.get('aud')
if not audience:
logger.debug('[Introspection] No audience found for token: %s', self.params['token'])
raise TokenIntrospectionError()
try:
self.client = Client.objects.get(
@ -68,14 +58,31 @@ class TokenIntrospectionEndpoint(object):
logger.debug('[Introspection] Client %s does not have introspection scope',
self.params['client_id'])
raise TokenIntrospectionError()
if settings.get('OIDC_INTROSPECTION_VALIDATE_AUDIENCE_SCOPE') \
and audience not in self.client.scope:
logger.debug('[Introspection] Client %s does not audience scope %s',
self.params['client_id'], audience)
raise TokenIntrospectionError()
self.id_token = self.token.id_token
if settings.get('OIDC_INTROSPECTION_VALIDATE_AUDIENCE_SCOPE'):
if not self.token.id_token:
logger.debug('[Introspection] Token not an authentication token: %s',
self.params['token'])
raise TokenIntrospectionError()
audience = self.token.id_token.get('aud')
if not audience:
logger.debug('[Introspection] No audience found for token: %s',
self.params['token'])
raise TokenIntrospectionError()
if audience not in self.client.scope:
logger.debug('[Introspection] Client %s does not audience scope %s',
self.params['client_id'], audience)
raise TokenIntrospectionError()
def create_response_dic(self):
response_dic = dict((k, self.id_token[k]) for k in ('aud', 'sub', 'exp', 'iat', 'iss'))
response_dic = {}
if self.id_token:
for k in ('aud', 'sub', 'exp', 'iat', 'iss'):
response_dic[k] = self.id_token[k]
response_dic['active'] = True
response_dic['client_id'] = self.token.client.client_id

View file

@ -185,7 +185,7 @@ class Token(BaseCodeTokenModel):
@property
def id_token(self):
return json.loads(self._id_token)
return json.loads(self._id_token) if self._id_token else None
@id_token.setter
def id_token(self, value):

View file

@ -117,3 +117,16 @@ class IntrospectionTestCase(TestCase):
self.resource.save()
response = self._make_request()
self._assert_active(response)
@override_settings(OIDC_INTROSPECTION_VALIDATE_AUDIENCE_SCOPE=False)
def test_valid_client_grant_token_without_aud_validation(self):
self.token.id_token = None # client_credentials tokens do not have id_token
self.token.save()
self.resource.scope = ['token_introspection']
self.resource.save()
response = self._make_request()
self.assertEqual(response.status_code, 200)
self.assertJSONEqual(force_text(response.content), {
'active': True,
'client_id': self.client.client_id,
})

View file

@ -26,6 +26,7 @@ from jwkest.jws import JWS
from jwkest.jwt import JWT
from mock import patch
from oidc_provider.lib.endpoints.introspection import INTROSPECTION_SCOPE
from oidc_provider.lib.utils.oauth2 import protected_resource_view
from oidc_provider.lib.utils.token import create_code
from oidc_provider.models import Token
@ -55,6 +56,7 @@ class TokenTestCase(TestCase):
call_command('creatersakey')
self.factory = RequestFactory()
self.user = create_fake_user()
self.request_client = self.client
self.client = create_fake_client(response_type='code')
def _password_grant_post_data(self, scope=None):
@ -787,8 +789,9 @@ class TokenTestCase(TestCase):
json.loads(response.content.decode('utf-8'))
@override_settings(OIDC_INTROSPECTION_VALIDATE_AUDIENCE_SCOPE=False)
def test_client_credentials_grant_type(self):
fake_scopes_list = ['scopeone', 'scopetwo']
fake_scopes_list = ['scopeone', 'scopetwo', INTROSPECTION_SCOPE]
# Add scope for this client.
self.client.scope = fake_scopes_list
@ -807,6 +810,8 @@ class TokenTestCase(TestCase):
self.assertTrue('access_token' in response_dict)
self.assertEqual(' '.join(fake_scopes_list), response_dict['scope'])
access_token = response_dict['access_token']
# Create a protected resource and test the access_token.
@require_http_methods(['GET'])
@ -816,7 +821,7 @@ class TokenTestCase(TestCase):
# Deploy view on some url. So, base url could be anything.
request = self.factory.get(
'/api/protected/?access_token={0}'.format(response_dict['access_token']))
'/api/protected/?access_token={0}'.format(access_token))
response = protected_api(request)
response_dict = json.loads(response.content.decode('utf-8'))
@ -825,6 +830,17 @@ class TokenTestCase(TestCase):
# Protected resource test ends here.
# Verify access_token can be validated with token introspection
response = self.request_client.post(
reverse('oidc_provider:token-introspection'), data={'token': access_token},
**self._password_grant_auth_header())
self.assertEqual(response.status_code, 200)
response_dict = json.loads(response.content.decode('utf-8'))
self.assertTrue(response_dict.get('active'))
# End token introspection test
# Clean scopes for this client.
self.client.scope = ''
self.client.save()