Remove the Resource model
This commit is contained in:
parent
00f3efa158
commit
8eeaf5cf33
9 changed files with 74 additions and 240 deletions
|
@ -6,9 +6,7 @@ from django.forms import ModelForm
|
||||||
from django.contrib import admin
|
from django.contrib import admin
|
||||||
from django.utils.translation import ugettext_lazy as _
|
from django.utils.translation import ugettext_lazy as _
|
||||||
|
|
||||||
from oidc_provider.models import Client, Code, Token, RSAKey, get_resource_model
|
from oidc_provider.models import Client, Code, Token, RSAKey
|
||||||
|
|
||||||
Resource = get_resource_model()
|
|
||||||
|
|
||||||
|
|
||||||
class ClientForm(ModelForm):
|
class ClientForm(ModelForm):
|
||||||
|
@ -74,43 +72,6 @@ class ClientAdmin(admin.ModelAdmin):
|
||||||
raw_id_fields = ['owner']
|
raw_id_fields = ['owner']
|
||||||
|
|
||||||
|
|
||||||
class ResourceForm(ModelForm):
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super(ResourceForm, self).__init__(*args, **kwargs)
|
|
||||||
self.fields['resource_secret'].required = False
|
|
||||||
|
|
||||||
def clean_resource_secret(self):
|
|
||||||
if self.cleaned_data['resource_secret']:
|
|
||||||
secret = self.cleaned_data['resource_secret']
|
|
||||||
else:
|
|
||||||
secret = sha224(uuid4().hex.encode()).hexdigest()
|
|
||||||
return secret
|
|
||||||
|
|
||||||
class Meta:
|
|
||||||
model = Resource
|
|
||||||
exclude = []
|
|
||||||
|
|
||||||
|
|
||||||
@admin.register(Resource)
|
|
||||||
class ResourceAdmin(admin.ModelAdmin):
|
|
||||||
fieldsets = [
|
|
||||||
[None, {
|
|
||||||
'fields': ('name', 'owner', 'active',),
|
|
||||||
}],
|
|
||||||
[_('Credentials'), {
|
|
||||||
'fields': ('resource_id', 'resource_secret',),
|
|
||||||
}],
|
|
||||||
[_('Permissions'), {
|
|
||||||
'fields': ('allowed_clients',),
|
|
||||||
}],
|
|
||||||
]
|
|
||||||
form = ResourceForm
|
|
||||||
list_display = ['name', 'resource_id', 'date_created']
|
|
||||||
readonly_fields = ['date_created']
|
|
||||||
search_fields = ['name']
|
|
||||||
raw_id_fields = ['owner']
|
|
||||||
|
|
||||||
|
|
||||||
@admin.register(Code)
|
@admin.register(Code)
|
||||||
class CodeAdmin(admin.ModelAdmin):
|
class CodeAdmin(admin.ModelAdmin):
|
||||||
|
|
||||||
|
|
|
@ -3,32 +3,35 @@ import logging
|
||||||
from django.http import JsonResponse
|
from django.http import JsonResponse
|
||||||
|
|
||||||
from oidc_provider.lib.errors import TokenIntrospectionError
|
from oidc_provider.lib.errors import TokenIntrospectionError
|
||||||
from oidc_provider.lib.utils.common import get_basic_client_credentials, run_processing_hook
|
from oidc_provider.lib.utils.common import run_processing_hook
|
||||||
from oidc_provider.models import Token, get_resource_model
|
from oidc_provider.lib.utils.oauth2 import extract_client_auth
|
||||||
|
from oidc_provider.models import Token, Client
|
||||||
|
from oidc_provider import settings
|
||||||
Resource = get_resource_model()
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
INTROSPECTION_SCOPE = 'token_introspection'
|
||||||
|
|
||||||
|
|
||||||
class TokenIntrospectionEndpoint(object):
|
class TokenIntrospectionEndpoint(object):
|
||||||
|
|
||||||
def __init__(self, request):
|
def __init__(self, request):
|
||||||
self.request = request
|
self.request = request
|
||||||
self.params = {}
|
self.params = {}
|
||||||
|
self.id_token = None
|
||||||
|
self.client = None
|
||||||
self._extract_params()
|
self._extract_params()
|
||||||
|
|
||||||
def _extract_params(self):
|
def _extract_params(self):
|
||||||
# Introspection only supports POST requests
|
# Introspection only supports POST requests
|
||||||
self.params['token'] = self.request.POST.get('token')
|
self.params['token'] = self.request.POST.get('token')
|
||||||
resource_id, resource_secret = get_basic_client_credentials(self.request)
|
client_id, client_secret = extract_client_auth(self.request)
|
||||||
self.params['resource_id'] = resource_id
|
self.params['client_id'] = client_id
|
||||||
self.params['resource_secret'] = resource_secret
|
self.params['client_secret'] = client_secret
|
||||||
|
|
||||||
def validate_params(self):
|
def validate_params(self):
|
||||||
if not (self.params['resource_id'] and self.params['resource_secret']):
|
if not (self.params['client_id'] and self.params['client_secret']):
|
||||||
logger.debug('[Introspection] No resource credentials provided')
|
logger.debug('[Introspection] No client credentials provided')
|
||||||
raise TokenIntrospectionError()
|
raise TokenIntrospectionError()
|
||||||
if not self.params['token']:
|
if not self.params['token']:
|
||||||
logger.debug('[Introspection] No token provided')
|
logger.debug('[Introspection] No token provided')
|
||||||
|
@ -42,7 +45,8 @@ class TokenIntrospectionEndpoint(object):
|
||||||
logger.debug('[Introspection] Token is not valid: %s', self.params['token'])
|
logger.debug('[Introspection] Token is not valid: %s', self.params['token'])
|
||||||
raise TokenIntrospectionError()
|
raise TokenIntrospectionError()
|
||||||
if not token.id_token:
|
if not token.id_token:
|
||||||
logger.debug('[Introspection] Token not an authentication token: %s', self.params['token'])
|
logger.debug('[Introspection] Token not an authentication token: %s',
|
||||||
|
self.params['token'])
|
||||||
raise TokenIntrospectionError()
|
raise TokenIntrospectionError()
|
||||||
|
|
||||||
self.id_token = token.id_token
|
self.id_token = token.id_token
|
||||||
|
@ -52,24 +56,32 @@ class TokenIntrospectionEndpoint(object):
|
||||||
raise TokenIntrospectionError()
|
raise TokenIntrospectionError()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.resource = Resource.objects.get(
|
self.client = Client.objects.get(
|
||||||
resource_id=self.params['resource_id'],
|
client_id=self.params['client_id'],
|
||||||
resource_secret=self.params['resource_secret'],
|
client_secret=self.params['client_secret'])
|
||||||
active=True,
|
except Client.DoesNotExist:
|
||||||
allowed_clients__client_id__contains=audience)
|
logger.debug('[Introspection] No valid client for id: %s',
|
||||||
except Resource.DoesNotExist:
|
self.params['client_id'])
|
||||||
logger.debug('[Introspection] No valid resource id and audience: %s, %s',
|
raise TokenIntrospectionError()
|
||||||
self.params['resource_id'], audience)
|
if INTROSPECTION_SCOPE not in self.client.scope:
|
||||||
|
logger.debug('[Introspection] Client %s does not have introspection scope',
|
||||||
|
self.params['client_id'])
|
||||||
|
raise TokenIntrospectionError()
|
||||||
|
if settings.get('OIDC_INTROSPECTION_VALIDATE_AUDIENCE_SCOPE') \
|
||||||
|
and audience not in self.client.scope:
|
||||||
|
logger.debug('[Introspection] Client %s does not audience scope %s',
|
||||||
|
self.params['client_id'], audience)
|
||||||
raise TokenIntrospectionError()
|
raise TokenIntrospectionError()
|
||||||
|
|
||||||
def create_response_dic(self):
|
def create_response_dic(self):
|
||||||
response_dic = dict((k, self.id_token[k]) for k in ('sub', 'exp', 'iat', 'iss'))
|
response_dic = dict((k, self.id_token[k]) for k in ('sub', 'exp', 'iat', 'iss'))
|
||||||
response_dic['active'] = True
|
response_dic['active'] = True
|
||||||
response_dic['client_id'] = self.id_token.get('aud')
|
response_dic['client_id'] = self.id_token.get('aud')
|
||||||
response_dic['aud'] = self.resource.resource_id
|
response_dic['aud'] = self.client.client_id
|
||||||
|
|
||||||
response_dic = run_processing_hook(response_dic, 'OIDC_INTROSPECTION_PROCESSING_HOOK',
|
response_dic = run_processing_hook(response_dic,
|
||||||
resource=self.resource,
|
'OIDC_INTROSPECTION_PROCESSING_HOOK',
|
||||||
|
client=self.client,
|
||||||
id_token=self.id_token)
|
id_token=self.id_token)
|
||||||
|
|
||||||
return response_dic
|
return response_dic
|
||||||
|
|
|
@ -1,11 +1,7 @@
|
||||||
from base64 import b64decode
|
|
||||||
from hashlib import sha224
|
from hashlib import sha224
|
||||||
from django.http import HttpResponse
|
|
||||||
from oidc_provider import settings
|
|
||||||
|
|
||||||
import django
|
import django
|
||||||
from django.http import HttpResponse
|
from django.http import HttpResponse
|
||||||
import re
|
|
||||||
|
|
||||||
from oidc_provider import settings
|
from oidc_provider import settings
|
||||||
|
|
||||||
|
@ -16,9 +12,6 @@ else:
|
||||||
from django.core.urlresolvers import reverse
|
from django.core.urlresolvers import reverse
|
||||||
|
|
||||||
|
|
||||||
basic_re = re.compile('^Basic\s(.+)$', re.I)
|
|
||||||
|
|
||||||
|
|
||||||
def redirect(uri):
|
def redirect(uri):
|
||||||
"""
|
"""
|
||||||
Custom Response object for redirecting to a Non-HTTP url scheme.
|
Custom Response object for redirecting to a Non-HTTP url scheme.
|
||||||
|
@ -130,11 +123,11 @@ def default_idtoken_processing_hook(id_token, user):
|
||||||
return id_token
|
return id_token
|
||||||
|
|
||||||
|
|
||||||
def default_introspection_processing_hook(introspection_response, resource, id_token):
|
def default_introspection_processing_hook(introspection_response, client, id_token):
|
||||||
"""
|
"""
|
||||||
Hook to customise the returned data from the token introspection endpoint
|
Hook to customise the returned data from the token introspection endpoint
|
||||||
:param introspection_response:
|
:param introspection_response:
|
||||||
:param resource:
|
:param client:
|
||||||
:param id_token:
|
:param id_token:
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
|
@ -150,31 +143,6 @@ def get_browser_state_or_default(request):
|
||||||
return sha224(key.encode('utf-8')).hexdigest()
|
return sha224(key.encode('utf-8')).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
def get_basic_client_credentials(request):
|
|
||||||
"""
|
|
||||||
Get client credentials using HTTP Basic Authentication method.
|
|
||||||
Or try getting parameters via POST.
|
|
||||||
See: http://tools.ietf.org/html/rfc6750#section-2.1
|
|
||||||
|
|
||||||
:param request:
|
|
||||||
:return: tuple of client_id, client_secret
|
|
||||||
:rtype: tuple
|
|
||||||
"""
|
|
||||||
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
|
|
||||||
result = basic_re.match(auth_header)
|
|
||||||
if result:
|
|
||||||
b64_user_pass = result.group(1)
|
|
||||||
try:
|
|
||||||
user_pass = b64decode(b64_user_pass).decode('utf-8').split(':', 1)
|
|
||||||
client_id, client_secret = tuple(user_pass)
|
|
||||||
except (ValueError, UnicodeDecodeError):
|
|
||||||
client_id = client_secret = ''
|
|
||||||
else:
|
|
||||||
client_id = request.POST.get('client_id')
|
|
||||||
client_secret = request.POST.get('client_secret')
|
|
||||||
return client_id, client_secret
|
|
||||||
|
|
||||||
|
|
||||||
def run_processing_hook(subject, hook_settings_name, **kwargs):
|
def run_processing_hook(subject, hook_settings_name, **kwargs):
|
||||||
processing_hook = settings.get(hook_settings_name)
|
processing_hook = settings.get(hook_settings_name)
|
||||||
if isinstance(processing_hook, (list, tuple)):
|
if isinstance(processing_hook, (list, tuple)):
|
||||||
|
|
|
@ -1,35 +0,0 @@
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
# Generated by Django 1.10 on 2018-02-05 14:19
|
|
||||||
from __future__ import unicode_literals
|
|
||||||
|
|
||||||
from django.conf import settings
|
|
||||||
from django.db import migrations, models
|
|
||||||
import django.db.models.deletion
|
|
||||||
|
|
||||||
|
|
||||||
class Migration(migrations.Migration):
|
|
||||||
|
|
||||||
dependencies = [
|
|
||||||
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
|
||||||
('oidc_provider', '0023_client_owner'),
|
|
||||||
]
|
|
||||||
|
|
||||||
operations = [
|
|
||||||
migrations.CreateModel(
|
|
||||||
name='Resource',
|
|
||||||
fields=[
|
|
||||||
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
|
||||||
('name', models.CharField(default='', max_length=100, verbose_name='Name')),
|
|
||||||
('resource_id', models.CharField(max_length=255, unique=True, verbose_name='Resource ID')),
|
|
||||||
('resource_secret', models.CharField(max_length=255, verbose_name='Resource Secret')),
|
|
||||||
('date_created', models.DateField(auto_now_add=True, verbose_name='Date Created')),
|
|
||||||
('date_updated', models.DateField(auto_now=True, verbose_name='Date Updated')),
|
|
||||||
('active', models.BooleanField(default=False, verbose_name='Is Active')),
|
|
||||||
('allowed_clients', models.ManyToManyField(blank=True, help_text='Select which clients can be used to access this resource.', related_name='accessible_resources', to='oidc_provider.Client', verbose_name='Allowed Clients')),
|
|
||||||
('owner', models.ForeignKey(blank=True, default=None, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='oidc_resource_set', to=settings.AUTH_USER_MODEL, verbose_name='Owner')),
|
|
||||||
],
|
|
||||||
options={
|
|
||||||
'swappable': 'OIDC_RESOURCE_MODEL',
|
|
||||||
},
|
|
||||||
),
|
|
||||||
]
|
|
|
@ -4,7 +4,6 @@ import binascii
|
||||||
from hashlib import md5, sha256
|
from hashlib import md5, sha256
|
||||||
import json
|
import json
|
||||||
|
|
||||||
from django.apps import apps
|
|
||||||
from django.db import models
|
from django.db import models
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from django.utils.translation import ugettext_lazy as _
|
from django.utils.translation import ugettext_lazy as _
|
||||||
|
@ -129,45 +128,6 @@ class Client(models.Model):
|
||||||
return self.redirect_uris[0] if self.redirect_uris else ''
|
return self.redirect_uris[0] if self.redirect_uris else ''
|
||||||
|
|
||||||
|
|
||||||
class AbstractResource(models.Model):
|
|
||||||
name = models.CharField(max_length=100, default='', verbose_name=_(u'Name'))
|
|
||||||
owner = models.ForeignKey(settings.AUTH_USER_MODEL,
|
|
||||||
verbose_name=_(u'Owner'),
|
|
||||||
blank=True, null=True, default=None,
|
|
||||||
on_delete=models.SET_NULL,
|
|
||||||
related_name='oidc_resource_set')
|
|
||||||
|
|
||||||
resource_id = models.CharField(max_length=255, unique=True, verbose_name=_(u'Resource ID'))
|
|
||||||
resource_secret = models.CharField(max_length=255, verbose_name=_(u'Resource Secret'))
|
|
||||||
|
|
||||||
date_created = models.DateField(auto_now_add=True, verbose_name=_(u'Date Created'))
|
|
||||||
date_updated = models.DateField(auto_now=True, verbose_name=_(u'Date Updated'))
|
|
||||||
|
|
||||||
active = models.BooleanField(default=False, verbose_name=_(u'Is Active'))
|
|
||||||
|
|
||||||
allowed_clients = models.ManyToManyField(Client,
|
|
||||||
blank=True,
|
|
||||||
verbose_name=_(u'Allowed Clients'),
|
|
||||||
related_name='accessible_resources',
|
|
||||||
help_text=_(u'Select which clients can be used to access this resource.'))
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return u'{0}'.format(self.name)
|
|
||||||
|
|
||||||
def __unicode__(self):
|
|
||||||
return self.__str__()
|
|
||||||
|
|
||||||
class Meta:
|
|
||||||
verbose_name = _(u'Resource')
|
|
||||||
verbose_name_plural = _(u'Resources')
|
|
||||||
abstract = True
|
|
||||||
|
|
||||||
|
|
||||||
class Resource(AbstractResource):
|
|
||||||
class Meta:
|
|
||||||
swappable = 'OIDC_RESOURCE_MODEL'
|
|
||||||
|
|
||||||
|
|
||||||
class BaseCodeTokenModel(models.Model):
|
class BaseCodeTokenModel(models.Model):
|
||||||
|
|
||||||
client = models.ForeignKey(Client, verbose_name=_(u'Client'), on_delete=models.CASCADE)
|
client = models.ForeignKey(Client, verbose_name=_(u'Client'), on_delete=models.CASCADE)
|
||||||
|
@ -272,7 +232,3 @@ class RSAKey(models.Model):
|
||||||
@property
|
@property
|
||||||
def kid(self):
|
def kid(self):
|
||||||
return u'{0}'.format(md5(self.key.encode('utf-8')).hexdigest() if self.key else '')
|
return u'{0}'.format(md5(self.key.encode('utf-8')).hexdigest() if self.key else '')
|
||||||
|
|
||||||
|
|
||||||
def get_resource_model():
|
|
||||||
return apps.get_model(getattr(settings, 'OIDC_RESOURCE_MODEL', 'oidc_provider.Resource'))
|
|
||||||
|
|
|
@ -131,8 +131,20 @@ class DefaultSettings(object):
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def OIDC_INTROSPECTION_PROCESSING_HOOK(self):
|
def OIDC_INTROSPECTION_PROCESSING_HOOK(self):
|
||||||
|
"""
|
||||||
|
OPTIONAL. A string with the location of your function.
|
||||||
|
Used to update the response for a valid introspection token request.
|
||||||
|
"""
|
||||||
return 'oidc_provider.lib.utils.common.default_introspection_processing_hook'
|
return 'oidc_provider.lib.utils.common.default_introspection_processing_hook'
|
||||||
|
|
||||||
|
@property
|
||||||
|
def OIDC_INTROSPECTION_VALIDATE_AUDIENCE_SCOPE(self):
|
||||||
|
"""
|
||||||
|
OPTIONAL: A boolean to specify whether or not to verify that the introspection
|
||||||
|
resource has the requesting client id as one of its scopes.
|
||||||
|
"""
|
||||||
|
return True
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def OIDC_GRANT_TYPE_PASSWORD_ENABLE(self):
|
def OIDC_GRANT_TYPE_PASSWORD_ENABLE(self):
|
||||||
"""
|
"""
|
||||||
|
@ -156,14 +168,6 @@ class DefaultSettings(object):
|
||||||
'error': 'oidc_provider/error.html'
|
'error': 'oidc_provider/error.html'
|
||||||
}
|
}
|
||||||
|
|
||||||
@property
|
|
||||||
def OIDC_RESOURCE_MODEL(self):
|
|
||||||
"""
|
|
||||||
Model w
|
|
||||||
:return:
|
|
||||||
"""
|
|
||||||
return 'oidc_provider.Resource'
|
|
||||||
|
|
||||||
|
|
||||||
default_settings = DefaultSettings()
|
default_settings = DefaultSettings()
|
||||||
|
|
||||||
|
|
|
@ -11,9 +11,7 @@ from django.contrib.auth.models import User
|
||||||
from oidc_provider.models import (
|
from oidc_provider.models import (
|
||||||
Client,
|
Client,
|
||||||
Code,
|
Code,
|
||||||
Token, get_resource_model)
|
Token)
|
||||||
|
|
||||||
Resource = get_resource_model()
|
|
||||||
|
|
||||||
|
|
||||||
FAKE_NONCE = 'cb584e44c43ed6bd0bc2d9c7e242837d'
|
FAKE_NONCE = 'cb584e44c43ed6bd0bc2d9c7e242837d'
|
||||||
|
@ -65,22 +63,9 @@ def create_fake_client(response_type, is_public=False, require_consent=True):
|
||||||
return client
|
return client
|
||||||
|
|
||||||
|
|
||||||
def create_fake_resource(allowed_clients, active=True):
|
|
||||||
resource = Resource(name='Some API',
|
|
||||||
resource_id=str(random.randint(1, 999999)).zfill(6),
|
|
||||||
resource_secret=str(random.randint(1, 999999)).zfill(6),
|
|
||||||
active=active)
|
|
||||||
resource.name = 'Some API'
|
|
||||||
resource.save()
|
|
||||||
resource.allowed_clients.add(*allowed_clients)
|
|
||||||
resource.save()
|
|
||||||
|
|
||||||
return resource
|
|
||||||
|
|
||||||
|
|
||||||
def create_fake_token(user, scopes, client):
|
def create_fake_token(user, scopes, client):
|
||||||
expires_at = timezone.now() + timezone.timedelta(seconds=60)
|
expires_at = timezone.now() + timezone.timedelta(seconds=60)
|
||||||
token = Token(user=user, client=client, expires_at=expires_at, access_token=str(random.randint(1, 999999)).zfill(6))
|
token = Token(user=user, client=client, expires_at=expires_at)
|
||||||
token.scope = scopes
|
token.scope = scopes
|
||||||
|
|
||||||
token.save()
|
token.save()
|
||||||
|
@ -143,6 +128,6 @@ def fake_idtoken_processing_hook2(id_token, user):
|
||||||
return id_token
|
return id_token
|
||||||
|
|
||||||
|
|
||||||
def fake_introspection_processing_hook(response_dict, resource, id_token):
|
def fake_introspection_processing_hook(response_dict, client, id_token):
|
||||||
response_dict['test_introspection_processing_hook'] = FAKE_RANDOM_STRING
|
response_dict['test_introspection_processing_hook'] = FAKE_RANDOM_STRING
|
||||||
return response_dict
|
return response_dict
|
||||||
|
|
|
@ -1,4 +1,7 @@
|
||||||
import time
|
import time
|
||||||
|
import random
|
||||||
|
|
||||||
|
import django
|
||||||
from mock import patch
|
from mock import patch
|
||||||
|
|
||||||
from django.utils.encoding import force_text
|
from django.utils.encoding import force_text
|
||||||
|
@ -12,16 +15,18 @@ except ImportError:
|
||||||
|
|
||||||
from django.core.management import call_command
|
from django.core.management import call_command
|
||||||
from django.test import TestCase, RequestFactory, override_settings
|
from django.test import TestCase, RequestFactory, override_settings
|
||||||
from django.core.urlresolvers import reverse
|
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
|
|
||||||
from oidc_provider.tests.app.utils import (
|
from oidc_provider.tests.app.utils import (
|
||||||
create_fake_user,
|
create_fake_user,
|
||||||
create_fake_client,
|
create_fake_client,
|
||||||
create_fake_resource,
|
|
||||||
create_fake_token,
|
create_fake_token,
|
||||||
FAKE_RANDOM_STRING)
|
FAKE_RANDOM_STRING)
|
||||||
from oidc_provider.views import TokenIntrospectionView
|
from oidc_provider.views import TokenIntrospectionView
|
||||||
|
if django.VERSION >= (1, 11):
|
||||||
|
from django.urls import reverse
|
||||||
|
else:
|
||||||
|
from django.core.urlresolvers import reverse
|
||||||
|
|
||||||
|
|
||||||
class IntrospectionTestCase(TestCase):
|
class IntrospectionTestCase(TestCase):
|
||||||
|
@ -31,13 +36,15 @@ class IntrospectionTestCase(TestCase):
|
||||||
self.factory = RequestFactory()
|
self.factory = RequestFactory()
|
||||||
self.user = create_fake_user()
|
self.user = create_fake_user()
|
||||||
self.client = create_fake_client(response_type='id_token token')
|
self.client = create_fake_client(response_type='id_token token')
|
||||||
self.resource = create_fake_resource(allowed_clients=[self.client])
|
self.resource = create_fake_client(response_type='id_token token')
|
||||||
self.scopes = ['openid', 'profile']
|
self.resource.scope = ['token_introspection', self.client.client_id]
|
||||||
self.token = create_fake_token(self.user, self.scopes, self.client)
|
self.resource.save()
|
||||||
|
self.token = create_fake_token(self.user, self.client.scope, self.client)
|
||||||
|
self.token.access_token = str(random.randint(1, 999999)).zfill(6)
|
||||||
self.now = time.time()
|
self.now = time.time()
|
||||||
with patch('oidc_provider.lib.utils.token.time.time') as time_func:
|
with patch('oidc_provider.lib.utils.token.time.time') as time_func:
|
||||||
time_func.return_value = self.now
|
time_func.return_value = self.now
|
||||||
self.token.id_token = create_id_token(self.user, self.client.client_id)
|
self.token.id_token = create_id_token(self.token, self.user, self.client.client_id)
|
||||||
self.token.save()
|
self.token.save()
|
||||||
|
|
||||||
def test_no_client_params_returns_inactive(self):
|
def test_no_client_params_returns_inactive(self):
|
||||||
|
@ -56,14 +63,8 @@ class IntrospectionTestCase(TestCase):
|
||||||
response = self._make_request(access_token='invalid')
|
response = self._make_request(access_token='invalid')
|
||||||
self._assert_inactive(response)
|
self._assert_inactive(response)
|
||||||
|
|
||||||
def test_no_allowed_clients_returns_inactive(self):
|
def test_scope_no_audience_returns_inactive(self):
|
||||||
self.resource.allowed_clients.clear()
|
self.resource.scope = ['token_introspection']
|
||||||
self.resource.save()
|
|
||||||
response = self._make_request()
|
|
||||||
self._assert_inactive(response)
|
|
||||||
|
|
||||||
def test_resource_inactive_returns_inactive(self):
|
|
||||||
self.resource.active = False
|
|
||||||
self.resource.save()
|
self.resource.save()
|
||||||
response = self._make_request()
|
response = self._make_request()
|
||||||
self._assert_inactive(response)
|
self._assert_inactive(response)
|
||||||
|
@ -79,7 +80,7 @@ class IntrospectionTestCase(TestCase):
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assertJSONEqual(force_text(response.content), {
|
self.assertJSONEqual(force_text(response.content), {
|
||||||
'active': True,
|
'active': True,
|
||||||
'aud': self.resource.resource_id,
|
'aud': self.resource.client_id,
|
||||||
'client_id': self.client.client_id,
|
'client_id': self.client.client_id,
|
||||||
'sub': str(self.user.pk),
|
'sub': str(self.user.pk),
|
||||||
'iat': int(self.now),
|
'iat': int(self.now),
|
||||||
|
@ -87,14 +88,13 @@ class IntrospectionTestCase(TestCase):
|
||||||
'iss': 'http://localhost:8000/openid',
|
'iss': 'http://localhost:8000/openid',
|
||||||
})
|
})
|
||||||
|
|
||||||
@override_settings(
|
@override_settings(OIDC_INTROSPECTION_PROCESSING_HOOK='oidc_provider.tests.app.utils.fake_introspection_processing_hook') # NOQA
|
||||||
OIDC_INTROSPECTION_PROCESSING_HOOK='oidc_provider.tests.app.utils.fake_introspection_processing_hook')
|
|
||||||
def test_custom_introspection_hook_called_on_valid_request(self):
|
def test_custom_introspection_hook_called_on_valid_request(self):
|
||||||
response = self._make_request()
|
response = self._make_request()
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assertJSONEqual(force_text(response.content), {
|
self.assertJSONEqual(force_text(response.content), {
|
||||||
'active': True,
|
'active': True,
|
||||||
'aud': self.resource.resource_id,
|
'aud': self.resource.client_id,
|
||||||
'client_id': self.client.client_id,
|
'client_id': self.client.client_id,
|
||||||
'sub': str(self.user.pk),
|
'sub': str(self.user.pk),
|
||||||
'iat': int(self.now),
|
'iat': int(self.now),
|
||||||
|
@ -110,11 +110,12 @@ class IntrospectionTestCase(TestCase):
|
||||||
def _make_request(self, **kwargs):
|
def _make_request(self, **kwargs):
|
||||||
url = reverse('oidc_provider:token-introspection')
|
url = reverse('oidc_provider:token-introspection')
|
||||||
data = {
|
data = {
|
||||||
'client_id': kwargs.get('client_id', self.resource.resource_id),
|
'client_id': kwargs.get('client_id', self.resource.client_id),
|
||||||
'client_secret': kwargs.get('client_secret', self.resource.resource_secret),
|
'client_secret': kwargs.get('client_secret', self.resource.client_secret),
|
||||||
'token': kwargs.get('access_token', self.token.access_token),
|
'token': kwargs.get('access_token', self.token.access_token),
|
||||||
}
|
}
|
||||||
|
|
||||||
request = self.factory.post(url, data=urlencode(data), content_type='application/x-www-form-urlencoded')
|
request = self.factory.post(url, data=urlencode(data),
|
||||||
|
content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
return TokenIntrospectionView.as_view()(request)
|
return TokenIntrospectionView.as_view()(request)
|
|
@ -54,24 +54,6 @@ from oidc_provider.models import (
|
||||||
from oidc_provider import settings
|
from oidc_provider import settings
|
||||||
from oidc_provider import signals
|
from oidc_provider import signals
|
||||||
|
|
||||||
try:
|
|
||||||
from urllib import urlencode
|
|
||||||
from urlparse import urlsplit, parse_qs, urlunsplit
|
|
||||||
except ImportError:
|
|
||||||
from urllib.parse import urlsplit, parse_qs, urlunsplit, urlencode
|
|
||||||
|
|
||||||
from Cryptodome.PublicKey import RSA
|
|
||||||
from django.contrib.auth.views import (
|
|
||||||
redirect_to_login,
|
|
||||||
logout,
|
|
||||||
)
|
|
||||||
|
|
||||||
import django
|
|
||||||
if django.VERSION >= (1, 11):
|
|
||||||
from django.urls import reverse
|
|
||||||
else:
|
|
||||||
from django.core.urlresolvers import reverse
|
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue