b1e994aa7e
Based on the OAuth 2.0 Token Introspection spec the "aud" field should be based on the token. Previously "aud" was populated with the id of the client making the introspection request which seems wrong. This changes the endpoint to return the value from the token. The "client_id" field is then changed to return the client id for the client that originally requested the token rather than returning the "aud" value from the token. From the spec https://tools.ietf.org/html/rfc7662: client_id OPTIONAL. Client identifier for the OAuth 2.0 client that requested this token. aud OPTIONAL. Service-specific string identifier or list of string identifiers representing the intended audience for this token, as defined in JWT [RFC7519].
98 lines
3.9 KiB
Python
98 lines
3.9 KiB
Python
import logging
|
|
|
|
from django.http import JsonResponse
|
|
|
|
from oidc_provider.lib.errors import TokenIntrospectionError
|
|
from oidc_provider.lib.utils.common import run_processing_hook
|
|
from oidc_provider.lib.utils.oauth2 import extract_client_auth
|
|
from oidc_provider.models import Token, Client
|
|
from oidc_provider import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
INTROSPECTION_SCOPE = 'token_introspection'
|
|
|
|
|
|
class TokenIntrospectionEndpoint(object):
|
|
|
|
def __init__(self, request):
|
|
self.request = request
|
|
self.params = {}
|
|
self.token = None
|
|
self.id_token = None
|
|
self.client = None
|
|
self._extract_params()
|
|
|
|
def _extract_params(self):
|
|
# Introspection only supports POST requests
|
|
self.params['token'] = self.request.POST.get('token')
|
|
client_id, client_secret = extract_client_auth(self.request)
|
|
self.params['client_id'] = client_id
|
|
self.params['client_secret'] = client_secret
|
|
|
|
def validate_params(self):
|
|
if not (self.params['client_id'] and self.params['client_secret']):
|
|
logger.debug('[Introspection] No client credentials provided')
|
|
raise TokenIntrospectionError()
|
|
if not self.params['token']:
|
|
logger.debug('[Introspection] No token provided')
|
|
raise TokenIntrospectionError()
|
|
try:
|
|
self.token = Token.objects.get(access_token=self.params['token'])
|
|
except Token.DoesNotExist:
|
|
logger.debug('[Introspection] Token does not exist: %s', self.params['token'])
|
|
raise TokenIntrospectionError()
|
|
if self.token.has_expired():
|
|
logger.debug('[Introspection] Token is not valid: %s', self.params['token'])
|
|
raise TokenIntrospectionError()
|
|
if not self.token.id_token:
|
|
logger.debug('[Introspection] Token not an authentication token: %s',
|
|
self.params['token'])
|
|
raise TokenIntrospectionError()
|
|
|
|
self.id_token = self.token.id_token
|
|
audience = self.id_token.get('aud')
|
|
if not audience:
|
|
logger.debug('[Introspection] No audience found for token: %s', self.params['token'])
|
|
raise TokenIntrospectionError()
|
|
|
|
try:
|
|
self.client = Client.objects.get(
|
|
client_id=self.params['client_id'],
|
|
client_secret=self.params['client_secret'])
|
|
except Client.DoesNotExist:
|
|
logger.debug('[Introspection] No valid client for id: %s',
|
|
self.params['client_id'])
|
|
raise TokenIntrospectionError()
|
|
if INTROSPECTION_SCOPE not in self.client.scope:
|
|
logger.debug('[Introspection] Client %s does not have introspection scope',
|
|
self.params['client_id'])
|
|
raise TokenIntrospectionError()
|
|
if settings.get('OIDC_INTROSPECTION_VALIDATE_AUDIENCE_SCOPE') \
|
|
and audience not in self.client.scope:
|
|
logger.debug('[Introspection] Client %s does not audience scope %s',
|
|
self.params['client_id'], audience)
|
|
raise TokenIntrospectionError()
|
|
|
|
def create_response_dic(self):
|
|
response_dic = dict((k, self.id_token[k]) for k in ('aud', 'sub', 'exp', 'iat', 'iss'))
|
|
response_dic['active'] = True
|
|
response_dic['client_id'] = self.token.client.client_id
|
|
|
|
response_dic = run_processing_hook(response_dic,
|
|
'OIDC_INTROSPECTION_PROCESSING_HOOK',
|
|
client=self.client,
|
|
id_token=self.id_token)
|
|
|
|
return response_dic
|
|
|
|
@classmethod
|
|
def response(cls, dic, status=200):
|
|
"""
|
|
Create and return a response object.
|
|
"""
|
|
response = JsonResponse(dic, status=status)
|
|
response['Cache-Control'] = 'no-store'
|
|
response['Pragma'] = 'no-cache'
|
|
|
|
return response
|