785 lines
29 KiB
Python
785 lines
29 KiB
Python
import json
|
|
import time
|
|
import uuid
|
|
|
|
from base64 import b64encode
|
|
try:
|
|
from urllib.parse import urlencode
|
|
except ImportError:
|
|
from urllib import urlencode
|
|
|
|
from django.core.management import call_command
|
|
from django.http import JsonResponse
|
|
try:
|
|
from django.urls import reverse
|
|
except ImportError:
|
|
from django.core.urlresolvers import reverse
|
|
from django.test import (
|
|
RequestFactory,
|
|
override_settings,
|
|
)
|
|
from django.test import TestCase
|
|
from django.views.decorators.http import require_http_methods
|
|
from jwkest.jwk import KEYS
|
|
from jwkest.jws import JWS
|
|
from jwkest.jwt import JWT
|
|
from mock import patch
|
|
|
|
from oidc_provider.lib.utils.oauth2 import protected_resource_view
|
|
from oidc_provider.lib.utils.token import create_code
|
|
from oidc_provider.models import Token
|
|
from oidc_provider.tests.app.utils import (
|
|
create_fake_user,
|
|
create_fake_client,
|
|
FAKE_CODE_CHALLENGE,
|
|
FAKE_CODE_VERIFIER,
|
|
FAKE_NONCE,
|
|
FAKE_RANDOM_STRING,
|
|
)
|
|
from oidc_provider.views import (
|
|
JwksView,
|
|
TokenView,
|
|
userinfo,
|
|
)
|
|
|
|
|
|
class TokenTestCase(TestCase):
|
|
"""
|
|
To obtain an Access Token and an ID Token, the RP Client sends a
|
|
Token Request to the Token Endpoint to obtain a Token Response
|
|
when using the Authorization Code Flow.
|
|
"""
|
|
|
|
def setUp(self):
|
|
call_command('creatersakey')
|
|
self.factory = RequestFactory()
|
|
self.user = create_fake_user()
|
|
self.client = create_fake_client(response_type='code')
|
|
|
|
def _password_grant_post_data(self, scope=None):
|
|
result = {
|
|
'username': 'johndoe',
|
|
'password': '1234',
|
|
'grant_type': 'password',
|
|
'scope': 'openid email',
|
|
}
|
|
if scope is not None:
|
|
result['scope'] = ' '.join(scope)
|
|
return result
|
|
|
|
def _auth_code_post_data(self, code, scope=None):
|
|
"""
|
|
All the data that will be POSTed to the Token Endpoint.
|
|
"""
|
|
post_data = {
|
|
'client_id': self.client.client_id,
|
|
'client_secret': self.client.client_secret,
|
|
'redirect_uri': self.client.default_redirect_uri,
|
|
'grant_type': 'authorization_code',
|
|
'code': code,
|
|
'state': uuid.uuid4().hex,
|
|
}
|
|
if scope is not None:
|
|
post_data['scope'] = ' '.join(scope)
|
|
|
|
return post_data
|
|
|
|
def _refresh_token_post_data(self, refresh_token, scope=None):
|
|
"""
|
|
All the data that will be POSTed to the Token Endpoint.
|
|
"""
|
|
post_data = {
|
|
'client_id': self.client.client_id,
|
|
'client_secret': self.client.client_secret,
|
|
'grant_type': 'refresh_token',
|
|
'refresh_token': refresh_token,
|
|
}
|
|
if scope is not None:
|
|
post_data['scope'] = ' '.join(scope)
|
|
|
|
return post_data
|
|
|
|
def _post_request(self, post_data, extras={}):
|
|
"""
|
|
Makes a request to the token endpoint by sending the
|
|
`post_data` parameters using the 'application/x-www-form-urlencoded'
|
|
format.
|
|
"""
|
|
url = reverse('oidc_provider:token')
|
|
|
|
request = self.factory.post(
|
|
url,
|
|
data=urlencode(post_data),
|
|
content_type='application/x-www-form-urlencoded',
|
|
**extras)
|
|
|
|
response = TokenView.as_view()(request)
|
|
|
|
return response
|
|
|
|
def _create_code(self, scope=None):
|
|
"""
|
|
Generate a valid grant code.
|
|
"""
|
|
code = create_code(
|
|
user=self.user,
|
|
client=self.client,
|
|
scope=(scope if scope else ['openid', 'email']),
|
|
nonce=FAKE_NONCE,
|
|
is_authentication=True)
|
|
code.save()
|
|
|
|
return code
|
|
|
|
def _get_keys(self):
|
|
"""
|
|
Get public key from discovery.
|
|
"""
|
|
request = self.factory.get(reverse('oidc_provider:jwks'))
|
|
response = JwksView.as_view()(request)
|
|
jwks_dic = json.loads(response.content.decode('utf-8'))
|
|
SIGKEYS = KEYS()
|
|
SIGKEYS.load_dict(jwks_dic)
|
|
return SIGKEYS
|
|
|
|
def _get_userinfo(self, access_token):
|
|
url = reverse('oidc_provider:userinfo')
|
|
request = self.factory.get(url)
|
|
request.META['HTTP_AUTHORIZATION'] = 'Bearer ' + access_token
|
|
|
|
return userinfo(request)
|
|
|
|
def _password_grant_auth_header(self):
|
|
user_pass = self.client.client_id + ':' + self.client.client_secret
|
|
auth = b'Basic ' + b64encode(user_pass.encode('utf-8'))
|
|
auth_header = {'HTTP_AUTHORIZATION': auth.decode('utf-8')}
|
|
return auth_header
|
|
|
|
def test_default_setting_does_not_allow_grant_type_password(self):
|
|
post_data = self._password_grant_post_data()
|
|
|
|
response = self._post_request(
|
|
post_data=post_data,
|
|
extras=self._password_grant_auth_header()
|
|
)
|
|
|
|
response_dict = json.loads(response.content.decode('utf-8'))
|
|
|
|
self.assertEqual(400, response.status_code)
|
|
self.assertEqual('unsupported_grant_type', response_dict['error'])
|
|
|
|
@override_settings(OIDC_GRANT_TYPE_PASSWORD_ENABLE=True)
|
|
def test_password_grant_get_access_token_without_scope(self):
|
|
post_data = self._password_grant_post_data()
|
|
del (post_data['scope'])
|
|
|
|
response = self._post_request(
|
|
post_data=post_data,
|
|
extras=self._password_grant_auth_header()
|
|
)
|
|
|
|
response_dict = json.loads(response.content.decode('utf-8'))
|
|
self.assertIn('access_token', response_dict)
|
|
|
|
@override_settings(OIDC_GRANT_TYPE_PASSWORD_ENABLE=True)
|
|
def test_password_grant_get_access_token_with_scope(self):
|
|
response = self._post_request(
|
|
post_data=self._password_grant_post_data(),
|
|
extras=self._password_grant_auth_header()
|
|
)
|
|
|
|
response_dict = json.loads(response.content.decode('utf-8'))
|
|
self.assertIn('access_token', response_dict)
|
|
|
|
@override_settings(OIDC_GRANT_TYPE_PASSWORD_ENABLE=True)
|
|
def test_password_grant_get_access_token_invalid_user_credentials(self):
|
|
invalid_post = self._password_grant_post_data()
|
|
invalid_post['password'] = 'wrong!'
|
|
|
|
response = self._post_request(
|
|
post_data=invalid_post,
|
|
extras=self._password_grant_auth_header()
|
|
)
|
|
|
|
response_dict = json.loads(response.content.decode('utf-8'))
|
|
|
|
self.assertEqual(403, response.status_code)
|
|
self.assertEqual('access_denied', response_dict['error'])
|
|
|
|
def test_password_grant_get_access_token_invalid_client_credentials(self):
|
|
self.client.client_id = 'foo'
|
|
self.client.client_secret = 'bar'
|
|
|
|
response = self._post_request(
|
|
post_data=self._password_grant_post_data(),
|
|
extras=self._password_grant_auth_header()
|
|
)
|
|
|
|
response_dict = json.loads(response.content.decode('utf-8'))
|
|
|
|
self.assertEqual(400, response.status_code)
|
|
self.assertEqual('invalid_client', response_dict['error'])
|
|
|
|
def test_password_grant_full_response(self):
|
|
self.check_password_grant(scope=['openid', 'email'])
|
|
|
|
def test_password_grant_scope(self):
|
|
self.check_password_grant(scope=['openid', 'profile'])
|
|
|
|
@override_settings(OIDC_TOKEN_EXPIRE=120,
|
|
OIDC_GRANT_TYPE_PASSWORD_ENABLE=True)
|
|
def check_password_grant(self, scope):
|
|
response = self._post_request(
|
|
post_data=self._password_grant_post_data(scope),
|
|
extras=self._password_grant_auth_header()
|
|
)
|
|
|
|
response_dict = json.loads(response.content.decode('utf-8'))
|
|
id_token = JWS().verify_compact(
|
|
response_dict['id_token'].encode('utf-8'), self._get_keys())
|
|
|
|
token = Token.objects.get(user=self.user)
|
|
self.assertEqual(response_dict['access_token'], token.access_token)
|
|
self.assertEqual(response_dict['refresh_token'], token.refresh_token)
|
|
self.assertEqual(response_dict['expires_in'], 120)
|
|
self.assertEqual(response_dict['token_type'], 'bearer')
|
|
self.assertEqual(id_token['sub'], str(self.user.id))
|
|
self.assertEqual(id_token['aud'], self.client.client_id)
|
|
|
|
# Check the scope is honored by checking the claims in the userinfo
|
|
userinfo_response = self._get_userinfo(response_dict['access_token'])
|
|
userinfo = json.loads(userinfo_response.content.decode('utf-8'))
|
|
|
|
for (scope_param, claim) in [('email', 'email'), ('profile', 'name')]:
|
|
if scope_param in scope:
|
|
self.assertIn(claim, userinfo)
|
|
else:
|
|
self.assertNotIn(claim, userinfo)
|
|
|
|
@override_settings(OIDC_TOKEN_EXPIRE=720)
|
|
def test_authorization_code(self):
|
|
"""
|
|
We MUST validate the signature of the ID Token according to JWS
|
|
using the algorithm specified in the alg Header Parameter of
|
|
the JOSE Header.
|
|
"""
|
|
SIGKEYS = self._get_keys()
|
|
code = self._create_code()
|
|
|
|
post_data = self._auth_code_post_data(code=code.code)
|
|
|
|
response = self._post_request(post_data)
|
|
response_dic = json.loads(response.content.decode('utf-8'))
|
|
|
|
id_token = JWS().verify_compact(response_dic['id_token'].encode('utf-8'), SIGKEYS)
|
|
|
|
token = Token.objects.get(user=self.user)
|
|
self.assertEqual(response_dic['access_token'], token.access_token)
|
|
self.assertEqual(response_dic['refresh_token'], token.refresh_token)
|
|
self.assertEqual(response_dic['token_type'], 'bearer')
|
|
self.assertEqual(response_dic['expires_in'], 720)
|
|
self.assertEqual(id_token['sub'], str(self.user.id))
|
|
self.assertEqual(id_token['aud'], self.client.client_id)
|
|
|
|
@override_settings(OIDC_TOKEN_EXPIRE=720,
|
|
OIDC_IDTOKEN_INCLUDE_CLAIMS=True)
|
|
def test_scope_is_ignored_for_auth_code(self):
|
|
"""
|
|
Scope is ignored for token respones to auth code grant type.
|
|
This comes down to that the scopes requested in authorize are returned.
|
|
"""
|
|
SIGKEYS = self._get_keys()
|
|
for code_scope in [['openid'], ['openid', 'email'], ['openid', 'profile']]:
|
|
code = self._create_code(code_scope)
|
|
|
|
post_data = self._auth_code_post_data(
|
|
code=code.code, scope=code_scope)
|
|
|
|
response = self._post_request(post_data)
|
|
response_dic = json.loads(response.content.decode('utf-8'))
|
|
|
|
self.assertEqual(response.status_code, 200)
|
|
|
|
id_token = JWS().verify_compact(response_dic['id_token'].encode('utf-8'), SIGKEYS)
|
|
|
|
if 'email' in code_scope:
|
|
self.assertIn('email', id_token)
|
|
self.assertIn('email_verified', id_token)
|
|
else:
|
|
self.assertNotIn('email', id_token)
|
|
|
|
if 'profile' in code_scope:
|
|
self.assertIn('given_name', id_token)
|
|
else:
|
|
self.assertNotIn('given_name', id_token)
|
|
|
|
def test_refresh_token(self):
|
|
"""
|
|
A request to the Token Endpoint can also use a Refresh Token
|
|
by using the grant_type value refresh_token, as described in
|
|
Section 6 of OAuth 2.0 [RFC6749].
|
|
"""
|
|
self.do_refresh_token_check()
|
|
|
|
def test_refresh_token_invalid_scope(self):
|
|
"""
|
|
Extending scope in refresh token is not allowed.
|
|
|
|
Try to get a refresh token with "profile" in the scope even
|
|
though the original authorized scope in the authorization code
|
|
request is only ['openid', 'email'].
|
|
"""
|
|
self.do_refresh_token_check(scope=['openid', 'profile'])
|
|
|
|
def test_refresh_token_narrowed_scope(self):
|
|
"""
|
|
Narrowing scope in refresh token is allowed.
|
|
|
|
Try to get a refresh token with just "openid" in the scope even
|
|
though the original authorized scope in the authorization code
|
|
request is ['openid', 'email'].
|
|
"""
|
|
self.do_refresh_token_check(scope=['openid'])
|
|
|
|
@override_settings(OIDC_IDTOKEN_INCLUDE_CLAIMS=True)
|
|
def do_refresh_token_check(self, scope=None):
|
|
SIGKEYS = self._get_keys()
|
|
|
|
# Retrieve refresh token
|
|
code = self._create_code()
|
|
self.assertEqual(code.scope, ['openid', 'email'])
|
|
post_data = self._auth_code_post_data(code=code.code)
|
|
start_time = time.time()
|
|
with patch('oidc_provider.lib.utils.token.time.time') as time_func:
|
|
time_func.return_value = start_time
|
|
response = self._post_request(post_data)
|
|
|
|
response_dic1 = json.loads(response.content.decode('utf-8'))
|
|
id_token1 = JWS().verify_compact(response_dic1['id_token'].encode('utf-8'), SIGKEYS)
|
|
|
|
# Use refresh token to obtain new token
|
|
post_data = self._refresh_token_post_data(
|
|
response_dic1['refresh_token'], scope)
|
|
with patch('oidc_provider.lib.utils.token.time.time') as time_func:
|
|
time_func.return_value = start_time + 600
|
|
response = self._post_request(post_data)
|
|
|
|
response_dic2 = json.loads(response.content.decode('utf-8'))
|
|
|
|
if scope and set(scope) - set(code.scope): # too broad scope
|
|
self.assertEqual(response.status_code, 400) # Bad Request
|
|
self.assertIn('error', response_dic2)
|
|
self.assertEqual(response_dic2['error'], 'invalid_scope')
|
|
return # No more checks
|
|
|
|
id_token2 = JWS().verify_compact(response_dic2['id_token'].encode('utf-8'), SIGKEYS)
|
|
|
|
if scope and 'email' not in scope: # narrowed scope The auth
|
|
# The auth code request had email in scope, so it should be
|
|
# in the first id token
|
|
self.assertIn('email', id_token1)
|
|
# but the refresh request had no email in scope
|
|
self.assertNotIn('email', id_token2, 'email was not requested')
|
|
|
|
self.assertNotEqual(response_dic1['id_token'], response_dic2['id_token'])
|
|
self.assertNotEqual(response_dic1['access_token'], response_dic2['access_token'])
|
|
self.assertNotEqual(response_dic1['refresh_token'], response_dic2['refresh_token'])
|
|
|
|
# http://openid.net/specs/openid-connect-core-1_0.html#rfc.section.12.2
|
|
self.assertEqual(id_token1['iss'], id_token2['iss'])
|
|
self.assertEqual(id_token1['sub'], id_token2['sub'])
|
|
self.assertNotEqual(id_token1['iat'], id_token2['iat'])
|
|
self.assertEqual(id_token1['iat'], int(start_time))
|
|
self.assertEqual(id_token2['iat'], int(start_time + 600))
|
|
self.assertEqual(id_token1['aud'], id_token2['aud'])
|
|
self.assertEqual(id_token1['auth_time'], id_token2['auth_time'])
|
|
self.assertEqual(id_token1.get('azp'), id_token2.get('azp'))
|
|
|
|
# Refresh token can't be reused
|
|
post_data = self._refresh_token_post_data(response_dic1['refresh_token'])
|
|
response = self._post_request(post_data)
|
|
self.assertIn('invalid_grant', response.content.decode('utf-8'))
|
|
|
|
# Old access token is invalidated
|
|
self.assertEqual(self._get_userinfo(response_dic1['access_token']).status_code, 401)
|
|
self.assertEqual(self._get_userinfo(response_dic2['access_token']).status_code, 200)
|
|
|
|
# Empty refresh token is invalid
|
|
post_data = self._refresh_token_post_data('')
|
|
response = self._post_request(post_data)
|
|
self.assertIn('invalid_grant', response.content.decode('utf-8'))
|
|
|
|
# No refresh token is invalid
|
|
post_data = self._refresh_token_post_data('')
|
|
del post_data['refresh_token']
|
|
response = self._post_request(post_data)
|
|
self.assertIn('invalid_grant', response.content.decode('utf-8'))
|
|
|
|
def test_client_redirect_uri(self):
|
|
"""
|
|
Validate that client redirect URIs exactly match registered
|
|
URIs, and that unregistered URIs or URIs with query parameters are rejected.
|
|
See http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest and
|
|
http://openid.net/specs/openid-connect-core-1_0.html#HybridTokenRequest.
|
|
"""
|
|
code = self._create_code()
|
|
post_data = self._auth_code_post_data(code=code.code)
|
|
|
|
# Unregistered URI
|
|
post_data['redirect_uri'] = 'http://invalid.example.org'
|
|
|
|
response = self._post_request(post_data)
|
|
self.assertIn('invalid_client', response.content.decode('utf-8'))
|
|
|
|
# Registered URI, but with query string appended
|
|
post_data['redirect_uri'] = self.client.default_redirect_uri + '?foo=bar'
|
|
|
|
response = self._post_request(post_data)
|
|
self.assertIn('invalid_client', response.content.decode('utf-8'))
|
|
|
|
# Registered URI
|
|
post_data['redirect_uri'] = self.client.default_redirect_uri
|
|
|
|
response = self._post_request(post_data)
|
|
self.assertNotIn('invalid_client', response.content.decode('utf-8'))
|
|
|
|
def test_request_methods(self):
|
|
"""
|
|
Client sends an HTTP POST request to the Token Endpoint. Other request
|
|
methods MUST NOT be allowed.
|
|
"""
|
|
url = reverse('oidc_provider:token')
|
|
|
|
requests = [
|
|
self.factory.get(url),
|
|
self.factory.put(url),
|
|
self.factory.delete(url),
|
|
]
|
|
|
|
for request in requests:
|
|
response = TokenView.as_view()(request)
|
|
|
|
self.assertEqual(
|
|
response.status_code, 405,
|
|
msg=request.method + ' request does not return a 405 status.')
|
|
|
|
request = self.factory.post(url)
|
|
|
|
response = TokenView.as_view()(request)
|
|
|
|
self.assertEqual(
|
|
response.status_code, 400,
|
|
msg=request.method + ' request does not return a 400 status.')
|
|
|
|
def test_client_authentication(self):
|
|
"""
|
|
The authorization server support including the
|
|
client credentials in the request-body using the `client_id` and
|
|
`client_secret`parameters.
|
|
|
|
See: http://tools.ietf.org/html/rfc6749#section-2.3.1
|
|
"""
|
|
code = self._create_code()
|
|
|
|
# Test a valid request to the token endpoint.
|
|
post_data = self._auth_code_post_data(code=code.code)
|
|
|
|
response = self._post_request(post_data)
|
|
|
|
self.assertNotIn(
|
|
'invalid_client',
|
|
response.content.decode('utf-8'),
|
|
msg='Client authentication fails using request-body credentials.')
|
|
|
|
# Now, test with an invalid client_id.
|
|
invalid_data = post_data.copy()
|
|
invalid_data['client_id'] = self.client.client_id * 2 # Fake id.
|
|
|
|
# Create another grant code.
|
|
code = self._create_code()
|
|
invalid_data['code'] = code.code
|
|
|
|
response = self._post_request(invalid_data)
|
|
|
|
self.assertIn(
|
|
'invalid_client',
|
|
response.content.decode('utf-8'),
|
|
msg='Client authentication success with an invalid "client_id".')
|
|
|
|
# Now, test using HTTP Basic Authentication method.
|
|
basicauth_data = post_data.copy()
|
|
|
|
# Create another grant code.
|
|
code = self._create_code()
|
|
basicauth_data['code'] = code.code
|
|
|
|
del basicauth_data['client_id']
|
|
del basicauth_data['client_secret']
|
|
|
|
response = self._post_request(basicauth_data, self._password_grant_auth_header())
|
|
response.content.decode('utf-8')
|
|
|
|
self.assertNotIn(
|
|
'invalid_client',
|
|
response.content.decode('utf-8'),
|
|
msg='Client authentication fails using HTTP Basic Auth.')
|
|
|
|
def test_access_token_contains_nonce(self):
|
|
"""
|
|
If present in the Authentication Request, Authorization Servers MUST
|
|
include a nonce Claim in the ID Token with the Claim Value being the
|
|
nonce value sent in the Authentication Request.
|
|
If the client does not supply a nonce parameter, it SHOULD not be
|
|
included in the `id_token`.
|
|
|
|
See http://openid.net/specs/openid-connect-core-1_0.html#IDToken
|
|
"""
|
|
code = self._create_code()
|
|
|
|
post_data = self._auth_code_post_data(code=code.code)
|
|
|
|
response = self._post_request(post_data)
|
|
|
|
response_dic = json.loads(response.content.decode('utf-8'))
|
|
id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()
|
|
|
|
self.assertEqual(id_token.get('nonce'), FAKE_NONCE)
|
|
|
|
# Client does not supply a nonce parameter.
|
|
code.nonce = ''
|
|
code.save()
|
|
|
|
response = self._post_request(post_data)
|
|
response_dic = json.loads(response.content.decode('utf-8'))
|
|
|
|
id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()
|
|
|
|
self.assertEqual(id_token.get('nonce'), None)
|
|
|
|
def test_id_token_contains_at_hash(self):
|
|
"""
|
|
If access_token is included, the id_token SHOULD contain an at_hash.
|
|
"""
|
|
code = self._create_code()
|
|
|
|
post_data = self._auth_code_post_data(code=code.code)
|
|
|
|
response = self._post_request(post_data)
|
|
|
|
response_dic = json.loads(response.content.decode('utf-8'))
|
|
id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()
|
|
|
|
self.assertTrue(id_token.get('at_hash'))
|
|
|
|
def test_idtoken_sign_validation(self):
|
|
"""
|
|
We MUST validate the signature of the ID Token according to JWS
|
|
using the algorithm specified in the alg Header Parameter of
|
|
the JOSE Header.
|
|
"""
|
|
SIGKEYS = self._get_keys()
|
|
RSAKEYS = [k for k in SIGKEYS if k.kty == 'RSA']
|
|
|
|
code = self._create_code()
|
|
|
|
post_data = self._auth_code_post_data(code=code.code)
|
|
|
|
response = self._post_request(post_data)
|
|
response_dic = json.loads(response.content.decode('utf-8'))
|
|
|
|
JWS().verify_compact(response_dic['id_token'].encode('utf-8'), RSAKEYS)
|
|
|
|
@override_settings(
|
|
OIDC_IDTOKEN_SUB_GENERATOR='oidc_provider.tests.app.utils.fake_sub_generator')
|
|
def test_custom_sub_generator(self):
|
|
"""
|
|
Test custom function for setting OIDC_IDTOKEN_SUB_GENERATOR.
|
|
"""
|
|
code = self._create_code()
|
|
|
|
post_data = self._auth_code_post_data(code=code.code)
|
|
|
|
response = self._post_request(post_data)
|
|
|
|
response_dic = json.loads(response.content.decode('utf-8'))
|
|
id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()
|
|
|
|
self.assertEqual(id_token.get('sub'), self.user.email)
|
|
|
|
@override_settings(
|
|
OIDC_IDTOKEN_PROCESSING_HOOK='oidc_provider.tests.app.utils.fake_idtoken_processing_hook')
|
|
def test_additional_idtoken_processing_hook(self):
|
|
"""
|
|
Test custom function for setting OIDC_IDTOKEN_PROCESSING_HOOK.
|
|
"""
|
|
code = self._create_code()
|
|
|
|
post_data = self._auth_code_post_data(code=code.code)
|
|
|
|
response = self._post_request(post_data)
|
|
|
|
response_dic = json.loads(response.content.decode('utf-8'))
|
|
id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()
|
|
|
|
self.assertEqual(id_token.get('test_idtoken_processing_hook'), FAKE_RANDOM_STRING)
|
|
self.assertEqual(id_token.get('test_idtoken_processing_hook_user_email'), self.user.email)
|
|
|
|
@override_settings(
|
|
OIDC_IDTOKEN_PROCESSING_HOOK=(
|
|
'oidc_provider.tests.app.utils.fake_idtoken_processing_hook',
|
|
)
|
|
)
|
|
def test_additional_idtoken_processing_hook_one_element_in_tuple(self):
|
|
"""
|
|
Test custom function for setting OIDC_IDTOKEN_PROCESSING_HOOK.
|
|
"""
|
|
code = self._create_code()
|
|
|
|
post_data = self._auth_code_post_data(code=code.code)
|
|
|
|
response = self._post_request(post_data)
|
|
|
|
response_dic = json.loads(response.content.decode('utf-8'))
|
|
id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()
|
|
|
|
self.assertEqual(id_token.get('test_idtoken_processing_hook'), FAKE_RANDOM_STRING)
|
|
self.assertEqual(id_token.get('test_idtoken_processing_hook_user_email'), self.user.email)
|
|
|
|
@override_settings(
|
|
OIDC_IDTOKEN_PROCESSING_HOOK=[
|
|
'oidc_provider.tests.app.utils.fake_idtoken_processing_hook',
|
|
]
|
|
)
|
|
def test_additional_idtoken_processing_hook_one_element_in_list(self):
|
|
"""
|
|
Test custom function for setting OIDC_IDTOKEN_PROCESSING_HOOK.
|
|
"""
|
|
code = self._create_code()
|
|
|
|
post_data = self._auth_code_post_data(code=code.code)
|
|
|
|
response = self._post_request(post_data)
|
|
|
|
response_dic = json.loads(response.content.decode('utf-8'))
|
|
id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()
|
|
|
|
self.assertEqual(id_token.get('test_idtoken_processing_hook'), FAKE_RANDOM_STRING)
|
|
self.assertEqual(id_token.get('test_idtoken_processing_hook_user_email'), self.user.email)
|
|
|
|
@override_settings(
|
|
OIDC_IDTOKEN_PROCESSING_HOOK=[
|
|
'oidc_provider.tests.app.utils.fake_idtoken_processing_hook',
|
|
'oidc_provider.tests.app.utils.fake_idtoken_processing_hook2',
|
|
]
|
|
)
|
|
def test_additional_idtoken_processing_hook_two_elements_in_list(self):
|
|
"""
|
|
Test custom function for setting OIDC_IDTOKEN_PROCESSING_HOOK.
|
|
"""
|
|
code = self._create_code()
|
|
|
|
post_data = self._auth_code_post_data(code=code.code)
|
|
|
|
response = self._post_request(post_data)
|
|
|
|
response_dic = json.loads(response.content.decode('utf-8'))
|
|
id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()
|
|
|
|
self.assertEqual(id_token.get('test_idtoken_processing_hook'), FAKE_RANDOM_STRING)
|
|
self.assertEqual(id_token.get('test_idtoken_processing_hook_user_email'), self.user.email)
|
|
|
|
self.assertEqual(id_token.get('test_idtoken_processing_hook2'), FAKE_RANDOM_STRING)
|
|
self.assertEqual(id_token.get('test_idtoken_processing_hook_user_email2'), self.user.email)
|
|
|
|
@override_settings(
|
|
OIDC_IDTOKEN_PROCESSING_HOOK=(
|
|
'oidc_provider.tests.app.utils.fake_idtoken_processing_hook',
|
|
'oidc_provider.tests.app.utils.fake_idtoken_processing_hook2',
|
|
)
|
|
)
|
|
def test_additional_idtoken_processing_hook_two_elements_in_tuple(self):
|
|
"""
|
|
Test custom function for setting OIDC_IDTOKEN_PROCESSING_HOOK.
|
|
"""
|
|
code = self._create_code()
|
|
|
|
post_data = self._auth_code_post_data(code=code.code)
|
|
|
|
response = self._post_request(post_data)
|
|
|
|
response_dic = json.loads(response.content.decode('utf-8'))
|
|
id_token = JWT().unpack(response_dic['id_token'].encode('utf-8')).payload()
|
|
|
|
self.assertEqual(id_token.get('test_idtoken_processing_hook'), FAKE_RANDOM_STRING)
|
|
self.assertEqual(id_token.get('test_idtoken_processing_hook_user_email'), self.user.email)
|
|
|
|
self.assertEqual(id_token.get('test_idtoken_processing_hook2'), FAKE_RANDOM_STRING)
|
|
self.assertEqual(id_token.get('test_idtoken_processing_hook_user_email2'), self.user.email)
|
|
|
|
def test_pkce_parameters(self):
|
|
"""
|
|
Test Proof Key for Code Exchange by OAuth Public Clients.
|
|
https://tools.ietf.org/html/rfc7636
|
|
"""
|
|
code = create_code(user=self.user, client=self.client,
|
|
scope=['openid', 'email'], nonce=FAKE_NONCE, is_authentication=True,
|
|
code_challenge=FAKE_CODE_CHALLENGE, code_challenge_method='S256')
|
|
code.save()
|
|
|
|
post_data = self._auth_code_post_data(code=code.code)
|
|
|
|
# Add parameters.
|
|
post_data['code_verifier'] = FAKE_CODE_VERIFIER
|
|
|
|
response = self._post_request(post_data)
|
|
|
|
json.loads(response.content.decode('utf-8'))
|
|
|
|
def test_client_credentials_grant_type(self):
|
|
fake_scopes_list = ['scopeone', 'scopetwo']
|
|
|
|
# Add scope for this client.
|
|
self.client.scope = fake_scopes_list
|
|
self.client.save()
|
|
|
|
post_data = {
|
|
'client_id': self.client.client_id,
|
|
'client_secret': self.client.client_secret,
|
|
'grant_type': 'client_credentials',
|
|
}
|
|
response = self._post_request(post_data)
|
|
response_dict = json.loads(response.content.decode('utf-8'))
|
|
|
|
# Ensure access token exists in the response, also check if scopes are
|
|
# the ones we registered previously.
|
|
self.assertTrue('access_token' in response_dict)
|
|
self.assertEqual(' '.join(fake_scopes_list), response_dict['scope'])
|
|
|
|
# Create a protected resource and test the access_token.
|
|
|
|
@require_http_methods(['GET'])
|
|
@protected_resource_view(fake_scopes_list)
|
|
def protected_api(request, *args, **kwargs):
|
|
return JsonResponse({'protected': 'information'}, status=200)
|
|
|
|
# Deploy view on some url. So, base url could be anything.
|
|
request = self.factory.get(
|
|
'/api/protected/?access_token={0}'.format(response_dict['access_token']))
|
|
response = protected_api(request)
|
|
response_dict = json.loads(response.content.decode('utf-8'))
|
|
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertTrue('protected' in response_dict)
|
|
|
|
# Protected resource test ends here.
|
|
|
|
# Clean scopes for this client.
|
|
self.client.scope = ''
|
|
self.client.save()
|
|
|
|
response = self._post_request(post_data)
|
|
response_dict = json.loads(response.content.decode('utf-8'))
|
|
|
|
# It should fail when client does not have any scope added.
|
|
self.assertEqual(400, response.status_code)
|
|
self.assertEqual('invalid_scope', response_dict['error'])
|