django-oidc-provider/oidc_provider/tests/cases/test_token_endpoint.py
Tuomas Suutari 93420461b4 Merge branch 'develop' of github.com:juanifioren/django-oidc-provider
* 'develop' of github.com:juanifioren/django-oidc-provider:
  Update changelog.rst
  include request in password grant authenticate call
  Update setup.py
  Update changelog.rst
  Update changelog.rst
  Adjust import order and method order in introspection tests
  Replace resource with client in docs.
  Update settings docs to add extra introspection setting
  Update README.md
  Update README.md
  Remove the Resource model
  Skip csrf protection on introspection endpoint
  Add token introspection endpoint to satisfy https://tools.ietf.org/html/rfc7662
  Test docs with tox.
  Remove Django 1.7 for travis.
  Drop support for Django 1.7.
  Move extract_client_auth to oauth2 utils.
  Remove duplicate link in docs.
  Bump version v0.6.0.
  Fix BaseCodeTokenModel and user attr.
  Update README.md
  Edit README and contribute doc.
  Edit changelog.
  Update changelog.rst
  Add protected_resource_view test using client_credentials.
  Fix docs.
  Improve docs.
  Client credentials implementation.
  Move changelog into docs.
  Update README.md
  Update CHANGELOG.md
  Fixed infinite callback loop in check-session iframe
  Fix PEP8. New migration.
  Update example project.
  Fix PEP8.
  Fix PEP8.
  PEP8 errors and urls.
  PEP8 models.
  Fix contribute docs.
  Fix tox for checking PEP8 all files.
  Update README.md
  Update README.md
  Simplify test suit.
  Update CHANGELOG.md
  Bump version 0.5.3.
  Update installation.rst
  Update CHANGELOG.md
  Fixed wrong Object in Template
  Update project to support Django 2.0
  Now passing along the token to create_id_token function.
  Made token and token_refresh endpoint return requested claims.
  Sphinx documentation fixes (#219)
  Use request.user.is_authenticated as a bool with recent Django (#216)
  Fixed client id retrieval when aud is a list of str. (#210)
  Add owner field to Client (#211)
  Update CHANGELOG
  removed tab char
  Add pep8 compliance and checker
  Bump version
  Update CHANGELOG.md
  Preparing v0.5.2 (#201)
  Fix Django 2.0 deprecation warnings (#185)
  Fix infinite login loop if "prompt=login" (#198)
  fixed typos
  Bump version
  Fix scope handling of token endpoint (#193)
  Fixes #192
  Use stored user consent for public clients too (#189)
  Redirect URIs must match exactly. (#191)
  Bug #187 prompt handling (#188)
  Don't pin exact versions in install_requires.
2018-05-24 00:16:26 +03:00

818 lines
30 KiB
Python

import json
import time
import uuid
from base64 import b64encode
try:
from urllib.parse import urlencode
except ImportError:
from urllib import urlencode
from django.core.management import call_command
from django.http import JsonResponse
try:
from django.urls import reverse
except ImportError:
from django.core.urlresolvers import reverse
from django.test import (
RequestFactory,
override_settings,
)
from django.test import TestCase
from django.views.decorators.http import require_http_methods
from jwkest.jwk import KEYS
from jwkest.jws import JWS
from jwkest.jwt import JWT
from mock import patch
from oidc_provider.lib.utils.oauth2 import protected_resource_view
from oidc_provider.lib.utils.token import create_code
from oidc_provider.models import Token
from oidc_provider.tests.app.utils import (
create_fake_user,
create_fake_client,
FAKE_CODE_CHALLENGE,
FAKE_CODE_VERIFIER,
FAKE_NONCE,
FAKE_RANDOM_STRING,
)
from oidc_provider.views import (
JwksView,
TokenView,
userinfo,
)
class TokenTestCase(TestCase):
    """
    To obtain an Access Token and an ID Token, the RP Client sends a
    Token Request to the Token Endpoint to obtain a Token Response
    when using the Authorization Code Flow.
    """

    def setUp(self):
        """
        Run the `creatersakey` command and create the request factory,
        the fake user and the fake client shared by every test.
        """
        call_command('creatersakey')
        self.factory = RequestFactory()
        self.user = create_fake_user()
        self.client = create_fake_client(response_type='code')

    def _password_grant_post_data(self, scope=None):
        """
        Build the form fields for a password grant request.

        `scope` is an optional iterable of scope names; when given it is
        joined with spaces and replaces the default 'openid email'.
        """
        result = {
            'username': 'johndoe',
            'password': '1234',
            'grant_type': 'password',
            'scope': 'openid email',
        }
        if scope is not None:
            result['scope'] = ' '.join(scope)
        return result

    def _auth_code_post_data(self, code, scope=None):
        """
        All the data that will be POSTed to the Token Endpoint.

        `code` is the authorization code string. `scope` is an optional
        iterable of scope names, sent space-separated when given.
        """
        post_data = {
            'client_id': self.client.client_id,
            'client_secret': self.client.client_secret,
            'redirect_uri': self.client.default_redirect_uri,
            'grant_type': 'authorization_code',
            'code': code,
            'state': uuid.uuid4().hex,
        }
        if scope is not None:
            post_data['scope'] = ' '.join(scope)
        return post_data

    def _refresh_token_post_data(self, refresh_token, scope=None):
        """
        All the data that will be POSTed to the Token Endpoint.

        `refresh_token` is the refresh token string. `scope` is an optional
        iterable of scope names, sent space-separated when given.
        """
        post_data = {
            'client_id': self.client.client_id,
            'client_secret': self.client.client_secret,
            'grant_type': 'refresh_token',
            'refresh_token': refresh_token,
        }
        if scope is not None:
            post_data['scope'] = ' '.join(scope)
        return post_data

    def _post_request(self, post_data, extras=None):
        """
        Makes a request to the token endpoint by sending the
        `post_data` parameters using the 'application/x-www-form-urlencoded'
        format.

        `extras` is an optional dict of additional keyword arguments passed
        to `RequestFactory.post` (e.g. an 'HTTP_AUTHORIZATION' entry).
        Returns the response produced by `TokenView`.
        """
        url = reverse('oidc_provider:token')
        # `extras` defaults to None rather than a shared mutable dict.
        request = self.factory.post(
            url,
            data=urlencode(post_data),
            content_type='application/x-www-form-urlencoded',
            **(extras or {}))
        return TokenView.as_view()(request)

    def _create_code(self, scope=None):
        """
        Generate a valid grant code.

        Uses ['openid', 'email'] when `scope` is empty or None, saves the
        code and returns it.
        """
        code = create_code(
            user=self.user,
            client=self.client,
            scope=(scope if scope else ['openid', 'email']),
            nonce=FAKE_NONCE,
            is_authentication=True)
        code.save()
        return code

    def _get_keys(self):
        """
        Get public key from discovery.

        Requests the JWKS view and returns its content loaded into a
        `KEYS` instance.
        """
        request = self.factory.get(reverse('oidc_provider:jwks'))
        response = JwksView.as_view()(request)
        jwks_dic = json.loads(response.content.decode('utf-8'))
        SIGKEYS = KEYS()
        SIGKEYS.load_dict(jwks_dic)
        return SIGKEYS

    def _get_userinfo(self, access_token):
        """
        Call the userinfo view with `access_token` as a Bearer token and
        return its response.
        """
        url = reverse('oidc_provider:userinfo')
        request = self.factory.get(url)
        request.META['HTTP_AUTHORIZATION'] = 'Bearer ' + access_token
        return userinfo(request)

    def _password_grant_auth_header(self):
        """
        Return request extras carrying an HTTP Basic 'HTTP_AUTHORIZATION'
        header built from the client's id and secret.
        """
        user_pass = self.client.client_id + ':' + self.client.client_secret
        auth = b'Basic ' + b64encode(user_pass.encode('utf-8'))
        auth_header = {'HTTP_AUTHORIZATION': auth.decode('utf-8')}
        return auth_header

    def test_default_setting_does_not_allow_grant_type_password(self):
        """
        Without overriding any setting, a password grant request is
        answered with 400 and 'unsupported_grant_type'.
        """
        post_data = self._password_grant_post_data()

        response = self._post_request(
            post_data=post_data,
            extras=self._password_grant_auth_header()
        )

        response_dict = json.loads(response.content.decode('utf-8'))

        self.assertEqual(400, response.status_code)
        self.assertEqual('unsupported_grant_type', response_dict['error'])

    @override_settings(OIDC_GRANT_TYPE_PASSWORD_ENABLE=True)
    def test_password_grant_get_access_token_without_scope(self):
        """
        A password grant request sent without a 'scope' field still
        returns an access token.
        """
        post_data = self._password_grant_post_data()
        del post_data['scope']

        response = self._post_request(
            post_data=post_data,
            extras=self._password_grant_auth_header()
        )

        response_dict = json.loads(response.content.decode('utf-8'))
        self.assertIn('access_token', response_dict)

    @override_settings(OIDC_GRANT_TYPE_PASSWORD_ENABLE=True)
    def test_password_grant_get_access_token_with_scope(self):
        """
        A password grant request with the default 'openid email' scope
        returns an access token.
        """
        response = self._post_request(
            post_data=self._password_grant_post_data(),
            extras=self._password_grant_auth_header()
        )

        response_dict = json.loads(response.content.decode('utf-8'))
        self.assertIn('access_token', response_dict)

    @override_settings(OIDC_GRANT_TYPE_PASSWORD_ENABLE=True)
    def test_password_grant_get_access_token_invalid_user_credentials(self):
        """
        A wrong user password is answered with 403 and 'access_denied'.
        """
        invalid_post = self._password_grant_post_data()
        invalid_post['password'] = 'wrong!'

        response = self._post_request(
            post_data=invalid_post,
            extras=self._password_grant_auth_header()
        )

        response_dict = json.loads(response.content.decode('utf-8'))

        self.assertEqual(403, response.status_code)
        self.assertEqual('access_denied', response_dict['error'])

    def test_password_grant_get_access_token_invalid_client_credentials(self):
        """
        Client credentials that were changed in memory only (never saved)
        are answered with 400 and 'invalid_client'.
        """
        # Not saved: only the Basic auth header built below carries these.
        self.client.client_id = 'foo'
        self.client.client_secret = 'bar'

        response = self._post_request(
            post_data=self._password_grant_post_data(),
            extras=self._password_grant_auth_header()
        )

        response_dict = json.loads(response.content.decode('utf-8'))

        self.assertEqual(400, response.status_code)
        self.assertEqual('invalid_client', response_dict['error'])

    def test_password_grant_full_response(self):
        """
        Run the full password grant check with scope ['openid', 'email'].
        """
        self.check_password_grant(scope=['openid', 'email'])

    def test_password_grant_scope(self):
        """
        Run the full password grant check with scope ['openid', 'profile'].
        """
        self.check_password_grant(scope=['openid', 'profile'])

    @override_settings(OIDC_TOKEN_EXPIRE=120,
                       OIDC_GRANT_TYPE_PASSWORD_ENABLE=True)
    def check_password_grant(self, scope):
        """
        Request tokens through the password grant with `scope` and check
        the token response, the ID token and the userinfo claims.

        `scope` is a list of scope names.
        """
        response = self._post_request(
            post_data=self._password_grant_post_data(scope),
            extras=self._password_grant_auth_header()
        )

        response_dict = json.loads(response.content.decode('utf-8'))
        id_token = JWS().verify_compact(
            response_dict['id_token'].encode('utf-8'), self._get_keys())

        token = Token.objects.get(user=self.user)
        self.assertEqual(response_dict['access_token'], token.access_token)
        self.assertEqual(response_dict['refresh_token'], token.refresh_token)
        self.assertEqual(response_dict['expires_in'], 120)
        self.assertEqual(response_dict['token_type'], 'bearer')
        self.assertEqual(id_token['sub'], str(self.user.id))
        self.assertEqual(id_token['aud'], self.client.client_id)

        # Check the scope is honored by checking the claims in the userinfo
        userinfo_response = self._get_userinfo(response_dict['access_token'])
        # Named so it does not shadow the imported `userinfo` view.
        userinfo_claims = json.loads(userinfo_response.content.decode('utf-8'))

        for (scope_param, claim) in [('email', 'email'), ('profile', 'name')]:
            if scope_param in scope:
                self.assertIn(claim, userinfo_claims)
            else:
                self.assertNotIn(claim, userinfo_claims)

    @override_settings(OIDC_GRANT_TYPE_PASSWORD_ENABLE=True,
                       AUTHENTICATION_BACKENDS=("oidc_provider.tests.app.utils.TestAuthBackend",))
    def test_password_grant_passes_request_to_backend(self):
        """
        With TestAuthBackend as the only authentication backend, the
        password grant returns an access token.
        """
        # NOTE(review): presumably TestAuthBackend only authenticates when
        # it receives the request -- confirm in oidc_provider.tests.app.utils.
        response = self._post_request(
            post_data=self._password_grant_post_data(),
            extras=self._password_grant_auth_header()
        )

        response_dict = json.loads(response.content.decode('utf-8'))
        self.assertIn('access_token', response_dict)

    @override_settings(OIDC_TOKEN_EXPIRE=720)
    def test_authorization_code(self):
        """
        We MUST validate the signature of the ID Token according to JWS
        using the algorithm specified in the alg Header Parameter of
        the JOSE Header.
        """
        SIGKEYS = self._get_keys()
        code = self._create_code()

        post_data = self._auth_code_post_data(code=code.code)

        response = self._post_request(post_data)
        response_dic = json.loads(response.content.decode('utf-8'))

        id_token = JWS().verify_compact(response_dic['id_token'].encode('utf-8'), SIGKEYS)

        token = Token.objects.get(user=self.user)
        self.assertEqual(response_dic['access_token'], token.access_token)
        self.assertEqual(response_dic['refresh_token'], token.refresh_token)
        self.assertEqual(response_dic['token_type'], 'bearer')
        self.assertEqual(response_dic['expires_in'], 720)
        self.assertEqual(id_token['sub'], str(self.user.id))
        self.assertEqual(id_token['aud'], self.client.client_id)

    @override_settings(OIDC_TOKEN_EXPIRE=720,
                       OIDC_IDTOKEN_INCLUDE_CLAIMS=True)
    def test_scope_is_ignored_for_auth_code(self):
        """
        Scope is ignored for token responses to auth code grant type.
        This comes down to that the scopes requested in authorize are returned.
        """
        SIGKEYS = self._get_keys()
        for code_scope in [['openid'], ['openid', 'email'], ['openid', 'profile']]:
            code = self._create_code(code_scope)

            post_data = self._auth_code_post_data(
                code=code.code, scope=code_scope)

            response = self._post_request(post_data)
            response_dic = json.loads(response.content.decode('utf-8'))

            self.assertEqual(response.status_code, 200)

            id_token = JWS().verify_compact(response_dic['id_token'].encode('utf-8'), SIGKEYS)

            if 'email' in code_scope:
                self.assertIn('email', id_token)
                self.assertIn('email_verified', id_token)
            else:
                self.assertNotIn('email', id_token)

            if 'profile' in code_scope:
                self.assertIn('given_name', id_token)
            else:
                self.assertNotIn('given_name', id_token)

    def test_refresh_token(self):
        """
        A request to the Token Endpoint can also use a Refresh Token
        by using the grant_type value refresh_token, as described in
        Section 6 of OAuth 2.0 [RFC6749].
        """
        self.do_refresh_token_check()

    def test_refresh_token_invalid_scope(self):
        """
        Extending scope in refresh token is not allowed.

        Try to get a refresh token with "profile" in the scope even
        though the original authorized scope in the authorization code
        request is only ['openid', 'email'].
        """
        self.do_refresh_token_check(scope=['openid', 'profile'])

    def test_refresh_token_narrowed_scope(self):
        """
        Narrowing scope in refresh token is allowed.

        Try to get a refresh token with just "openid" in the scope even
        though the original authorized scope in the authorization code
        request is ['openid', 'email'].
        """
        self.do_refresh_token_check(scope=['openid'])

    @override_settings(OIDC_IDTOKEN_INCLUDE_CLAIMS=True)
    def do_refresh_token_check(self, scope=None):
        """
        Exchange a code for tokens, refresh them with the optional `scope`
        and check the second token response against the first one.

        When `scope` contains names outside the code's scope, only the
        'invalid_scope' error response is checked.
        """
        SIGKEYS = self._get_keys()

        # Retrieve refresh token
        code = self._create_code()
        self.assertEqual(code.scope, ['openid', 'email'])
        post_data = self._auth_code_post_data(code=code.code)
        start_time = time.time()
        # Freeze the time seen by the token utilities so 'iat' is predictable.
        with patch('oidc_provider.lib.utils.token.time.time') as time_func:
            time_func.return_value = start_time
            response = self._post_request(post_data)

        response_dic1 = json.loads(response.content.decode('utf-8'))
        id_token1 = JWS().verify_compact(response_dic1['id_token'].encode('utf-8'), SIGKEYS)

        # Use refresh token to obtain new token
        post_data = self._refresh_token_post_data(
            response_dic1['refresh_token'], scope)
        with patch('oidc_provider.lib.utils.token.time.time') as time_func:
            time_func.return_value = start_time + 600
            response = self._post_request(post_data)

        response_dic2 = json.loads(response.content.decode('utf-8'))

        if scope and set(scope) - set(code.scope):  # too broad scope
            self.assertEqual(response.status_code, 400)  # Bad Request
            self.assertIn('error', response_dic2)
            self.assertEqual(response_dic2['error'], 'invalid_scope')
            return  # No more checks

        id_token2 = JWS().verify_compact(response_dic2['id_token'].encode('utf-8'), SIGKEYS)

        if scope and 'email' not in scope:  # narrowed scope
            # The auth code request had email in scope, so it should be
            # in the first id token
            self.assertIn('email', id_token1)
            # but the refresh request had no email in scope
            self.assertNotIn('email', id_token2, 'email was not requested')

        self.assertNotEqual(response_dic1['id_token'], response_dic2['id_token'])
        self.assertNotEqual(response_dic1['access_token'], response_dic2['access_token'])
        self.assertNotEqual(response_dic1['refresh_token'], response_dic2['refresh_token'])

        # http://openid.net/specs/openid-connect-core-1_0.html#rfc.section.12.2
        self.assertEqual(id_token1['iss'], id_token2['iss'])
        self.assertEqual(id_token1['sub'], id_token2['sub'])
        self.assertNotEqual(id_token1['iat'], id_token2['iat'])
        self.assertEqual(id_token1['iat'], int(start_time))
        self.assertEqual(id_token2['iat'], int(start_time + 600))
        self.assertEqual(id_token1['aud'], id_token2['aud'])
        self.assertEqual(id_token1['auth_time'], id_token2['auth_time'])
        self.assertEqual(id_token1.get('azp'), id_token2.get('azp'))

        # Refresh token can't be reused
        post_data = self._refresh_token_post_data(response_dic1['refresh_token'])
        response = self._post_request(post_data)
        self.assertIn('invalid_grant', response.content.decode('utf-8'))

        # Old access token is invalidated
        self.assertEqual(self._get_userinfo(response_dic1['access_token']).status_code, 401)
        self.assertEqual(self._get_userinfo(response_dic2['access_token']).status_code, 200)

        # Empty refresh token is invalid
        post_data = self._refresh_token_post_data('')
        response = self._post_request(post_data)
        self.assertIn('invalid_grant', response.content.decode('utf-8'))

        # No refresh token is invalid
        post_data = self._refresh_token_post_data('')
        del post_data['refresh_token']
        response = self._post_request(post_data)
        self.assertIn('invalid_grant', response.content.decode('utf-8'))

    def test_client_redirect_uri(self):
        """
        Validate that client redirect URIs exactly match registered
        URIs, and that unregistered URIs or URIs with query parameters are rejected.

        See http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest and
        http://openid.net/specs/openid-connect-core-1_0.html#HybridTokenRequest.
        """
        code = self._create_code()
        post_data = self._auth_code_post_data(code=code.code)

        # Unregistered URI
        post_data['redirect_uri'] = 'http://invalid.example.org'

        response = self._post_request(post_data)
        self.assertIn('invalid_client', response.content.decode('utf-8'))

        # Registered URI, but with query string appended
        post_data['redirect_uri'] = self.client.default_redirect_uri + '?foo=bar'

        response = self._post_request(post_data)
        self.assertIn('invalid_client', response.content.decode('utf-8'))

        # Registered URI
        post_data['redirect_uri'] = self.client.default_redirect_uri

        response = self._post_request(post_data)
        self.assertNotIn('invalid_client', response.content.decode('utf-8'))

    def test_request_methods(self):
        """
        Client sends an HTTP POST request to the Token Endpoint. Other request
        methods MUST NOT be allowed.
        """
        url = reverse('oidc_provider:token')

        requests = [
            self.factory.get(url),
            self.factory.put(url),
            self.factory.delete(url),
        ]

        for request in requests:
            response = TokenView.as_view()(request)

            self.assertEqual(
                response.status_code, 405,
                msg=request.method + ' request does not return a 405 status.')

        # An empty POST is an allowed method but a bad request.
        request = self.factory.post(url)

        response = TokenView.as_view()(request)

        self.assertEqual(
            response.status_code, 400,
            msg=request.method + ' request does not return a 400 status.')

    def test_client_authentication(self):
        """
        The authorization server support including the
        client credentials in the request-body using the `client_id` and
        `client_secret` parameters.

        See: http://tools.ietf.org/html/rfc6749#section-2.3.1
        """
        code = self._create_code()

        # Test a valid request to the token endpoint.
        post_data = self._auth_code_post_data(code=code.code)

        response = self._post_request(post_data)

        self.assertNotIn(
            'invalid_client',
            response.content.decode('utf-8'),
            msg='Client authentication fails using request-body credentials.')

        # Now, test with an invalid client_id.
        invalid_data = post_data.copy()
        invalid_data['client_id'] = self.client.client_id * 2  # Fake id.

        # Create another grant code.
        code = self._create_code()
        invalid_data['code'] = code.code

        response = self._post_request(invalid_data)

        self.assertIn(
            'invalid_client',
            response.content.decode('utf-8'),
            msg='Client authentication success with an invalid "client_id".')

        # Now, test using HTTP Basic Authentication method.
        basicauth_data = post_data.copy()

        # Create another grant code.
        code = self._create_code()
        basicauth_data['code'] = code.code

        # Credentials travel only in the Authorization header this time.
        del basicauth_data['client_id']
        del basicauth_data['client_secret']

        response = self._post_request(basicauth_data, self._password_grant_auth_header())

        self.assertNotIn(
            'invalid_client',
            response.content.decode('utf-8'),
            msg='Client authentication fails using HTTP Basic Auth.')

    def test_access_token_contains_nonce(self):
        """
        If present in the Authentication Request, Authorization Servers MUST
        include a nonce Claim in the ID Token with the Claim Value being the
        nonce value sent in the Authentication Request.
        If the client does not supply a nonce parameter, it SHOULD not be
        included in the `id_token`.

        See http://openid.net/specs/openid-connect-core-1_0.html#IDToken
        """
        code = self._create_code()

        post_data = self._auth_code_post_data(code=code.code)

        response = self._post_request(post_data)

        response_dic = json.loads(response.content.decode('utf-8'))
        id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()

        self.assertEqual(id_token.get('nonce'), FAKE_NONCE)

        # Client does not supply a nonce parameter.
        code.nonce = ''
        code.save()

        response = self._post_request(post_data)
        response_dic = json.loads(response.content.decode('utf-8'))

        id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()

        self.assertIsNone(id_token.get('nonce'))

    def test_id_token_contains_at_hash(self):
        """
        If access_token is included, the id_token SHOULD contain an at_hash.
        """
        code = self._create_code()

        post_data = self._auth_code_post_data(code=code.code)

        response = self._post_request(post_data)

        response_dic = json.loads(response.content.decode('utf-8'))
        id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()

        self.assertTrue(id_token.get('at_hash'))

    def test_idtoken_sign_validation(self):
        """
        We MUST validate the signature of the ID Token according to JWS
        using the algorithm specified in the alg Header Parameter of
        the JOSE Header.
        """
        SIGKEYS = self._get_keys()
        RSAKEYS = [k for k in SIGKEYS if k.kty == 'RSA']

        code = self._create_code()

        post_data = self._auth_code_post_data(code=code.code)

        response = self._post_request(post_data)
        response_dic = json.loads(response.content.decode('utf-8'))

        # Verification against the RSA keys is the check itself.
        JWS().verify_compact(response_dic['id_token'].encode('utf-8'), RSAKEYS)

    @override_settings(
        OIDC_IDTOKEN_SUB_GENERATOR='oidc_provider.tests.app.utils.fake_sub_generator')
    def test_custom_sub_generator(self):
        """
        Test custom function for setting OIDC_IDTOKEN_SUB_GENERATOR.
        """
        code = self._create_code()

        post_data = self._auth_code_post_data(code=code.code)

        response = self._post_request(post_data)

        response_dic = json.loads(response.content.decode('utf-8'))
        id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()

        self.assertEqual(id_token.get('sub'), self.user.email)

    @override_settings(
        OIDC_IDTOKEN_PROCESSING_HOOK='oidc_provider.tests.app.utils.fake_idtoken_processing_hook')
    def test_additional_idtoken_processing_hook(self):
        """
        Test custom function for setting OIDC_IDTOKEN_PROCESSING_HOOK
        (hook given as a single dotted-path string).
        """
        code = self._create_code()

        post_data = self._auth_code_post_data(code=code.code)

        response = self._post_request(post_data)

        response_dic = json.loads(response.content.decode('utf-8'))
        id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()

        self.assertEqual(id_token.get('test_idtoken_processing_hook'), FAKE_RANDOM_STRING)
        self.assertEqual(id_token.get('test_idtoken_processing_hook_user_email'), self.user.email)

    @override_settings(
        OIDC_IDTOKEN_PROCESSING_HOOK=(
            'oidc_provider.tests.app.utils.fake_idtoken_processing_hook',
        )
    )
    def test_additional_idtoken_processing_hook_one_element_in_tuple(self):
        """
        Test custom function for setting OIDC_IDTOKEN_PROCESSING_HOOK
        (hook given as a one-element tuple).
        """
        code = self._create_code()

        post_data = self._auth_code_post_data(code=code.code)

        response = self._post_request(post_data)

        response_dic = json.loads(response.content.decode('utf-8'))
        id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()

        self.assertEqual(id_token.get('test_idtoken_processing_hook'), FAKE_RANDOM_STRING)
        self.assertEqual(id_token.get('test_idtoken_processing_hook_user_email'), self.user.email)

    @override_settings(
        OIDC_IDTOKEN_PROCESSING_HOOK=[
            'oidc_provider.tests.app.utils.fake_idtoken_processing_hook',
        ]
    )
    def test_additional_idtoken_processing_hook_one_element_in_list(self):
        """
        Test custom function for setting OIDC_IDTOKEN_PROCESSING_HOOK
        (hook given as a one-element list).
        """
        code = self._create_code()

        post_data = self._auth_code_post_data(code=code.code)

        response = self._post_request(post_data)

        response_dic = json.loads(response.content.decode('utf-8'))
        id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()

        self.assertEqual(id_token.get('test_idtoken_processing_hook'), FAKE_RANDOM_STRING)
        self.assertEqual(id_token.get('test_idtoken_processing_hook_user_email'), self.user.email)

    @override_settings(
        OIDC_IDTOKEN_PROCESSING_HOOK=[
            'oidc_provider.tests.app.utils.fake_idtoken_processing_hook',
            'oidc_provider.tests.app.utils.fake_idtoken_processing_hook2',
        ]
    )
    def test_additional_idtoken_processing_hook_two_elements_in_list(self):
        """
        Test custom function for setting OIDC_IDTOKEN_PROCESSING_HOOK
        (two hooks given as a list; claims from both are expected).
        """
        code = self._create_code()

        post_data = self._auth_code_post_data(code=code.code)

        response = self._post_request(post_data)

        response_dic = json.loads(response.content.decode('utf-8'))
        id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()

        self.assertEqual(id_token.get('test_idtoken_processing_hook'), FAKE_RANDOM_STRING)
        self.assertEqual(id_token.get('test_idtoken_processing_hook_user_email'), self.user.email)

        self.assertEqual(id_token.get('test_idtoken_processing_hook2'), FAKE_RANDOM_STRING)
        self.assertEqual(id_token.get('test_idtoken_processing_hook_user_email2'), self.user.email)

    @override_settings(
        OIDC_IDTOKEN_PROCESSING_HOOK=(
            'oidc_provider.tests.app.utils.fake_idtoken_processing_hook',
            'oidc_provider.tests.app.utils.fake_idtoken_processing_hook2',
        )
    )
    def test_additional_idtoken_processing_hook_two_elements_in_tuple(self):
        """
        Test custom function for setting OIDC_IDTOKEN_PROCESSING_HOOK
        (two hooks given as a tuple; claims from both are expected).
        """
        code = self._create_code()

        post_data = self._auth_code_post_data(code=code.code)

        response = self._post_request(post_data)

        response_dic = json.loads(response.content.decode('utf-8'))
        id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()

        self.assertEqual(id_token.get('test_idtoken_processing_hook'), FAKE_RANDOM_STRING)
        self.assertEqual(id_token.get('test_idtoken_processing_hook_user_email'), self.user.email)

        self.assertEqual(id_token.get('test_idtoken_processing_hook2'), FAKE_RANDOM_STRING)
        self.assertEqual(id_token.get('test_idtoken_processing_hook_user_email2'), self.user.email)

    # The value below is a parenthesised string, not a one-element tuple.
    @override_settings(
        OIDC_IDTOKEN_PROCESSING_HOOK=(
            'oidc_provider.tests.app.utils.fake_idtoken_processing_hook3'))
    def test_additional_idtoken_processing_hook_scope_param(self):
        """
        Test scope parameter is passed to OIDC_IDTOKEN_PROCESSING_HOOK.
        """
        code = self._create_code(['openid', 'email', 'profile', 'dummy'])

        post_data = self._auth_code_post_data(code=code.code)

        response = self._post_request(post_data)

        response_dic = json.loads(response.content.decode('utf-8'))
        id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()

        self.assertEqual(
            id_token.get('scope_passed_to_processing_hook'),
            ['openid', 'email', 'profile', 'dummy'])

    def test_pkce_parameters(self):
        """
        Test Proof Key for Code Exchange by OAuth Public Clients.
        https://tools.ietf.org/html/rfc7636
        """
        code = create_code(user=self.user, client=self.client,
                           scope=['openid', 'email'], nonce=FAKE_NONCE, is_authentication=True,
                           code_challenge=FAKE_CODE_CHALLENGE, code_challenge_method='S256')
        code.save()

        post_data = self._auth_code_post_data(code=code.code)

        # Add parameters.
        post_data['code_verifier'] = FAKE_CODE_VERIFIER

        response = self._post_request(post_data)

        # NOTE(review): this only checks that the body is valid JSON; no
        # assertion is made on its content -- consider asserting on it.
        json.loads(response.content.decode('utf-8'))

    def test_client_credentials_grant_type(self):
        """
        The client credentials grant returns an access token carrying the
        client's scopes, that token opens a protected view, and the grant
        fails with 'invalid_scope' once the client has no scope.
        """
        fake_scopes_list = ['scopeone', 'scopetwo']

        # Add scope for this client.
        self.client.scope = fake_scopes_list
        self.client.save()

        post_data = {
            'client_id': self.client.client_id,
            'client_secret': self.client.client_secret,
            'grant_type': 'client_credentials',
        }
        response = self._post_request(post_data)
        response_dict = json.loads(response.content.decode('utf-8'))

        # Ensure access token exists in the response, also check if scopes are
        # the ones we registered previously.
        self.assertIn('access_token', response_dict)
        self.assertEqual(' '.join(fake_scopes_list), response_dict['scope'])

        # Create a protected resource and test the access_token.

        @require_http_methods(['GET'])
        @protected_resource_view(fake_scopes_list)
        def protected_api(request, *args, **kwargs):
            return JsonResponse({'protected': 'information'}, status=200)

        # Deploy view on some url. So, base url could be anything.
        request = self.factory.get(
            '/api/protected/?access_token={0}'.format(response_dict['access_token']))
        response = protected_api(request)
        response_dict = json.loads(response.content.decode('utf-8'))

        self.assertEqual(response.status_code, 200)
        self.assertIn('protected', response_dict)

        # Protected resource test ends here.

        # Clean scopes for this client.
        self.client.scope = ''
        self.client.save()

        response = self._post_request(post_data)
        response_dict = json.loads(response.content.decode('utf-8'))

        # It should fail when client does not have any scope added.
        self.assertEqual(400, response.status_code)
        self.assertEqual('invalid_scope', response_dict['error'])