# django-oidc-provider/oidc_provider/lib/utils/token.py

2015-01-08 20:55:24 +00:00
from datetime import timedelta
import time
2015-03-02 20:37:54 +00:00
import uuid
2016-10-03 15:54:54 +00:00
from Cryptodome.PublicKey.RSA import importKey
from django.utils import dateformat, timezone
from jwkest.jwk import RSAKey as jwk_RSAKey
2016-03-22 19:17:56 +00:00
from jwkest.jwk import SYMKey
from jwkest.jws import JWS
from jwkest.jwt import JWT
2015-03-02 20:37:54 +00:00
from oidc_provider.lib.utils.common import get_issuer
from oidc_provider.models import (
Code,
RSAKey,
Token,
)
2015-02-18 18:07:22 +00:00
from oidc_provider import settings
2015-01-08 20:55:24 +00:00
2017-08-08 22:41:42 +00:00
def create_id_token(user, aud, nonce='', at_hash='', request=None, scope=None):
    """
    Build the claims dictionary for an ID Token.

    See: http://openid.net/specs/openid-connect-core-1_0.html#IDToken

    The 'sub' claim comes from the callable configured under
    OIDC_IDTOKEN_SUB_GENERATOR and the lifetime from OIDC_IDTOKEN_EXPIRE.
    'nonce' and 'at_hash' are only included when they are truthy, and
    'email' only when 'email' is in scope and the user has a truthy email.
    Every hook configured under OIDC_IDTOKEN_PROCESSING_HOOK (a single
    value or a list/tuple of them) is called with the dictionary and
    user=user; its return value replaces the dictionary.

    Return a dict.
    """
    requested_scope = [] if scope is None else scope

    sub_generator = settings.get('OIDC_IDTOKEN_SUB_GENERATOR', import_str=True)
    subject = sub_generator(user=user)
    lifetime = settings.get('OIDC_IDTOKEN_EXPIRE')

    # Every time-based claim is expressed as an integer Unix timestamp.
    issued_at = int(time.time())
    expires_at = int(issued_at + lifetime)
    # Fall back to the join date when the user has no recorded login.
    last_auth = user.last_login or user.date_joined
    auth_timestamp = int(dateformat.format(last_auth, 'U'))

    claims = {
        'iss': get_issuer(request=request),
        'sub': subject,
        'aud': str(aud),
        'exp': expires_at,
        'iat': issued_at,
        'auth_time': auth_timestamp,
    }

    if nonce:
        claims['nonce'] = str(nonce)
    if at_hash:
        claims['at_hash'] = at_hash
    if 'email' in requested_scope and getattr(user, 'email', None):
        claims['email'] = user.email

    # Treat a single configured hook as a one-element sequence so both
    # configuration shapes go through the same loop.
    hook_setting = settings.get('OIDC_IDTOKEN_PROCESSING_HOOK')
    if isinstance(hook_setting, (list, tuple)):
        hook_paths = hook_setting
    else:
        hook_paths = [hook_setting]
    for hook_path in hook_paths:
        claims = settings.import_from_str(hook_path)(claims, user=user)

    return claims
2017-08-08 22:41:42 +00:00
2016-03-22 19:17:56 +00:00
def encode_id_token(payload, client):
    """
    Sign the ID Token payload as a JSON Web Signature (JWS).

    The keys are taken from get_client_alg_keys(client) and the signing
    algorithm from client.jwt_alg.
    Return the compact serialization produced by JWS.sign_compact.
    """
    signing_keys = get_client_alg_keys(client)
    return JWS(payload, alg=client.jwt_alg).sign_compact(signing_keys)
2015-01-08 20:55:24 +00:00
2017-08-08 22:41:42 +00:00
def decode_id_token(token, client):
    """
    Verify a compact-serialized ID Token against the client's keys.

    The keys are taken from get_client_alg_keys(client).
    Return whatever JWS.verify_compact returns for the token
    (presumably the decoded payload -- confirm against jwkest).
    """
    verification_keys = get_client_alg_keys(client)
    verifier = JWS()
    return verifier.verify_compact(token, keys=verification_keys)
2017-08-08 22:41:42 +00:00
def client_id_from_id_token(id_token):
    """
    Extracts the client id from a JSON Web Token (JWT).

    The id is read from the 'aud' claim of the unpacked token. That claim
    may be a single string or an array of strings; for an array the first
    entry is used.

    NOTE(review): only unpack() is called here, no verification routine,
    so the result should be treated as unverified input by callers.

    Returns a string or None (missing claim or empty audience array).
    """
    payload = JWT().unpack(id_token).payload()
    audience = payload.get('aud', None)
    if isinstance(audience, (list, tuple)):
        # Multi-valued audience: never hand a list back to callers that
        # expect a single client id.
        return audience[0] if audience else None
    return audience
2017-08-08 22:41:42 +00:00
def create_token(user, client, scope, id_token_dic=None):
    """
    Build a Token object for the given user and client.

    The access and refresh tokens are fresh uuid4 hex strings, and the
    expiry is now plus OIDC_TOKEN_EXPIRE seconds. id_token is only set
    when id_token_dic is not None.
    The object is populated but not saved here.

    Return a Token object.
    """
    lifetime = timedelta(seconds=settings.get('OIDC_TOKEN_EXPIRE'))

    new_token = Token()
    new_token.user = user
    new_token.client = client
    new_token.access_token = uuid.uuid4().hex

    if id_token_dic is not None:
        new_token.id_token = id_token_dic

    new_token.refresh_token = uuid.uuid4().hex
    new_token.expires_at = timezone.now() + lifetime
    new_token.scope = scope

    return new_token
2017-08-08 22:41:42 +00:00
2016-04-06 21:03:30 +00:00
def create_code(user, client, scope, nonce, is_authentication,
                code_challenge=None, code_challenge_method=None):
    """
    Build a Code object for the given user and client.

    The code value is a fresh uuid4 hex string and the expiry is now plus
    OIDC_CODE_EXPIRE seconds. The code challenge and its method are only
    stored when both are truthy.
    The object is populated but not saved here.

    Return a Code object.
    """
    lifetime = timedelta(seconds=settings.get('OIDC_CODE_EXPIRE'))

    new_code = Code()
    new_code.user = user
    new_code.client = client
    new_code.code = uuid.uuid4().hex

    has_challenge = code_challenge and code_challenge_method
    if has_challenge:
        new_code.code_challenge = code_challenge
        new_code.code_challenge_method = code_challenge_method

    new_code.expires_at = timezone.now() + lifetime
    new_code.scope = scope
    new_code.nonce = nonce
    new_code.is_authentication = is_authentication

    return new_code
2017-08-08 22:41:42 +00:00
def get_client_alg_keys(client):
    """
    Collect the keys to use for a client, based on client.jwt_alg.

    For 'HS256' the result is a single symmetric key built from
    client.client_secret. For 'RS256' it is one RSA key per stored RSAKey
    object. Any other algorithm, or 'RS256' with no stored RSA keys,
    raises Exception.

    Returns a list of keys.
    """
    algorithm = client.jwt_alg

    if algorithm == 'HS256':
        return [SYMKey(key=client.client_secret, alg=algorithm)]

    if algorithm != 'RS256':
        raise Exception('Unsupported key algorithm.')

    rsa_keys = [
        jwk_RSAKey(key=importKey(record.key), kid=record.kid)
        for record in RSAKey.objects.all()
    ]
    if not rsa_keys:
        raise Exception('You must add at least one RSA Key.')
    return rsa_keys