Merge pull request #144 from uhavin/feature-granttype-password

Feature granttype password
This commit is contained in:
Juan Ignacio Fiorentino 2016-12-12 14:25:10 -03:00 committed by GitHub
commit fc6241d3d0
2 changed files with 136 additions and 23 deletions

View file

@ -2,7 +2,7 @@ from base64 import b64decode, urlsafe_b64encode
import hashlib import hashlib
import logging import logging
import re import re
from django.contrib.auth import authenticate
from oidc_provider.lib.utils.common import cleanup_url_from_query_string from oidc_provider.lib.utils.common import cleanup_url_from_query_string
try: try:
@ -34,6 +34,7 @@ class TokenEndpoint(object):
def __init__(self, request): def __init__(self, request):
self.request = request self.request = request
self.params = {} self.params = {}
self.user = None
self._extract_params() self._extract_params()
def _extract_params(self): def _extract_params(self):
@ -122,23 +123,15 @@ class TokenEndpoint(object):
raise TokenError('invalid_grant') raise TokenError('invalid_grant')
elif self.params['grant_type'] == 'password': elif self.params['grant_type'] == 'password':
from django.contrib.auth import authenticate user = authenticate(
user = authenticate(username=self.params['username'], password=self.params['password']) username=self.params['username'],
password=self.params['password']
)
if not user: if not user:
raise TokenError('Invalid user credentials') raise TokenError('Invalid user credentials')
self.token = create_token(user, self.client, self.params['scope'].split(' ')) self.user = user
self.token.id_token = create_id_token(
user=user,
aud=self.client.client_id,
nonce='self.code.nonce',
at_hash=self.token.at_hash,
request=self.request,
scope=self.params['scope'],
)
self.token.save()
elif self.params['grant_type'] == 'refresh_token': elif self.params['grant_type'] == 'refresh_token':
if not self.params['refresh_token']: if not self.params['refresh_token']:
@ -163,7 +156,30 @@ class TokenEndpoint(object):
elif self.params['grant_type'] == 'refresh_token': elif self.params['grant_type'] == 'refresh_token':
return self.create_refresh_response_dic() return self.create_refresh_response_dic()
elif self.params['grant_type'] == 'password': elif self.params['grant_type'] == 'password':
return {'access_token': self.token.access_token} return self.create_access_token_response_dic()
def create_access_token_response_dic(self):
token = create_token(
self.user,
self.client,
self.params['scope'].split(' '))
token.id_token = create_id_token(
user=self.user,
aud=self.client.client_id,
nonce='self.code.nonce',
at_hash=token.at_hash,
request=self.request,
scope=self.params['scope'],
)
token.save()
return {
'access_token': token.access_token,
'refresh_token': token.refresh_token,
'expires_in': settings.get('OIDC_TOKEN_EXPIRE'),
'token_type': 'bearer'
}
def create_code_response_dic(self): def create_code_response_dic(self):
token = create_token( token = create_token(

View file

@ -19,7 +19,7 @@ from django.utils import timezone
from jwkest.jwk import KEYS from jwkest.jwk import KEYS
from jwkest.jws import JWS from jwkest.jws import JWS
from jwkest.jwt import JWT from jwkest.jwt import JWT
from mock import patch from mock import patch, Mock
from oidc_provider.lib.utils.token import create_code from oidc_provider.lib.utils.token import create_code
from oidc_provider.models import Token from oidc_provider.models import Token
@ -51,6 +51,14 @@ class TokenTestCase(TestCase):
self.user = create_fake_user() self.user = create_fake_user()
self.client = create_fake_client(response_type='code') self.client = create_fake_client(response_type='code')
def _password_grant_post_data(self):
return {
'username': 'johndoe',
'password': '1234',
'grant_type': 'password',
'scope': 'openid email',
}
def _auth_code_post_data(self, code): def _auth_code_post_data(self, code):
""" """
All the data that will be POSTed to the Token Endpoint. All the data that will be POSTed to the Token Endpoint.
@ -128,6 +136,100 @@ class TokenTestCase(TestCase):
return userinfo(request) return userinfo(request)
def _auth_header(self):
user_pass = self.client.client_id + ':' + self.client.client_secret
auth = b'Basic ' + b64encode(user_pass.encode('utf-8'))
auth_header = {'HTTP_AUTHORIZATION': auth.decode('utf-8')}
return auth_header
# Resource Owner Password Credentials Grant
# requirements to satisfy in all test_password_grant methods
# https://tools.ietf.org/html/rfc6749#section-4.3.2
#
# grant_type
# REQUIRED. Value MUST be set to "password".
# username
# REQUIRED. The resource owner username.
# password
# REQUIRED. The resource owner password.
# scope
# OPTIONAL. The scope of the access request as described by
# Section 3.3.
#
# The authorization server MUST:
# o require client authentication for confidential clients or for any
# client that was issued client credentials (or with other
# authentication requirements),
# o authenticate the client if client authentication is included, and
# o validate the resource owner password credentials using its
# existing password validation algorithm.
def test_password_grant_get_access_token_without_scope(self):
post_data = self._password_grant_post_data()
del (post_data['scope'])
response = self._post_request(
post_data=post_data,
extras=self._auth_header()
)
response_dict = json.loads(response.content.decode('utf-8'))
self.assertIn('access_token', response_dict)
def test_password_grant_get_access_token_with_scope(self):
response = self._post_request(
post_data=self._password_grant_post_data(),
extras=self._auth_header()
)
response_dict = json.loads(response.content.decode('utf-8'))
self.assertIn('access_token', response_dict)
def test_password_grant_get_access_token_invalid_user_credentials(self):
invalid_post = self._password_grant_post_data()
invalid_post['password'] = 'wrong!'
response = self._post_request(
post_data=invalid_post,
extras=self._auth_header()
)
self.assertEqual(400, response.status_code)
def test_password_grant_get_access_token_invalid_client_credentials(self):
self.client.client_id = 'foo'
self.client.client_secret = 'bar'
response = self._post_request(
post_data=self._password_grant_post_data(),
extras=self._auth_header()
)
self.assertEqual(400, response.status_code)
@patch('oidc_provider.lib.utils.token.uuid')
@override_settings(OIDC_TOKEN_EXPIRE=120)
def test_password_grant_full_response(self, mock_uuid):
test_hex = 'fake_token'
mock_uuid4 = Mock(spec=uuid.uuid4)
mock_uuid4.hex = test_hex
mock_uuid.uuid4.return_value = mock_uuid4
response = self._post_request(
post_data=self._password_grant_post_data(),
extras=self._auth_header()
)
response_dict = json.loads(response.content.decode('utf-8'))
expected_response_dic = {
"access_token": 'fake_token',
"refresh_token": 'fake_token',
"expires_in": 120,
"token_type": "bearer",
}
self.assertDictEqual(expected_response_dic, response_dict)
@override_settings(OIDC_TOKEN_EXPIRE=720) @override_settings(OIDC_TOKEN_EXPIRE=720)
def test_authorization_code(self): def test_authorization_code(self):
""" """
@ -307,12 +409,7 @@ class TokenTestCase(TestCase):
del basicauth_data['client_id'] del basicauth_data['client_id']
del basicauth_data['client_secret'] del basicauth_data['client_secret']
# Generate HTTP Basic Auth header with id and secret. response = self._post_request(basicauth_data, self._auth_header())
user_pass = self.client.client_id + ':' + self.client.client_secret
auth_header = b'Basic ' + b64encode(user_pass.encode('utf-8'))
response = self._post_request(basicauth_data, {
'HTTP_AUTHORIZATION': auth_header.decode('utf-8'),
})
response.content.decode('utf-8') response.content.decode('utf-8')
self.assertEqual('invalid_client' in response.content.decode('utf-8'), self.assertEqual('invalid_client' in response.content.decode('utf-8'),