9d5c2b39fa
The response can simply omit fields that require id_token as the only required field is "active" according to RFC 7662. I think it would be nice for introspection of client credentials tokens to be supported by default without needing to override OIDC_INTROSPECTION_VALIDATE_AUDIENCE_SCOPE, but this is a start.
105 lines
4 KiB
Python
105 lines
4 KiB
Python
import logging
|
|
|
|
from django.http import JsonResponse
|
|
|
|
from oidc_provider.lib.errors import TokenIntrospectionError
|
|
from oidc_provider.lib.utils.common import run_processing_hook
|
|
from oidc_provider.lib.utils.oauth2 import extract_client_auth
|
|
from oidc_provider.models import Token, Client
|
|
from oidc_provider import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
INTROSPECTION_SCOPE = 'token_introspection'
|
|
|
|
|
|
class TokenIntrospectionEndpoint(object):
|
|
|
|
def __init__(self, request):
|
|
self.request = request
|
|
self.params = {}
|
|
self.token = None
|
|
self.id_token = None
|
|
self.client = None
|
|
self._extract_params()
|
|
|
|
def _extract_params(self):
|
|
# Introspection only supports POST requests
|
|
self.params['token'] = self.request.POST.get('token')
|
|
client_id, client_secret = extract_client_auth(self.request)
|
|
self.params['client_id'] = client_id
|
|
self.params['client_secret'] = client_secret
|
|
|
|
def validate_params(self):
|
|
if not (self.params['client_id'] and self.params['client_secret']):
|
|
logger.debug('[Introspection] No client credentials provided')
|
|
raise TokenIntrospectionError()
|
|
if not self.params['token']:
|
|
logger.debug('[Introspection] No token provided')
|
|
raise TokenIntrospectionError()
|
|
try:
|
|
self.token = Token.objects.get(access_token=self.params['token'])
|
|
except Token.DoesNotExist:
|
|
logger.debug('[Introspection] Token does not exist: %s', self.params['token'])
|
|
raise TokenIntrospectionError()
|
|
if self.token.has_expired():
|
|
logger.debug('[Introspection] Token is not valid: %s', self.params['token'])
|
|
raise TokenIntrospectionError()
|
|
|
|
try:
|
|
self.client = Client.objects.get(
|
|
client_id=self.params['client_id'],
|
|
client_secret=self.params['client_secret'])
|
|
except Client.DoesNotExist:
|
|
logger.debug('[Introspection] No valid client for id: %s',
|
|
self.params['client_id'])
|
|
raise TokenIntrospectionError()
|
|
if INTROSPECTION_SCOPE not in self.client.scope:
|
|
logger.debug('[Introspection] Client %s does not have introspection scope',
|
|
self.params['client_id'])
|
|
raise TokenIntrospectionError()
|
|
|
|
self.id_token = self.token.id_token
|
|
|
|
if settings.get('OIDC_INTROSPECTION_VALIDATE_AUDIENCE_SCOPE'):
|
|
if not self.token.id_token:
|
|
logger.debug('[Introspection] Token not an authentication token: %s',
|
|
self.params['token'])
|
|
raise TokenIntrospectionError()
|
|
|
|
audience = self.token.id_token.get('aud')
|
|
if not audience:
|
|
logger.debug('[Introspection] No audience found for token: %s',
|
|
self.params['token'])
|
|
raise TokenIntrospectionError()
|
|
|
|
if audience not in self.client.scope:
|
|
logger.debug('[Introspection] Client %s does not audience scope %s',
|
|
self.params['client_id'], audience)
|
|
raise TokenIntrospectionError()
|
|
|
|
def create_response_dic(self):
|
|
response_dic = {}
|
|
if self.id_token:
|
|
for k in ('aud', 'sub', 'exp', 'iat', 'iss'):
|
|
response_dic[k] = self.id_token[k]
|
|
response_dic['active'] = True
|
|
response_dic['client_id'] = self.token.client.client_id
|
|
|
|
response_dic = run_processing_hook(response_dic,
|
|
'OIDC_INTROSPECTION_PROCESSING_HOOK',
|
|
client=self.client,
|
|
id_token=self.id_token)
|
|
|
|
return response_dic
|
|
|
|
@classmethod
|
|
def response(cls, dic, status=200):
|
|
"""
|
|
Create and return a response object.
|
|
"""
|
|
response = JsonResponse(dic, status=status)
|
|
response['Cache-Control'] = 'no-store'
|
|
response['Pragma'] = 'no-cache'
|
|
|
|
return response
|