Remplace AES encryption with database. For saving PKCE parameters.

This commit is contained in:
Ignacio Fiorentino 2016-04-07 16:18:47 -03:00
parent b1b8247cb0
commit e495d6c41d
6 changed files with 63 additions and 23 deletions

View file

@ -1,4 +1,4 @@
from base64 import b64decode, urlsafe_b64encode from base64 import b64decode, urlsafe_b64decode, urlsafe_b64encode
import hashlib import hashlib
import logging import logging
import re import re
@ -73,10 +73,11 @@ class TokenEndpoint(object):
logger.debug('[Token] Client does not exist: %s', self.params.client_id) logger.debug('[Token] Client does not exist: %s', self.params.client_id)
raise TokenError('invalid_client') raise TokenError('invalid_client')
if not (self.client.client_secret == self.params.client_secret): if self.client.client_type == 'confidential':
logger.debug('[Token] Invalid client secret: client %s do not have secret %s', if not (self.client.client_secret == self.params.client_secret):
self.client.client_id, self.client.client_secret) logger.debug('[Token] Invalid client secret: client %s do not have secret %s',
raise TokenError('invalid_client') self.client.client_id, self.client.client_secret)
raise TokenError('invalid_client')
if self.params.grant_type == 'authorization_code': if self.params.grant_type == 'authorization_code':
if not (self.params.redirect_uri in self.client.redirect_uris): if not (self.params.redirect_uri in self.client.redirect_uris):
@ -97,16 +98,13 @@ class TokenEndpoint(object):
# Validate PKCE parameters. # Validate PKCE parameters.
if self.params.code_verifier: if self.params.code_verifier:
obj = AES.new(hashlib.md5(django_settings.SECRET_KEY).hexdigest(), AES.MODE_CBC) if self.code.code_challenge_method == 'S256':
code_challenge, code_challenge_method = tuple(obj.decrypt(self.code.code.decode('hex')).split(':'))
if code_challenge_method == 'S256':
new_code_challenge = urlsafe_b64encode(hashlib.sha256(self.params.code_verifier.encode('ascii')).digest()).replace('=', '') new_code_challenge = urlsafe_b64encode(hashlib.sha256(self.params.code_verifier.encode('ascii')).digest()).replace('=', '')
else: else:
new_code_challenge = self.params.code_verifier new_code_challenge = self.params.code_verifier
# TODO: We should explain the error. # TODO: We should explain the error.
if not (new_code_challenge == code_challenge): if not (new_code_challenge == self.code.code_challenge):
raise TokenError('invalid_grant') raise TokenError('invalid_grant')
elif self.params.grant_type == 'refresh_token': elif self.params.grant_type == 'refresh_token':

View file

@ -1,10 +1,9 @@
from base64 import urlsafe_b64decode, urlsafe_b64encode
from datetime import timedelta from datetime import timedelta
import time import time
import uuid import uuid
from Crypto.Cipher import AES
from Crypto.PublicKey.RSA import importKey from Crypto.PublicKey.RSA import importKey
from django.conf import settings as django_settings
from django.utils import timezone from django.utils import timezone
from hashlib import md5 from hashlib import md5
from jwkest.jwk import RSAKey as jwk_RSAKey from jwkest.jwk import RSAKey as jwk_RSAKey
@ -108,16 +107,11 @@ def create_code(user, client, scope, nonce, is_authentication,
code.user = user code.user = user
code.client = client code.client = client
if not code_challenge: code.code = uuid.uuid4().hex
code.code = uuid.uuid4().hex
else: if code_challenge and code_challenge_method:
obj = AES.new(md5(django_settings.SECRET_KEY).hexdigest(), AES.MODE_CBC) code.code_challenge = code_challenge
code.code_challenge_method = code_challenge_method
# Default is 'plain' method.
code_challenge_method = 'plain' if not code_challenge_method else code_challenge_method
ciphertext = obj.encrypt(code_challenge + ':' + code_challenge_method)
code.code = ciphertext.encode('hex')
code.expires_at = timezone.now() + timedelta( code.expires_at = timezone.now() + timedelta(
seconds=settings.get('OIDC_CODE_EXPIRE')) seconds=settings.get('OIDC_CODE_EXPIRE'))

View file

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9 on 2016-04-07 19:12
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('oidc_provider', '0012_auto_20160405_2041'),
]
operations = [
migrations.AddField(
model_name='code',
name='code_challenge',
field=models.CharField(max_length=255, null=True),
),
migrations.AddField(
model_name='code',
name='code_challenge_method',
field=models.CharField(max_length=255, null=True),
),
]

View file

@ -86,6 +86,8 @@ class Code(BaseCodeTokenModel):
code = models.CharField(max_length=255, unique=True) code = models.CharField(max_length=255, unique=True)
nonce = models.CharField(max_length=255, blank=True, default='') nonce = models.CharField(max_length=255, blank=True, default='')
is_authentication = models.BooleanField(default=False) is_authentication = models.BooleanField(default=False)
code_challenge = models.CharField(max_length=255, null=True)
code_challenge_method = models.CharField(max_length=255, null=True)
class Meta: class Meta:
verbose_name = _(u'Authorization Code') verbose_name = _(u'Authorization Code')

View file

@ -13,7 +13,8 @@ from oidc_provider.models import *
FAKE_NONCE = 'cb584e44c43ed6bd0bc2d9c7e242837d' FAKE_NONCE = 'cb584e44c43ed6bd0bc2d9c7e242837d'
FAKE_RANDOM_STRING = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(32)) FAKE_RANDOM_STRING = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(32))
FAKE_CODE_CHALLENGE = 'pG6flQqJa7INfIKb5cZVAXhTqvTKehIck6aQhdUuyWc' FAKE_CODE_CHALLENGE = 'YlYXEqXuRm-Xgi2BOUiK50JW1KsGTX6F1TDnZSC8VTg'
FAKE_CODE_VERIFIER = 'SmxGa0XueyNh5bDgTcSrqzAh2_FmXEqU8kDT6CuXicw'
def create_fake_user(): def create_fake_user():

View file

@ -445,3 +445,23 @@ class TokenTestCase(TestCase):
self.assertEqual(id_token.get('test_idtoken_processing_hook2'), FAKE_RANDOM_STRING) self.assertEqual(id_token.get('test_idtoken_processing_hook2'), FAKE_RANDOM_STRING)
self.assertEqual(id_token.get('test_idtoken_processing_hook_user_email2'), self.user.email) self.assertEqual(id_token.get('test_idtoken_processing_hook_user_email2'), self.user.email)
def test_pkce_parameters(self):
"""
Test Proof Key for Code Exchange by OAuth Public Clients.
https://tools.ietf.org/html/rfc7636
"""
import pdb;pdb.set_trace()
code = create_code(user=self.user, client=self.client,
scope=['openid', 'email'], nonce=FAKE_NONCE, is_authentication=True,
code_challenge=FAKE_CODE_CHALLENGE, code_challenge_method='S256')
code.save()
post_data = self._auth_code_post_data(code=code.code)
# Add parameters.
post_data['code_verifier'] = FAKE_CODE_VERIFIER
response = self._post_request(post_data)
response_dic = json.loads(response.content.decode('utf-8'))